Boost::asio - 如何中断被阻塞的 tcp 服务器线程?

2021-12-11 00:00:00 sockets multithreading c++ boost boost-asio

我正在开发一个多线程应用程序,其中一个线程充当从客户端接收命令的 tcp 服务器.该线程使用 Boost 套接字和接受器等待客户端连接,从客户端接收命令,将命令传递给应用程序的其余部分,然后再次等待.代码如下:

I'm working on a multithreaded application in which one thread acts as a tcp server which receives commands from a client. The thread uses a Boost socket and acceptor to wait for a client to connect, receives a command from the client, passes the command to the rest of the application, then waits again. Here's the code:

void ServerThreadFunc()
{
    using boost::asio::ip::tcp;
    boost::asio::io_service io_service;
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port_no));

    for (;;)
    {
        //  listen for command connection
        tcp::socket socket(io_service);
        acceptor.accept(socket);

        //  connected; receive command
        boost::array<char,256> msg_buf;
        socket.receive(boost::asio::buffer(msg_buf));

        //  do something with received bytes here
    }
}

该线程的大部分时间都花在对acceptor.accept() 的调用上.目前,线程仅在应用程序退出时终止.不幸的是,这会在 main() 返回后导致崩溃 - 我相信是因为在单例被销毁后线程试图访问应用程序的日志单例.(老实说,我刚来的时候就是这样.)

This thread spends most of its time blocked on the call to acceptor.accept(). At the moment, the thread only gets terminated when the application exits. Unfortunately, this causes a crash after main() returns - I believe because the thread tries to access the app's logging singleton after the singleton has been destroyed. (It was like that when I got here, honest guv.)

如何在应用程序退出时干净地关闭此线程?我读到可以通过从另一个线程关闭套接字来中断原始套接字上的阻塞 accept() 调用,但这似乎不适用于 Boost 套接字.我尝试使用 Boost 异步 tcp 回显服务器示例,但这似乎只是交换了对 acceptor::accept() 的阻塞调用来对 io_service 的阻塞调用::run(),所以我遇到了同样的问题:一个无法中断的阻塞调用.有什么想法吗?

How can I shut this thread down cleanly when it's time for the application to exit? I've read that a blocking accept() call on a raw socket can be interrupted by closing the socket from another thread, but this doesn't appear to work on a Boost socket. I've tried converting the server logic to asynchronous i/o using the Boost asynchronous tcp echo server example, but that just seems to exchange a blocking call to acceptor::accept() for a blocking call to io_service::run(), so I'm left with the same problem: a blocked call which I can't interrupt. Any ideas?

推荐答案

总之,有两个选择:

  • 将代码更改为异步(acceptor::async_accept()async_read),通过 io_service::run() 在事件循环中运行code>,并通过 io_service::stop() 取消.
  • 强制阻塞调用以较低级别的机制(例如信号)中断.
  • Change code to be asynchronous (acceptor::async_accept() and async_read), run within the event loop via io_service::run(), and cancel via io_service::stop().
  • Force blocking calls to interrupt with lower level mechanics, such as signals.

我会推荐第一个选项,因为它更易于携带且更易于维护.要理解的重要概念是 io_service::run() 只会在有待处理的工作时阻塞.当 io_service::stop() 被调用,它会尝试使 io_service::run() 上阻塞的所有线程尽快返回;它不会中断同步操作,例如 acceptor::accept()socket::receive(),即使同步操作在事件循环中被调用.需要注意的是,io_service::stop() 是一个非阻塞调用,因此与 io_service::run() 上阻塞的线程同步必须使用另一个机制,例如 thread::join().

I would recommend the first option, as it is more likely to be the portable and easier to maintain. The important concept to understand is that the io_service::run() only blocks as long as there is pending work. When io_service::stop() is invoked, it will try to cause all threads blocked on io_service::run() to return as soon as possible; it will not interrupt synchronous operations, such as acceptor::accept() and socket::receive(), even if the synchronous operations are invoked within the event loop. It is important to note that io_service::stop() is a non-blocking call, so synchronization with threads that were blocked on io_service::run() must use another mechanic, such as thread::join().

这是一个运行 10 秒并侦听端口 8080 的示例:

Here is an example that will run for 10 seconds and listens to port 8080:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <iostream>

void StartAccept( boost::asio::ip::tcp::acceptor& );

void ServerThreadFunc( boost::asio::io_service& io_service )
{
  using boost::asio::ip::tcp;
  tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), 8080 ) );

  // Add a job to start accepting connections.
  StartAccept( acceptor );

  // Process event loop.
  io_service.run();

  std::cout << "Server thread exiting." << std::endl;
}

void HandleAccept( const boost::system::error_code& error,
                   boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
                   boost::asio::ip::tcp::acceptor& acceptor )
{
  // If there was an error, then do not add any more jobs to the service.
  if ( error )
  {
    std::cout << "Error accepting connection: " << error.message() 
              << std::endl;
    return;
  }

  // Otherwise, the socket is good to use.
  std::cout << "Doing things with socket..." << std::endl;

  // Perform async operations on the socket.

  // Done using the socket, so start accepting another connection.  This
  // will add a job to the service, preventing io_service::run() from
  // returning.
  std::cout << "Done using socket, ready for another connection." 
            << std::endl;
  StartAccept( acceptor );
};

void StartAccept( boost::asio::ip::tcp::acceptor& acceptor )
{
  using boost::asio::ip::tcp;
  boost::shared_ptr< tcp::socket > socket(
                                new tcp::socket( acceptor.get_io_service() ) );

  // Add an accept call to the service.  This will prevent io_service::run()
  // from returning.
  std::cout << "Waiting on connection" << std::endl;
  acceptor.async_accept( *socket,
    boost::bind( HandleAccept,
      boost::asio::placeholders::error,
      socket,
      boost::ref( acceptor ) ) );
}

int main()
{
  using boost::asio::ip::tcp;

  // Create io service.
  boost::asio::io_service io_service;

  // Create server thread that will start accepting connections.
  boost::thread server_thread( ServerThreadFunc, boost::ref( io_service ) );

  // Sleep for 10 seconds, then shutdown the server.
  std::cout << "Stopping service in 10 seconds..." << std::endl;
  boost::this_thread::sleep( boost::posix_time::seconds( 10 ) );
  std::cout << "Stopping service now!" << std::endl;

  // Stopping the io_service is a non-blocking call.  The threads that are
  // blocked on io_service::run() will try to return as soon as possible, but
  // they may still be in the middle of a handler.  Thus, perform a join on 
  // the server thread to guarantee a block occurs.
  io_service.stop();

  std::cout << "Waiting on server thread..." << std::endl;
  server_thread.join();
  std::cout << "Done waiting on server thread." << std::endl;

  return 0;
}

在运行时,我打开了两个连接.这是输出:

While running, I opened two connections. Here is the output:

Stopping service in 10 seconds...
Waiting on connection
Doing things with socket...
Done using socket, ready for another connection.
Waiting on connection
Doing things with socket...
Done using socket, ready for another connection.
Waiting on connection
Stopping service now!
Waiting on server thread...
Server thread exiting.
Done waiting on server thread.

相关文章