Boost.Asio:为每个连接/套接字使用一个 `io_service` 是一件好事吗?

2021-12-24 00:00:00 sockets multithreading c++ boost boost-asio

我想创建一个实现一个线程每个连接模型的应用程序.但是每个连接都必须是可停止的.我已经尝试过 这个 boost.asio 示例 实现了我想要的阻塞版本.但是经过一番质疑后,我发现没有可靠的方法来停止该示例的会话.所以我试图实现我自己的.我不得不使用异步函数.由于我想让一个线程只管理一个连接,并且无法控制哪个异步作业用于哪个线程,所以我决定对每个连接/套接字/线程使用 io_service.>

那么这是一个好方法吗,你知道更好的方法吗?

我的代码在这里,所以你可以检查和查看它:

#include #include #include #include #include #include <列表>#include #include <字符串>#include 命名空间 ba = boost::asio;命名空间 bs = boost::system;命名空间 b = 提升;typedef ba::ip::tcp::acceptoracceptor_type;typedef ba::ip::tcp::socket socket_type;常量短端口 = 11235;类服务器;//一个连接有自己的io_service和socket类连接{受保护:ba::io_service 服务;socket_type 袜子;b::线程*线程;ba::streambuf 流缓冲;//用于阅读等服务器 *服务器;void AsyncReadString() {ba::async_read_until(短袜,流缓冲,'',//空字符是一个分隔符b::bind(&Connection::ReadHandler, this,ba::placeholders::error,ba::placeholders::bytes_transferred));}void AsyncWriteString(const std::string &s) {std::string newstr = s + '';//添加一个空字符ba::async_write(短袜,ba::buffer(newstr.c_str(), newstr.size()),b::bind(&Connection::WriteHandler, this,ba::placeholders::error,ba::placeholders::bytes_transferred));}虚拟无效会话(){异步读取字符串();服务运行();//最后运行}std::string ExtractString() {std::istream is(&stream_buffer);std::string s;std::getline(is, s, '');返回 s;}虚空ReadHandler(const bs::error_code &ec,std::size_t bytes_transferred) {如果(!ec){std::cout <<(ExtractString() + "
");std::cout.flush();异步读取字符串();//再读一次}别的 {//什么都不做,this"稍后会被删除}}虚空WriteHandler(const bs::error_code &ec,std::size_t bytes_transferred) {}上市:连接(服务器*s):服务(),袜子(服务),服务器,线程(空){ }socket_type&插座() {返回袜子;}无效开始(){如果(线程)删除线程;线程 = 新 b::thread(b::bind(&Connection::Session, this));}无效加入(){if (thread) thread->join();}无效停止(){服务.停止();}无效杀我();虚拟~连接(){}};//服务器也有自己的 io_service 但它只用于接受类服务器{上市:std::list<连接*>连接;受保护:ba::io_service 服务;acceptor_type acc;b::线程*线程;virtual void AcceptHandler(const bs::error_code &ec) {如果(!ec){Connections.back()->Start();Connections.push_back(new Connection(this));acc.async_accept(Connections.back()->Socket(),b::bind(&Server::AcceptHandler,这,ba::placeholders::error));}别的 {//没做什么//因为新会话将被删除//由析构函数自动}}虚拟无效 ThreadFunc() {Connections.push_back(new Connection(this));acc.async_accept(Connections.back()->Socket(),b::bind(&Server::AcceptHandler,这,ba::placeholders::error));服务运行();}上市:服务器():服务(),acc(服务, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),线程(空){ }无效开始(){如果(线程)删除线程;线程 = 新 b::thread(b::bind(&Server::ThreadFunc, this));}无效停止(){服务.停止();}无效加入(){if (thread) thread->join();}无效停止所有连接(){for (auto c: Connections) {c->停止();}}void JoinAllConnections() {for (auto c: Connections) {c->加入();}}void KillAllConnections() {for (auto c: Connections) {删除c;}Connections.clear();}void KillConnection(Connection *c) {Connections.remove(c);删除c;}虚拟?服务器(){删除线程;//连接应该被用户删除(?)}};无效连接::KillMe() {服务器-> KillConnection(this);}int main() {尝试 {服务器;s.开始();std::cin.get();//等待输入s.停止();//先停止监听s.StopAllConnections();//中断正在进行的连接s.Join();//等待服务器,应立即返回s.JoinAllConnections();//等待正在进行的连接s.KillAllConnections();//销毁连接对象//在作用域结束时,服务器将被销毁}捕获(标准::异常&e){std::cerr <<例外:"<

解决方案

没有.每个连接使用一个 io_service 对象绝对是一种味道.特别是因为您还在一个专用线程上运行每个连接.

