安全地取消 boost asio 截止时间计时器

2021-12-24 00:00:00 multithreading timer c++ boost boost-asio

我正在尝试安全取消 boost::asio::basic_waitable_timer.

I'm trying to cancel a boost::asio::basic_waitable_timer<std::chrono::steady_clock> safely.

根据这个答案,这段代码应该做那个工作:

According to this answer, this code should do that work:

timer.get_io_service().post([&]{timer.cancel();})

恐怕它对我不起作用.
我做错了什么吗?
这是我的代码:

I'm afraid it doesn't work for me.
Am I doing something wrong?
This is my code:

#include <iostream>
#include "boost/asio.hpp"
#include <chrono>
#include <thread>
#include <random>

boost::asio::io_service io_service;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer(io_service);
std::atomic<bool> started;

void handle_timeout(const boost::system::error_code& ec)
{
    if (!ec) {
        started = true;
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout
";
        timer.expires_from_now(std::chrono::milliseconds(10));
        timer.async_wait(&handle_timeout);
    } else if (ec == boost::asio::error::operation_aborted) {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout aborted
";
    } else {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout another error
";
    }
}

int main() {

    std::cout << "tid: " << std::this_thread::get_id() << ", Hello, World!" << std::endl;
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(1, 100);

    for (auto i = 0; i < 1000; i++) {

        started = false;
        std::thread t([&](){

            timer.expires_from_now(std::chrono::milliseconds(0));
            timer.async_wait(&handle_timeout);

            io_service.run();
        });

        while (!started) {};
        auto sleep = dis(gen);
        std::cout << "tid: " << std::this_thread::get_id() << ", i: " << i << ", sleeps for " << sleep << " [ms]" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
        timer.get_io_service().post([](){
            std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post
";
            timer.cancel();
        });
//      timer.cancel();
        std::cout << "tid: " << std::this_thread::get_id() << ", i: " << i << ", waiting for thread to join()" << std::endl;
        t.join();
        io_service.reset();
    }

    return 0;
}

这是输出:

...
时间:140737335076608,handle_timeout
时间:140737335076608,handle_timeout
tid: 140737353967488, i: 2, 等待线程join()
tid: 140737335076608, 后期取消
tid:140737335076608,handle_timeout 中止
tid: 140737353967488, i: 3, 睡眠 21 [ms]
时间:140737335076608,handle_timeout
tid: 140737353967488, i: 3, 等待线程加入()
时间:140737335076608,handle_timeout
tid: 140737335076608, 后期取消
时间:140737335076608,handle_timeout
时间:140737335076608,handle_timeout
时间:140737335076608,handle_timeout
时间:140737335076608,handle_timeout
时间:140737335076608,handle_timeout
...
永远继续......

...
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737353967488, i: 2, waiting for thread to join()
tid: 140737335076608, cancelling in post
tid: 140737335076608, handle_timeout aborted
tid: 140737353967488, i: 3, sleeps for 21 [ms]
tid: 140737335076608, handle_timeout
tid: 140737353967488, i: 3, waiting for thread to join()
tid: 140737335076608, handle_timeout
tid: 140737335076608, cancelling in post
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
...
continue forever...

如您所见,timer.cancel() 正在从适当的线程中调用:

As you can see, the timer.cancel() is being called from the appropriate thread:

tid: 140737335076608,在帖子中取消

tid: 140737335076608, cancelling in post

但是没有

tid:140737335076608,handle_timeout 中止

tid: 140737335076608, handle_timeout aborted

之后.

Main 永远等待.

推荐答案

取消是安全的.

它只是不健壮.您没有考虑计时器未挂起的情况.然后,您可以取消它一次,但是一旦调用完成处理程序,它就会开始一个新的异步等待.

It's just not robust. You didn't account for the case when the timer wasn't pending. You cancel it once, then, but it will just start a new async wait once the completion handler is invoked.

接下来是我如何追踪问题的详细步骤.

What follows is my detailed steps on how I traced the issue.

总结 TL;DR

SUMMARY TL;DR

取消时间只会取消运行中的异步操作.

Cancelling a time only cancels asynchronous operations in flight.

如果要关闭异步调用链,则必须为此使用其他逻辑.下面给出一个例子.

If you want to shutdown an asynchronous call chain, you'll have to use additional logic for that. An example is given below.

处理程序跟踪

启用

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1

