boost::asio::socket 线程安全

2021-12-24 00:00:00 c++ boost boost-asio

(这是我原来问题的简化版)

( This is a simplified version of my original question )

我有几个线程可以写入 boost asio 套接字.这似乎工作得很好,没有问题.

I have several threads that write to a boost asio socket. This seems to work very well, with no problems.

文档说共享套接字不是线程安全的( 这里,在底部)所以我想知道我是否应该用互斥锁或其他东西保护套接字.

The documentation says a shared socket is not thread safe( here, way down at the bottom ) so I am wondering if I should protect the socket with mutex, or something.

这个问题 坚持认为保护是必要的,但没有给出如何保护的建议.

This question insists that protection is necessary, but gives no advice on how to do so.

我原来的问题的所有答案都坚持认为我在做的事情很危险,并且大多数人都敦促我用 async_writes 甚至更复杂的东西替换我的写入.但是,我不愿意这样做,因为它会使已经在运行的代码变得复杂,而且没有一个回答者让我相信他们知道他们在说什么 - 他们似乎和我一样阅读了相同的文档并且正在猜测,就像我一样

All the answers to my original question also insisted that what I was doing dangerous, and most urged me to replace my writes with async_writes or even more complicated things. However, I am reluctant to do this, since it would complicate code that is already working and none of the answerers convinced me they knew what they ware talking about - they seemed to have read the same documentation as I and were guessing, just as I was.

因此,我编写了一个简单的程序来测试从两个线程写入共享套接字的压力.

So, I wrote a simple program to stress test writing to a shared socket from two threads.

这里是服务器,它简单地写出它从客户端接收到的任何内容

Here is the server, which simply writes out whatever it receives from the client

int main()
{
    boost::asio::io_service io_service;

    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3001));

    tcp::socket socket(io_service);
    acceptor.accept(socket);

    for (;;)
    {
        char mybuffer[1256];
        int len = socket.read_some(boost::asio::buffer(mybuffer,1256));
        mybuffer[len] = '';
        std::cout << mybuffer;
        std::cout.flush();

    }

  return 0;
}

这是客户端,它创建了两个线程,尽可能快地写入共享套接字

Here is the client, which creates two threads that write to a shared socket as fast as they can

boost::asio::ip::tcp::socket * psocket;

void speaker1()
{
    string msg("speaker1: hello, server, how are you running?
");
    for( int k = 0; k < 1000; k++ ) {
        boost::asio::write(
            *psocket,boost::asio::buffer(msg,msg.length()));
    }

}
void speaker2()
{
    string msg("speaker2: hello, server, how are you running?
");
    for( int k = 0; k < 1000; k++ ) {
        boost::asio::write(
            *psocket,boost::asio::buffer(msg,msg.length()));
    }

}

int main(int argc, char* argv[])
{

    boost::asio::io_service io_service;

  // connect to server

    tcp::resolver resolver(io_service);
    tcp::resolver::query query("localhost", "3001");
    tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
    tcp::resolver::iterator end;
    psocket = new tcp::socket(io_service);
    boost::system::error_code error = boost::asio::error::host_not_found;
    while (error && endpoint_iterator != end)
    {
        psocket->close();
        psocket->connect(*endpoint_iterator++, error);
    }


    boost::thread t1( speaker1 );
    boost::thread t2( speaker2 );

    Sleep(50000);

}

这有效!完美,据我所知.客户端不会崩溃.消息到达服务器时没有出现乱码.它们通常交替到达,每个线程一个.有时一个线程在另一个线程之前收到两三个消息,但只要没有乱码并且所有消息都到达,我认为这不是问题.

This works! Perfectly, as far as I can tell. The client does not crash. The messages arrive at the server without garbles. They usually arrive alternately, one from each thread. Sometimes one thread get two or three messages in before the other, but I do not think this is a problem so long as there are no garbles and all the messages arrive.

我的结论:套接字在某些理论上可能不是线程安全的,但它很难让它失败,所以我不会担心.

My conclusion: the socket may not be thread safe in some theoretical sense, but it is so hard to make it fail that I am not going to worry about it.

推荐答案

在重新研究 async_write 的代码后,我现在确信任何写操作都是线程安全的,当且仅当数据包大小小于

After restudying the code for async_write I am now convinced that any write operation is thread safe if and only if the packet size is smaller than

default_max_transfer_size = 65536;

会发生什么,一旦 async_write 被调用,async_write_some 就会在同一个线程中被调用.池中调用某种形式的 io_service::run 的任何线程将继续为该写操作调用 async_write_some 直到它完成.

What happens is that as soon as an async_write is called an async_write_some is called in the same thread. Any threads in the pool calling some form of io_service::run will keep on calling async_write_some for that write operation until it completes.

如果必须多次调用这些 async_write_some 调用(数据包大于 65536),它们可以并且将会交错.

These async_write_some calls can and will interleave if it has to be called more than once (the packets are larger than 65536).

ASIO 不会像您期望的那样对套接字的写入进行排队,一个接一个地完成.为了确保线程和交错安全写入,请考虑以下代码:

ASIO does not queue writes to a socket as you would expect, one finishing after the other. In order to ensure both thread and interleave safe writes consider the following piece of code:

    void my_connection::async_serialized_write(
            boost::shared_ptr<transmission> outpacket) {
        m_tx_mutex.lock();
        bool in_progress = !m_pending_transmissions.empty();
        m_pending_transmissions.push(outpacket);
        if (!in_progress) {
            if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
                boost::asio::async_write(m_socket,
                    m_pending_transmissions.front()->scatter_buffers,
                        boost::asio::transfer_all(),
            boost::bind(&my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                                       boost::asio::placeholders::bytes_transferred));
            } else { // Send single buffer
                boost::asio::async_write(m_socket,
                                    boost::asio::buffer(
                                           m_pending_transmissions.front()->buffer_references.front(),                          m_pending_transmissions.front()->num_bytes_left),
                boost::asio::transfer_all(),
                boost::bind(
                        &my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
            }
        }
        m_tx_mutex.unlock();
    }

    void my_connection::handle_async_serialized_write(
    const boost::system::error_code& e, size_t bytes_transferred) {
        if (!e) {
            boost::shared_ptr<transmission> transmission;
            m_tx_mutex.lock();
            transmission = m_pending_transmissions.front();
            m_pending_transmissions.pop();
            if (!m_pending_transmissions.empty()) {
                if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
            boost::asio::async_write(m_socket,
                    m_pending_transmissions.front()->scatter_buffers,
                    boost::asio::transfer_exactly(
                            m_pending_transmissions.front()->num_bytes_left),
                    boost::bind(
                            &chreosis_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                } else { // Send single buffer
                    boost::asio::async_write(m_socket,
                    boost::asio::buffer(
                            m_pending_transmissions.front()->buffer_references.front(),
                            m_pending_transmissions.front()->num_bytes_left),
                    boost::asio::transfer_all(),
                    boost::bind(
                            &my_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                }
            }
            m_tx_mutex.unlock();
            transmission->handler(e, bytes_transferred, transmission);
        } else {
            MYLOG_ERROR(
            m_connection_oid.toString() << " " << "handle_async_serialized_write: " << e.message());
            stop(connection_stop_reasons::stop_async_handler_error);
        }
    }

这基本上是一个队列,一次发送一个数据包.async_write 仅在第一次写入成功后调用,然后为第一次写入调用原始处理程序.

This basically makes a queue for sending one packet at a time. async_write is called only after the first write succeeds which then calls the original handler for the first write.

如果 asio 使每个套接字/流自动写入队列会更容易.

It would have been easier if asio made write queues automatic per socket/stream.

相关文章