此时你必须问问自己异步给你带来了什么?您可以让所有代码同步并拥有完全相同数量的线程等.

很明显,您希望将连接多路复用到数量少得多的服务上.在实践中,有一些合理的模型,如

  1. 单个 io_service 和单个服务线程(这通常很好).在服务上排队的任何任务都不会阻塞很长时间,否则延迟会受到影响

  2. 单个 io_service 带有多个执行处理程序的线程.池中的线程数应该足以为最大值提供服务.支持的并发 CPU 密集型任务的数量(或者延迟将开始上升)

  3. 每个线程一个 io_service,通常每个逻辑核心一个线程并且具有线程关联性,以便它粘"到那个核心.这可能是缓存局部性的理想选择

更新:演示

下面的演示展示了使用选项 1. 的惯用风格:

生活在 Coliru

#include #include #include #include <boost/enable_shared_from_this.hpp>#include #include #include #include #include <列表>#include <字符串>命名空间 ba = boost::asio;命名空间 bs = boost::system;命名空间 b = 提升;typedef ba::ip::tcp::acceptoracceptor_type;typedef ba::ip::tcp::socket socket_type;常量短端口 = 11235;//一个连接有自己的io_service和socket类连接:公共 b::enable_shared_from_this<Connection>{上市:typedef boost::shared_ptr点;受保护:socket_type 袜子;ba::streambuf 流缓冲;//用于阅读等std::string 消息;void AsyncReadString() {std::cout <<__PRETTY_FUNCTION__ <<"
";ba::async_read_until(短袜,流缓冲,'',//空字符是一个分隔符b::bind(&Connection::ReadHandler, shared_from_this(),ba::placeholders::error,ba::placeholders::bytes_transferred));}void AsyncWriteString(const std::string &s) {std::cout <<__PRETTY_FUNCTION__ <<"
";消息 = s;ba::async_write(短袜,ba::buffer(message.c_str(), message.size()+1),b::bind(&Connection::WriteHandler, shared_from_this(),ba::placeholders::error,ba::placeholders::bytes_transferred));}std::string ExtractString() {std::cout <<__PRETTY_FUNCTION__ <<"
";std::istream is(&stream_buffer);std::string s;std::getline(is, s, '');返回 s;}无效读取处理程序(const bs::error_code &ec,std::size_t bytes_transferred){std::cout <<__PRETTY_FUNCTION__ <<"
";如果(!ec){std::cout <<(ExtractString() + "
");std::cout.flush();异步读取字符串();//再读一次}别的 {//什么都不做,this"稍后会被删除}}void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {std::cout <<__PRETTY_FUNCTION__ <<"
";}上市:连接(ba::io_service& svc):袜子(svc){}虚拟~连接(){std::cout <<__PRETTY_FUNCTION__ <<"
";}socket_type&套接字(){ 返回袜子;}void Session() { AsyncReadString();}无效停止(){袜子.取消();}};//服务器也有自己的 io_service 但它只用于接受类服务器{上市:std::list>m_connections;受保护:ba::io_service _service;boost::optional_工作;接受者_类型_acc;b::线程线程;void AcceptHandler(const bs::error_code &ec, Connection::Ptr 被接受) {如果(!ec){接受->会话();接受();}别的 {//什么都不做,新会话将被自动删除//析构函数}}无效的接受(){auto newaccept = boost::make_shared(_service);_acc.async_accept(newaccept->Socket(),b::bind(&Server::AcceptHandler,这,ba::placeholders::error,新接受));}上市:服务器():_服务(),_work(ba::io_service::work(_service)),_acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),线程(b::bind(&ba::io_service::run, &_service)){ }?服务器(){std::cout <<__PRETTY_FUNCTION__ <<"
";停止();_work.reset();if (thread.joinable()) thread.join();}无效开始(){std::cout <<__PRETTY_FUNCTION__ <<"
";接受();}无效停止(){std::cout <<__PRETTY_FUNCTION__ <<"
";_acc.cancel();}无效停止所有连接(){std::cout <<__PRETTY_FUNCTION__ <<"
";for (auto c: m_connections) {if (auto p = c.lock())p->停止();}}};int main() {尝试 {服务器;s.开始();std::cerr <<"2 秒后关机...
";b::this_thread::sleep_for(b::chrono::seconds(2));std::cerr <<"停止接受...
";s.停止();std::cerr <<"关机...
";s.StopAllConnections();//中断正在进行的连接}//Server 的析构函数将加入服务线程捕获(标准::异常&e){std::cerr <<__功能__ <<":" <<__LINE__ <<"
";std::cerr <<例外:"<

我修改了 main() 以在没有用户干预的情况下运行 2 秒.这样我就可以演示它Live On Coliru(来自当然,客户端进程的数量是有限的).

如果你有很多(lot)客户运行它,使用例如

$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\0" | netcat localhost 11235)& done; wait)

你会发现第二个窗口把它们都处理了:

$ ./test |排序 |uniq -c |排序 -n |尾巴2秒后关机...关掉...再见2 你好世界 282142 你好世界 45542 你好世界 62162 你好世界 78642 你好世界 99662 无效服务器::停止()1000 std::string Connection::ExtractString()1001 虚拟连接::~Connection()2000 无效连接::AsyncReadString()2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

如果你真的发疯并提高 1000 到例如100000 那里,你会得到类似的东西:

sehe@desktop:/tmp$ ./test |排序 |uniq -c |排序 -n |尾巴2秒后关机...关掉...再见2 你好世界 54832 你好世界 5792 你好世界 58652 你好世界 9382 无效服务器::停止()3 你好世界 96131741 std::string Connection::ExtractString()1742 虚拟连接::~Connection()3482 无效连接::AsyncReadString()第 3482 章

服务器重复运行 2 秒.

I want to create an application that implements one-thread-per-connection model. But each connection must be stoppable. I have tried this boost.asio example which implements the blocking version of what I want. But after a little bit questioning I've found out that there is no reliable way to stop the session of that example. So I've tried to implement my own. I had to use asynchronous functions. Since I want to make a thread to manage only one connection and there is no way to control which asynchronous job is employed to which thread, I decided to use io_service for each connection/socket/thread.

So is it a good approach, do you know a better approach?

My code is here so you can examine and review it:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <list>
#include <iostream>
#include <string>
#include <istream>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;

const short PORT = 11235;
class Server;

// A connection has its own io_service and socket
class Connection {
protected:
    ba::io_service service;
    socket_type sock;
    b::thread *thread;
    ba::streambuf stream_buffer;    // for reading etc
    Server *server;
    void AsyncReadString() {
        ba::async_read_until(
            sock,
            stream_buffer,
            '',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::string newstr = s + '';  // add a null char
        ba::async_write(
            sock,
            ba::buffer(newstr.c_str(), newstr.size()),
            b::bind(&Connection::WriteHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    virtual void Session() {
        AsyncReadString();
        service.run();  // run at last
    }
    std::string ExtractString() {
        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '');
        return s;
    }
    virtual void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
        if (!ec) {
            std::cout << (ExtractString() + "
");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    virtual void WriteHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
    }
public:
    Connection(Server *s) :
        service(),
        sock(service),
        server(s),
        thread(NULL)
    {  }
    socket_type& Socket() {
        return sock;
    }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Connection::Session, this));
    }
    void Join() {
        if (thread) thread->join();
    }
    void Stop() {
        service.stop();
    }
    void KillMe();
    virtual ~Connection() {
    }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<Connection*> Connections;
protected:
    ba::io_service service;
    acceptor_type acc;
    b::thread *thread;
    virtual void AcceptHandler(const bs::error_code &ec) {
        if (!ec) {
            Connections.back()->Start();
            Connections.push_back(new Connection(this));
            acc.async_accept(
                Connections.back()->Socket(),
                b::bind(&Server::AcceptHandler,
                    this,
                    ba::placeholders::error));
        }
        else {
            // do nothing
            // since the new session will be deleted
            // automatically by the destructor
        }
    }
    virtual void ThreadFunc() {
        Connections.push_back(new Connection(this));
        acc.async_accept(
            Connections.back()->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error));
        service.run();
    }
public:
    Server():
        service(),
        acc(service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(NULL)
    {  }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Server::ThreadFunc, this));
    }
    void Stop() {
        service.stop();
    }
    void Join() {
        if (thread) thread->join();
    }
    void StopAllConnections() {
        for (auto c : Connections) {
            c->Stop();
        }
    }
    void JoinAllConnections() {
        for (auto c : Connections) {
            c->Join();
        }
    }
    void KillAllConnections() {
        for (auto c : Connections) {
            delete c;
        }
        Connections.clear();
    }
    void KillConnection(Connection *c) {
        Connections.remove(c);
        delete c;
    }
    virtual ~Server() {
        delete thread;
        // connection should be deleted by the user (?)
    }
};