这会产生可以使用 boost/libs/asio/tools/handlerviz.pl 可视化的输出:

This produces output that can be visualized with boost/libs/asio/tools/handlerviz.pl:

如您所见,当取消发生时,async_wait 正在运行.

As you can see, the async_wait is in-flight when the cancellation happens.

(被截断,因为它会无限运行)

(truncated because it would run infinitely)

注意完成处理程序如何看到 cc=system:0,而不是 cc=system:125(对于 operation_aborted).这是发布的取消实际上并未采取"这一事实的症状.唯一合乎逻辑的解释(图中不可见)是在调用取消之前计时器已经到期.

Note how the completion handler sees cc=system:0, not cc=system:125 (for operation_aborted). This is a symptom of the fact that the posted cancel did not actually "take". The only logical explanation (not visible in the diagram) is that the timer had already expired before the cancel gets invoked.

让我们比较原始痕迹1

1 去除噪声差异

所以,我们领先了.我们能检测到吗?

So, we have a lead. Can we detect it?

    timer.get_io_service().post([](){
        std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post
";
        if (timer.expires_from_now() >= std::chrono::steady_clock::duration(0)) {
            timer.cancel();
        } else {
            std::cout << "PANIC
";
            timer.cancel();
        }
    });

打印:

tid: 140113177143232, i: 0, waiting for thread to join()
tid: 140113177143232, i: 1, waiting for thread to join()
tid: 140113177143232, i: 2, waiting for thread to join()
tid: 140113177143232, i: 3, waiting for thread to join()
tid: 140113177143232, i: 4, waiting for thread to join()
tid: 140113177143232, i: 5, waiting for thread to join()
tid: 140113177143232, i: 6, waiting for thread to join()
tid: 140113177143232, i: 7, waiting for thread to join()
tid: 140113177143232, i: 8, waiting for thread to join()
tid: 140113177143232, i: 9, waiting for thread to join()
tid: 140113177143232, i: 10, waiting for thread to join()
tid: 140113177143232, i: 11, waiting for thread to join()
tid: 140113177143232, i: 12, waiting for thread to join()
tid: 140113177143232, i: 13, waiting for thread to join()
tid: 140113177143232, i: 14, waiting for thread to join()
tid: 140113177143232, i: 15, waiting for thread to join()
tid: 140113177143232, i: 16, waiting for thread to join()
tid: 140113177143232, i: 17, waiting for thread to join()
tid: 140113177143232, i: 18, waiting for thread to join()
tid: 140113177143232, i: 19, waiting for thread to join()
tid: 140113177143232, i: 20, waiting for thread to join()
tid: 140113177143232, i: 21, waiting for thread to join()
tid: 140113177143232, i: 22, waiting for thread to join()
tid: 140113177143232, i: 23, waiting for thread to join()
tid: 140113177143232, i: 24, waiting for thread to join()
tid: 140113177143232, i: 25, waiting for thread to join()
tid: 140113177143232, i: 26, waiting for thread to join()
PANIC

我们能否以另一种更清晰的方式传达超级取消"?我们有……当然,只有 timer 对象可以使用:

Could we communicate the "super-cancellation" in another, clearer way? We have ... just the timer object to work with, of course:

timer 对象没有很多可以使用的属性.没有 close() 或类似的东西,比如在套接字上,可用于将计时器置于某种无效状态.

The timer object doesn't have a lot of properties to work with. There's no close() or similar, like on a socket, that can be used to put the timer in some kind of invalid state.

但是,有到期时间点,我们可以使用一个特殊的域为我们的应用程序发出无效"信号的值:

However, there's the expiry timepoint, and we can use a special domain value to signal "invalid" for our application:

timer.get_io_service().post([](){
    std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post
";
    // also cancels:
    timer.expires_at(Timer::clock_type::time_point::min());
});

这个特殊值"在完成处理程序中很容易处理:

This "special value" is easy to handle in the completion handler:

void handle_timeout(const boost::system::error_code& ec)
{
    if (!ec) {
        started = true;
        if (timer.expires_at() != Timer::time_point::min()) {
            timer.expires_from_now(std::chrono::milliseconds(10));
            timer.async_wait(&handle_timeout);
        } else {
            std::cerr << "handle_timeout: detected shutdown
";
        }
    } 
    else if (ec != boost::asio::error::operation_aborted) {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout error " << ec.message() << "
";
    }
}

相关文章