Synchronization using std::atomic and std::condition_variable is unreliable

2022-01-07 00:00:00 multithreading c++ c++11 stl

In a distributed job system written in C++11, I have implemented a fence (i.e. a thread outside the worker thread pool may ask to block until all currently scheduled jobs are done) using the following structure:

struct fence
{
    std::atomic<size_t>                     counter;
    std::mutex                              resume_mutex;
    std::condition_variable                 resume;

    fence(size_t num_threads)
        : counter(num_threads)
    {}
};

The code implementing the fence looks like this:

void task_pool::fence_impl(void *arg)
{
    auto f = (fence *)arg;
    if (--f->counter == 0)      // (1)
        // we have zeroed this fence's counter, wake up everyone that waits
        f->resume.notify_all(); // (2)
    else
    {
        std::unique_lock<std::mutex> lock(f->resume_mutex);
        f->resume.wait(lock);   // (3)
    }
}

This works very well if threads enter the fence spread out over a period of time. However, if they try to do so almost simultaneously, it sometimes happens that between the atomic decrement (1) and the start of the wait on the condition variable (3), a thread yields CPU time and another thread decrements the counter to zero (1) and fires the condition variable (2). This results in the first thread waiting forever in (3), because it starts waiting only after the notification has already been sent.
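To make the failure concrete, here is the lost-wakeup schedule spelled out step by step (a sketch of the scenario described above, assuming two threads A and B and a counter starting at 2):

// counter == 2
// A: --f->counter            -> 1, A takes the else branch         (1)
// B: --f->counter            -> 0, B takes the if branch           (1)
// B: f->resume.notify_all()  -- no thread is waiting yet           (2)
// A: locks resume_mutex, then f->resume.wait(lock)                 (3)
// A now blocks forever: the only notification has already been sent.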

A hack to make the thing workable is to put a 10 ms sleep just before (2), but that's unacceptable for obvious reasons.

Any suggestions on how to fix this in a performant way?

Recommended answer

Your diagnosis is correct: this code is prone to losing condition notifications in the way you described. I.e. after one thread has locked the mutex but before it waits on the condition variable, another thread may call notify_all(), so that the first thread misses that notification.

A simple fix is to lock the mutex before decrementing the counter and while notifying:

void task_pool::fence_impl(void *arg)
{
    auto f = static_cast<fence*>(arg);
    std::unique_lock<std::mutex> lock(f->resume_mutex);
    if (--f->counter == 0) {
        // last thread to arrive: wake everyone that is waiting
        f->resume.notify_all();
    }
    else do {
        // loop to guard against spurious wakeups: only counter == 0 means done
        f->resume.wait(lock);
    } while (f->counter != 0);
}

In this case the counter need not be atomic.
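Under that assumption, the struct can be simplified accordingly (a minimal sketch; the field names follow the original, only the counter's type changes):

struct fence
{
    size_t                  counter;   // protected by resume_mutex, no longer atomic
    std::mutex              resume_mutex;
    std::condition_variable resume;

    explicit fence(size_t num_threads)
        : counter(num_threads)
    {}
};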

An added bonus (or penalty, depending on the point of view) of locking the mutex before notifying is (from the POSIX specification of pthread_cond_broadcast()):

The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().
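If the "penalty" side of that trade-off matters (a thread notified while the notifier still holds the mutex wakes up only to block on the mutex again), the notification can be moved outside the critical section. This stays correct because the waiters re-check the counter under the lock. A sketch of that variant (not from the original answer):

void task_pool::fence_impl(void *arg)
{
    auto f = static_cast<fence*>(arg);
    bool last;
    {
        std::unique_lock<std::mutex> lock(f->resume_mutex);
        last = (--f->counter == 0);
        if (!last)
            do {
                f->resume.wait(lock);
            } while (f->counter != 0);
    }   // mutex released here
    if (last)
        f->resume.notify_all();   // notify without holding the mutex
}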

Regarding the while loop (from the POSIX specification of pthread_cond_wait()):

Spurious wakeups from the pthread_cond_timedwait() or pthread_cond_wait() functions may occur. Since the return from pthread_cond_timedwait() or pthread_cond_wait() does not imply anything about the value of this predicate, the predicate should be re-evaluated upon such return.
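In C++11 this re-evaluation loop does not have to be written by hand: std::condition_variable::wait has an overload that takes a predicate and performs exactly this loop internally. The else branch of the fix above is equivalent to:

else {
    // wait(lock, pred) loops internally: while (!pred()) wait(lock);
    f->resume.wait(lock, [f] { return f->counter == 0; });
}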