void Connection::KillMe() {
    server->KillConnection(this);
}

int main() {
    try {
        Server s;
        s.Start();
        std::cin.get(); // wait for enter
        s.Stop();   // stop listening first
        s.StopAllConnections(); // interrupt ongoing connections
        s.Join();   // wait for server, should return immediately
        s.JoinAllConnections(); // wait for ongoing connections
        s.KillAllConnections(); // destroy connection objects
        // at the end of scope, Server will be destroyed
    }
    catch (std::exception &e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}

解决方案

No. Using an io_service object per connection is definitely a smell. Especially since you're also running each connection on a dedicated thread.

At this point you have to ask yourself what did asynchrony buy you? You can have all the code synchronous and have exactly the same number of threads etc.

Clearly you want to multiplex the connections onto a far smaller number of services. In practice there are a few sensible models like

  1. a single io_service with a single service thread (this is usually good). No tasks queued on the service may ever block for significant time or the latency will suffer

  2. a single io_service with a number of threads executing handlers. The number of threads in the pool should be enough to service the max. number of simultaneous CPU intensive tasks supported (or again, the latency will start to go up)

  3. an io_service per thread, usually one thread per logical core and with thread affinity so that it "sticks" to that core. This can be ideal for cache locality

UPDATE: Demo

Here's a demo that shows the idiomatic style using option 1. from above:

Live On Coliru

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket   socket_type;

const short PORT = 11235;

// A connection has its own io_service and socket
class Connection : public b::enable_shared_from_this<Connection>
{
public:
    typedef boost::shared_ptr<Connection> Ptr;
protected:
    socket_type    sock;
    ba::streambuf  stream_buffer; // for reading etc
    std::string    message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "
";

        ba::async_read_until(
            sock,
            stream_buffer,
            '',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "
";

        message = s;

        ba::async_write(
            sock,
            ba::buffer(message.c_str(), message.size()+1),
            b::bind(&Connection::WriteHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "
";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '');
        return s;
    }
    void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) 
    {
        std::cout << __PRETTY_FUNCTION__ << "
";

        if (!ec) {
            std::cout << (ExtractString() + "
");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
        std::cout << __PRETTY_FUNCTION__ << "
";
    }
public:
    Connection(ba::io_service& svc) : sock(svc) { }

    virtual ~Connection() {
        std::cout << __PRETTY_FUNCTION__ << "
";
    }

    socket_type& Socket() { return sock;          } 
    void Session()        { AsyncReadString();    } 
    void Stop()           { sock.cancel();        }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<boost::weak_ptr<Connection> > m_connections;
protected:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _work;
    acceptor_type _acc;
    b::thread thread;

    void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
        if (!ec) {
            accepted->Session();
            DoAccept();
        }
        else {
            // do nothing the new session will be deleted automatically by the
            // destructor
        }
    }

    void DoAccept() {
        auto newaccept = boost::make_shared<Connection>(_service);

        _acc.async_accept(
            newaccept->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error,
                newaccept
            ));
    }

public:
    Server():
        _service(),
        _work(ba::io_service::work(_service)),
        _acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(b::bind(&ba::io_service::run, &_service))
    {  }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "
";
        Stop();
        _work.reset();
        if (thread.joinable()) thread.join();
    }

    void Start() {
        std::cout << __PRETTY_FUNCTION__ << "
";
        DoAccept();
    }

    void Stop() {
        std::cout << __PRETTY_FUNCTION__ << "
";
        _acc.cancel();
    }

    void StopAllConnections() {
        std::cout << __PRETTY_FUNCTION__ << "
";
        for (auto c : m_connections) {
            if (auto p = c.lock())
                p->Stop();
        }
    }
};

int main() {
    try {
        Server s;
        s.Start();

        std::cerr << "Shutdown in 2 seconds...
";
        b::this_thread::sleep_for(b::chrono::seconds(2));

        std::cerr << "Stop accepting...
";
        s.Stop();

        std::cerr << "Shutdown...
";
        s.StopAllConnections(); // interrupt ongoing connections
    } // destructor of Server will join the service thread
    catch (std::exception &e) {
        std::cerr << __FUNCTION__ << ":" << __LINE__ << "
";
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }

    std::cerr << "Byebye
";
}

I modified the main() to run for 2 seconds without user intervention. This is so I can demo it Live On Coliru (of course, it's limited w.r.t the number of client processes).

If you run it with a lot (a lot) of clients, using e.g.

$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\0" | netcat localhost 11235)& done; wait)

You will find that the two second window handles them all:

$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 28214
      2 hello world 4554
      2 hello world 6216
      2 hello world 7864
      2 hello world 9966
      2 void Server::Stop()
   1000 std::string Connection::ExtractString()
   1001 virtual Connection::~Connection()
   2000 void Connection::AsyncReadString()
   2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

If you really go berserk and raise 1000 to e.g. 100000 there, you'll get things similar to:

sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 5483
      2 hello world 579
      2 hello world 5865
      2 hello world 938
      2 void Server::Stop()
      3 hello world 9613
   1741 std::string Connection::ExtractString()
   1742 virtual Connection::~Connection()
   3482 void Connection::AsyncReadString()
   3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

On repeated 2-second runs of the server.

相关文章