How do I avoid a data race when using two threads to send and receive over an `asio::ip::tcp::iostream`?
I am writing a program that uses an `asio::ip::tcp::iostream` for input and output. The program accepts commands from the (remote) user over port 5555 and sends messages to the user over that same TCP connection. Because these events (commands received from the user, or messages sent to the user) occur asynchronously, I have separate transmit and receive threads.
In this toy version, the commands are "one", "two" and "quit". Of course, "quit" quits the program. The other commands do nothing, and any unrecognized command causes the server to close the TCP connection.
The transmitted messages are simple serial-numbered messages that are sent once per second.
In both this toy version and the real code I'm trying to write, the transmit and receive threads both use blocking IO, so there doesn't appear to be a good way to use a `std::mutex` or other synchronization mechanism. (In my attempts, one thread would grab the mutex and then block, which isn't going to work for this.)
To build and test this, I'm using gcc version 7.2.1 and valgrind 3.13 on a 64-bit Linux machine. Build:
g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread
To test, I run the server with this command:
valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent
Then I use `telnet 127.0.0.1 5555` in another window to create a connection to the server. What helgrind correctly points out is that there is a data race, because both `runTx` and `runRx` are trying to access the same stream asynchronously:
==16188== Possible data race during read of size 1 at 0x1FFEFFF1CC by thread #1
==16188== Locks held: none
... many more lines elided
concurrent.cpp
#include <asio.hpp>
#include <algorithm>  // std::find
#include <iostream>
#include <fstream>
#include <string>
#include <thread>
#include <array>
#include <chrono>
class Console {
public:
Console() :
want_quit{false},
want_reset{false}
{}
bool getQuitValue() const { return want_quit; }
int run(std::istream *in, std::ostream *out);
bool wantReset() const { return want_reset; }
private:
int runTx(std::istream *in);
int runRx(std::ostream *out);
bool want_quit;
bool want_reset;
};
int Console::runTx(std::istream *in) {
static const std::array<std::string, 3> cmds{
"quit", "one", "two",
};
std::string command;
while (!want_quit && !want_reset && *in >> command) {
if (command == cmds.front()) {
want_quit = true;
}
if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
want_reset = true;
std::cout << "unknown command [" << command << "]\n";
} else {
std::cout << command << '\n';
}
}
return 0;
}
int Console::runRx(std::ostream *out) {
for (int i=0; !(want_reset || want_quit); ++i) {
(*out) << "This is message number " << i << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
out->flush();
}
return 0;
}
int Console::run(std::istream *in, std::ostream *out) {
want_reset = false;
std::thread t1{&Console::runRx, this, out};
int status = runTx(in);
t1.join();
return status;
}
int main()
{
Console con;
asio::io_service ios;
// IPv4 address, port 5555
asio::ip::tcp::acceptor acceptor(ios,
asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
while (!con.getQuitValue()) {
asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
con.run(&stream, &stream);
if (con.wantReset()) {
std::cout << "resetting\n";
}
}
}
Recommended Answer
Yeah, you're sharing the socket that underlies the stream, without synchronization.
Sidenote: the same goes for the boolean flags, which can easily be "fixed" by changing them to:
std::atomic_bool want_quit;
std::atomic_bool want_reset;
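As a minimal sketch of what that change buys (the function names here are hypothetical and are not part of the code above): one thread polls a flag while another sets it. With a plain `bool` that is a data race; with `std::atomic_bool` the store and the loads are well defined.

```cpp
#include <atomic>
#include <thread>

// Hypothetical worker: spins until another thread sets the flag.
// The loads of `stop` here and the store in the other thread do not race
// because the flag is atomic.
int countUntilStopped(std::atomic_bool& stop) {
    int iterations = 0;
    while (!stop) {
        ++iterations;
        std::this_thread::yield();
    }
    return iterations;
}

// Starts the worker on a second thread, sets the flag from the calling
// thread, and returns true once the worker has seen it and exited.
bool stopWorkerViaAtomicFlag() {
    std::atomic_bool stop{false};
    int iterations = -1;
    std::thread worker{[&] { iterations = countUntilStopped(stop); }};
    stop = true;
    worker.join();  // after join, reading `iterations` is safe
    return iterations >= 0;
}
```

Note that this only makes the flags safe. It does nothing for the shared stream, which is the race helgrind reports.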
How To Solve
To be honest, I don't think there is a good solution. You said it yourself: the operations are asynchronous, so you'll be in trouble if you try to do them synchronously.
You could try to think of hacks. What if we created a separate stream object based on the same underlying socket (file descriptor)? That's not going to be very easy, as such a stream is not part of Asio.
But we could hack one up using Boost Iostreams:
#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>
// .... later:
// HACK: procure a _separate_ `ostream` to prevent the race, using the same fd
namespace bio = boost::iostreams;
bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
bio::stream<bio::file_descriptor_sink> hack_ostream(fds);
con.run(stream, hack_ostream);
Indeed, this runs without the race (simultaneous reads and writes on the same socket are fine, as long as you don't share the non-thread-safe Asio objects wrapping it).
Don't do that. It's a kludge. You're complicating things, apparently in an attempt to avoid using asynchronous code. I'd bite the bullet.
It's not too much work to factor the IO mechanics out from the service logic. You'll end up free from arbitrary limitations (you could consider dealing with multiple clients, you could do without any threading at all, etc.).
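One way to picture that separation, as a sketch only: the command handling from the toy program can be written as a pure function that knows nothing about sockets, streams, or threads. `Action` and `handleCommand` are hypothetical names introduced for illustration; they are not part of Asio or of the code in this answer.

```cpp
#include <algorithm>
#include <array>
#include <string>

// Hypothetical result type: what the connection should do after a command.
enum class Action { Continue, Reset, Quit };

// Pure service logic for the toy protocol: "quit" quits, "one" and "two"
// are accepted and do nothing, anything else asks for a connection reset.
Action handleCommand(const std::string& command) {
    static const std::array<std::string, 3> cmds{{"quit", "one", "two"}};
    if (command == cmds.front()) {
        return Action::Quit;
    }
    if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
        return Action::Reset;
    }
    return Action::Continue;
}
```

Whatever IO layer you end up with (blocking, asynchronous, or coroutine based) would then call `handleCommand` for each token it reads and act on the returned `Action`, so the logic can be tested without a network connection.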
If you would like to learn about some middle ground, look at stackful coroutines (http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/spawn.html).
For Reference
Note that I refactored to remove the need for pointers. You're not transferring ownership, so a reference will do. In case you didn't know how to pass a reference to a `bind`/`std::thread` constructor, the trick is in the `std::ref` you'll see.
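In isolation, the `std::ref` trick looks roughly like this. The sketch uses a free function and a `std::ostringstream` as stand-ins (`writeGreeting` and `greetOnThread` are hypothetical names), since the point is only how the reference reaches the thread function.

```cpp
#include <functional>
#include <sstream>
#include <string>
#include <thread>

// Hypothetical writer, standing in for a member such as Console::runRx.
void writeGreeting(std::ostream& out) {
    out << "hello from the worker thread\n";
}

std::string greetOnThread() {
    std::ostringstream sink;
    // std::thread copies its arguments by default, and streams are not
    // copyable. std::ref wraps `sink` so the thread function receives a
    // reference to the original object instead.
    std::thread worker{writeGreeting, std::ref(sink)};
    worker.join();
    return sink.str();
}
```

Without `std::ref` the `std::thread` constructor call above does not compile, which is usually how people discover the need for it.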
[For stress testing I have greatly reduced the delays.]
Live On Coliru
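#include <boost/asio.hpp>
#include <algorithm>   // std::find
#include <atomic>      // std::atomic_bool
#include <functional>  // std::ref
#include <iostream>
#include <fstream>
#include <string>
#include <thread>
#include <array>
#include <chrono>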
class Console {
public:
Console() :
want_quit{false},
want_reset{false}
{}
bool getQuitValue() const { return want_quit; }
int run(std::istream &in, std::ostream &out);
bool wantReset() const { return want_reset; }
private:
int runTx(std::istream &in);
int runRx(std::ostream &out);
std::atomic_bool want_quit;
std::atomic_bool want_reset;
};
int Console::runTx(std::istream &in) {
static const std::array<std::string, 3> cmds{
{"quit", "one", "two"},
};
std::string command;
while (!want_quit && !want_reset && in >> command) {
if (command == cmds.front()) {
want_quit = true;
}
if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
want_reset = true;
std::cout << "unknown command [" << command << "]\n";
} else {
std::cout << command << '\n';
}
}
return 0;
}
int Console::runRx(std::ostream &out) {
for (int i=0; !(want_reset || want_quit); ++i) {
out << "This is message number " << i << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(1));
out.flush();
}
return 0;
}
int Console::run(std::istream &in, std::ostream &out) {
want_reset = false;
std::thread t1{&Console::runRx, this, std::ref(out)};
int status = runTx(in);
t1.join();
return status;
}
#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>
int main()
{
Console con;
boost::asio::io_service ios;
// IPv4 address, port 5555
boost::asio::ip::tcp::acceptor acceptor(ios, boost::asio::ip::tcp::endpoint{boost::asio::ip::tcp::v4(), 5555});
while (!con.getQuitValue()) {
boost::asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
{
// HACK: procure a _separate_ `ostream` to prevent the race, using the same fd
namespace bio = boost::iostreams;
bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
bio::stream<bio::file_descriptor_sink> hack_ostream(fds);
con.run(stream, hack_ostream);
}
if (con.wantReset()) {
std::cout << "resetting\n";
}
}
}
Testing:
netcat localhost 5555 <<<quit
This is message number 0
This is message number 1
This is message number 2
and
commands=( one two one two one two one two one two one two one two three )
while sleep 0.1; do echo ${commands[$(($RANDOM%${#commands[@]}))]}; done | (while netcat localhost 5555; do sleep 1; done)
runs indefinitely, occasionally resetting the connection (when the command "three" has been sent).