C++20线程可能永远等待STD::ATOM

2022-04-18 00:00:00 multithreading wait atomic c++ c++20

考虑以下示例代码,其中线程A推送队列上的函数,而线程B在从队列中弹出时执行这些函数:

std::atomic<uint32_t> itemCount;

//Executed by thread A
void run(std::function<void()> function) {

    if (queue.push(std::move(function))) {
        itemCount.fetch_add(1, std::memory_order_acq_rel);
        itemCount.notify_one();
    }

}


//Executed by thread B
void threadMain(){

    std::function<void()> function;

    while(true){

        if (queue.pop(function)) {

            itemCount.fetch_sub(1, std::memory_order_acq_rel);
            function();

        }else{
            itemCount.wait(0, std::memory_order_acquire);
        }

    }

}

其中queue是一个并发队列,它有一个push和一个pop函数,每个函数都返回一个bool,指示给定的操作是否成功。因此,push如果已满则返回false,如果为空则pop返回false

现在我想知道代码是否在所有情况下都是线程安全的。 让我们假设线程B的pop失败,并即将调用std::atomic<T>::wait。同时,线程A推送一个新元素,而线程B检查初始等待条件。由于itemCount尚未更改,因此失败。

之后,线程A立即递增计数器并尝试通知一个正在等待的线程(尽管线程B还不在内部等待)。线程B最终等待原子,导致线程再也不会唤醒,因为丢失了信号,尽管队列中有一个元素。这仅在新元素被推入队列时停止,并通知B继续执行。

我无法手动重现这种情况,因为时间几乎不可能正确。

这是一个严重的问题,还是不可能发生?有什么(最好是原子的)替代方案可以解决这种罕见的情况?

编辑: 我只想提一下,队列不是阻塞的,只使用原子操作。


我问这个问题的原因是我不明白怎么可能实现原子wait操作。尽管标准规定整个操作是原子的(由加载+谓词检查+等待组成),但在实现中,我使用的std::atomic<T>::wait实现大致如下:

void wait(const _TVal _Expected, const memory_order _Order = memory_order_seq_cst) const noexcept {
    _Atomic_wait_direct(this, _Atomic_reinterpret_as<long>(_Expected), _Order);
}

其中_Atomic_wait_direct定义为

template <class _Ty, class _Value_type>
void _Atomic_wait_direct(
    const _Atomic_storage<_Ty>* const _This, _Value_type _Expected_bytes, const memory_order _Order) noexcept {
    const auto _Storage_ptr = _STD addressof(_This->_Storage);
    for (;;) {
        const _Value_type _Observed_bytes = _Atomic_reinterpret_as<_Value_type>(_This->load(_Order));
        if (_Expected_bytes != _Observed_bytes) {
            return;
        }

        __std_atomic_wait_direct(_Storage_ptr, &_Expected_bytes, sizeof(_Value_type), _Atomic_wait_no_timeout);
    }
}

我们可以清楚地看到,有一个具有指定内存顺序的原子加载,以检查原子本身的状态。但是,我不明白如何将整个操作视为原子操作,因为在调用__std_atomic_wait_direct之前有一个比较。

使用条件变量,谓词本身由互斥锁保护,但原子本身在这里是如何保护的?


解决方案

标准说明如下:

[tro.races]/4对特定原子对象M的所有修改都以某种特定的总顺序发生,称为M的修改顺序。

对原子对象M的原子等待操作的调用有资格通过对M的原子通知操作的调用取消阻止,如果XYM上存在副作用,则有资格通过对M的原子通知操作的调用来取消阻止 (4.1)-观察X
的结果后,原子等待操作被阻止 (4.2)-XY之前,修改顺序为M
(4.3)-Y发生在原子通知操作调用之前。

您假设了以下方案:

  1. itemCount的当前值为零,可能来自原始初始化,也可能来自上一个fetch_sub
  2. wait加载itemCount并观察0的值。
  3. 另一个线程调用fetch_addnotify_one
  4. wait进入阻塞状态,因为它相信现在已过时的值0。

在此场景中,MitemCountX是将值设置为0的旧fetch_sub(我们假设它发生在很久以前,所有线程都可以正常看到),Y是将值更改为1的fetch_add

标准规定,wait调用(跨越步骤2和4)实际上有资格通过步骤3的notify_one调用解除阻止。实际上:

(4.1)-wait观察itemCount == 0后被阻塞(如果没有,则问题不会出现)。
(4.2)-fetch_subitemCount的修改顺序fetch_add先于fetch_sub(假设fetch_sub发生在很久以前)。
(4.3)-fetch_add发生在fetch_add之前(实际上是在notify_one之前);它们被同一线程一个接一个地调用。

因此,一致性实现必须一开始就不允许wait阻止,或者让notify_one唤醒它;它不能允许错过通知。


本讨论中唯一出现内存顺序的地方是(4.1),在观察到X";的结果后,原子等待操作被阻止。也许fetch_add实际上发生在wait之前(挂钟),但wait看不到,所以它还是被阻止了。但这对结果并不重要--或者wait观察fetch_add的结果并且根本不阻塞;或者它观察旧的fetch_subAND块的结果,但随后需要notfiy_one来唤醒它。

相关文章