Why is std::queue not thread-safe?
The title says it. I don't understand why std::queue (or, in general, any queue) is not thread-safe by its nature, when no iterator is involved as with other data structures.
According to the common rule that
- at least one thread is writing to ...
- and another thread is reading from a shared resource
I should have gotten a conflict in the following example code:
#include "stdafx.h"
#include <queue>
#include <thread>
#include <iostream>
struct response
{
static int & getCount()
{
static int theCount = 0;
return theCount;
}
int id;
};
std::queue<response> queue;
// generate 100 response objects and push them into the queue
void produce()
{
for (int i = 0; i < 100; i++)
{
response r;
r.id = response::getCount()++;
queue.push(r);
std::cout << "produced: " << r.id << std::endl;
}
}
// get the 100 first responses from the queue
void consume()
{
int consumedCounter = 0;
for (;;)
{
if (!queue.empty())
{
std::cout << "consumed: " << queue.front().id << std::endl;
queue.pop();
consumedCounter++;
}
if (consumedCounter == 100)
break;
}
}
int _tmain(int argc, _TCHAR* argv[])
{
std::thread t1(produce);
std::thread t2(consume);
t1.join();
t2.join();
return 0;
}
Everything seems to be working fine:
- No integrity is violated and no data is corrupted.
- The order in which the consumer gets the elements is correct (0<1<2<3<4...). Of course, the order in which the producer and the consumer print is random, as there is no signaling involved.
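Solution
Imagine you check for !queue.empty(), enter the next block, and before you get to access queue.front(), another thread removes (pops) the one and only element, so you query an empty queue. Even with a single consumer, as in the question, an unsynchronized push() running concurrently with front() or pop() is a data race, which is undefined behavior in C++. That the program appears to work does not make it correct.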
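The window between the check and the access can be closed by holding one lock across both. The following is a minimal sketch, not the asker's code: the names sharedQueue, queueMutex, produceAll, consumeAll and runDemo are hypothetical, and it assumes every access to the queue goes through the same mutex. Like the original, the consumer busy-waits.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

std::queue<int> sharedQueue; // hypothetical shared queue of ids
std::mutex queueMutex;       // assumption: guards every access to sharedQueue

// Push the ids 0..count-1, taking the lock for each push.
void produceAll(int count)
{
    for (int i = 0; i < count; i++)
    {
        std::lock_guard<std::mutex> lock(queueMutex);
        sharedQueue.push(i);
    }
}

// Pop `count` ids; the empty() check, front() and pop() share one lock,
// so no other thread can touch the queue between them.
std::vector<int> consumeAll(int count)
{
    std::vector<int> consumed;
    while (static_cast<int>(consumed.size()) < count)
    {
        std::lock_guard<std::mutex> lock(queueMutex);
        if (!sharedQueue.empty())
        {
            consumed.push_back(sharedQueue.front());
            sharedQueue.pop();
        }
    }
    return consumed;
}

// Run one producer and one consumer concurrently; return what was consumed.
std::vector<int> runDemo(int count)
{
    std::vector<int> result;
    std::thread producer(produceAll, count);
    std::thread consumer([&result, count] { result = consumeAll(count); });
    producer.join();
    consumer.join();
    assert(sharedQueue.empty()); // everything produced was consumed
    return result;
}
```

Calling runDemo(100) mirrors the question's scenario of 100 produced and 100 consumed responses, but without the data race.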
A synchronized queue like the following avoids this:
#pragma once
#include <deque> // std::deque is the underlying container
#include <mutex>
#include <condition_variable>
template <typename T>
class SharedQueue
{
public:
SharedQueue();
~SharedQueue();
T& front();
void pop_front();
void push_back(const T& item);
void push_back(T&& item);
int size();
bool empty();
private:
std::deque<T> queue_;
std::mutex mutex_;
std::condition_variable cond_;
};
template <typename T>
SharedQueue<T>::SharedQueue(){}
template <typename T>
SharedQueue<T>::~SharedQueue(){}
template <typename T>
T& SharedQueue<T>::front()
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
return queue_.front();
}
template <typename T>
void SharedQueue<T>::pop_front()
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
queue_.pop_front();
}
template <typename T>
void SharedQueue<T>::push_back(const T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push_back(item);
mlock.unlock(); // unlock before notification to minimize mutex contention
cond_.notify_one(); // notify one waiting thread
}
template <typename T>
void SharedQueue<T>::push_back(T&& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push_back(std::move(item));
mlock.unlock(); // unlock before notification to minimize mutex contention
cond_.notify_one(); // notify one waiting thread
}
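template <typename T>
int SharedQueue<T>::size()
{
std::unique_lock<std::mutex> mlock(mutex_);
// the lock is released automatically when mlock goes out of scope
return static_cast<int>(queue_.size());
}
// empty() is declared in the class, so it needs a definition as well
template <typename T>
bool SharedQueue<T>::empty()
{
std::unique_lock<std::mutex> mlock(mutex_);
return queue_.empty();
}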
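The call to front() waits until the queue has an element. Every member function locks the underlying queue, so only one thread may access it at a time. Note that front() returns a reference after its lock has been released, so with more than one consumer, front() followed by pop_front() is not a single atomic step.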
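One common way to make reading and removing a single step is an operation that waits, moves the element out, and pops it, all under one lock. The sketch below is an assumption-laden variant, not part of the answer's SharedQueue: the class BlockingQueue, its members push and waitAndPop, and the helper sumWithTwoConsumers are hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical variant of the SharedQueue idea: pop returns the element by value.
template <typename T>
class BlockingQueue
{
public:
    void push(T item)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(item));
        }
        cond_.notify_one(); // wake one waiting consumer
    }

    // Block until an element exists, then remove and return it under one lock.
    T waitAndPop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !queue_.empty(); });
        assert(!queue_.empty()); // holds: wait() returns with the lock held
        T item = std::move(queue_.front());
        queue_.pop_front();
        return item;
    }

private:
    std::deque<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

// Two consumers each take `perConsumer` items; returns the sum of all items taken.
long sumWithTwoConsumers(int perConsumer)
{
    BlockingQueue<int> q;
    long sums[2] = {0, 0};
    auto consume = [&q, perConsumer](long* sum) {
        for (int i = 0; i < perConsumer; i++)
            *sum += q.waitAndPop();
    };
    std::thread c1(consume, &sums[0]);
    std::thread c2(consume, &sums[1]);
    for (int i = 0; i < 2 * perConsumer; i++)
        q.push(i);
    c1.join();
    c2.join();
    return sums[0] + sums[1];
}
```

Because each element is handed to exactly one consumer, the sum over both consumers equals the sum of everything pushed, regardless of how the threads interleave.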