在循环 C++ 中重用线程
我需要在 C++ 程序中并行化一些任务,并且我对并行编程完全陌生.到目前为止,我已经通过互联网搜索取得了一些进展,但现在有点卡住了.我想在循环中重用一些线程,但显然不知道如何做我想要的.
我正在从电脑上的两张ADC卡上采集数据(并行采集),然后我需要在采集下一批数据的同时对采集到的数据进行一些操作(并行处理).下面是一些伪代码来说明
//获取一些数据,等待所有数据获取完毕再继续std::thread acq1(AcquireData, boardHandle1, memoryAddress1a);std::thread acq2(AcquireData, boardHandle2, memoryAddress2a);acq1.join();acq2.join();while(用户不打扰){//获取新数据的同时处理第一批数据std::thread proc1(ProcessData,memoryAddress1a);std::thread proc2(ProcessData,memoryAddress2a);acq1(AcquireData, boardHandle1, memoryAddress1b);acq2(AcquireData, boardHandle2, memoryAddress2b);acq1.join();acq2.join();proc1.join();proc2.join();/*以这种方式进行,交替使用哪个内存地址被写入并被处理,直到用户中断程序.*/}
这是它的主要要点.循环的下一次运行将在处理b"数据时写入a"内存地址并继续交替(我可以让代码做到这一点,只是将其取出以防止混淆问题).>
无论如何,问题(我相信有些人已经知道)是我第二次尝试使用 acq1 和 acq2 时,编译器 (VS2012) 说IntelliSense:调用类类型的对象而没有适当的 operator() 或将函数转换为函数指针类型".同样,如果我再次将 std::thread 放在 acq1 和 acq2 前面,它会显示错误 C2374:'acq1':重新定义;多重初始化".
那么问题是,当线程完成上一个任务后,我可以将线程重新分配给新任务吗?在再次调用之前,我总是等待线程的前一次使用结束,但我不知道如何重新分配线程,而且由于它处于循环中,我不能每次都创建一个新线程(或者如果我可以,这似乎是浪费和不必要的,但我可能弄错了).
提前致谢
解决方案最简单的方法是使用 std::function
对象的可等待队列.像这样:
#include #include <线程>#include <mutex>#include <条件变量>#include <队列>#include <功能>#include <chrono>线程池类{民众:线程池(int 线程):shutdown_(假){//创建指定数量的线程线程_.reserve(线程);for (int i = 0; i < 线程; ++i)thread_.emplace_back (std::bind (&ThreadPool::threadEntry, this, i));}~线程池(){{//取消阻塞所有线程并告诉它们停止std::unique_lock <std::mutex>l(锁_);关机_ =真;condVar_.notify_all();}//等待所有线程停止std::cerr <<加入线程"<<std::endl;对于(自动和线程:线程_)线程连接();}void doJob (std::function func){//在队列上放置一个作业并解除一个线程的阻塞std::unique_lock <std::mutex>l(锁_);jobs_.emplace (std::move (func));condVar_.notify_one();}受保护:void threadEntry (int i){std::function <void (void)>工作;而 (1){{std::unique_lock <std::mutex>l(锁_);while (!shutdown_ &&jobs_.empty())condVar_.wait (l);如果 (jobs_.empty()){//没有工作要做,我们正在关闭std::cerr <<线程" <<我<<终止"<<std::endl;返回;}std::cerr <<线程" <<我<<做一份工作"<<std::endl;作业 = std::move (jobs_.front());工作_.pop();}//在不持有任何锁的情况下完成工作工作 ();}}std::mutex 锁_;std::condition_variable condVar_;布尔关机_;std::queue <std::function <void(void)>工作_;std::vector 线程_;};无效傻(int n){//一个用于演示目的的愚蠢工作std::cerr <<睡觉" <<<<秒"<
I need to parallelize some tasks in a C++ program and am completely new to parallel programming. I've made some progress through internet searches so far, but am a bit stuck now. I'd like to reuse some threads in a loop, but clearly don't know how to do what I'm trying for.
I am acquiring data from two ADC cards on the computer (acquired in parallel), then I need to perform some operations on the collected data (processed in parallel) while collecting the next batch of data. Here is some pseudocode to illustrate
//Acquire some data, wait for all the data to be acquired before proceeding
std::thread acq1(AcquireData, boardHandle1, memoryAddress1a);
std::thread acq2(AcquireData, boardHandle2, memoryAddress2a);
acq1.join();
acq2.join();
while(user doesn't interrupt)
{
//Process first batch of data while acquiring new data
std::thread proc1(ProcessData,memoryAddress1a);
std::thread proc2(ProcessData,memoryAddress2a);
acq1(AcquireData, boardHandle1, memoryAddress1b);
acq2(AcquireData, boardHandle2, memoryAddress2b);
acq1.join();
acq2.join();
proc1.join();
proc2.join();
/*Proceed in this manner, alternating which memory address
is written to and being processed until the user interrupts the program.*/
}
That's the main gist of it. The next run of the loop would write to the "a" memory addresses while processing the "b" data and continue to alternate (I can get the code to do that, just took it out to prevent cluttering up the problem).
Anyway, the problem (as I'm sure some people can already tell) is that the second time I try to use acq1 and acq2, the compiler (VS2012) says "IntelliSense: call of an object of a class type without appropriate operator() or conversion functions to pointer-to-function type". Likewise, if I put std::thread in front of acq1 and acq2 again, it says " error C2374: 'acq1' : redefinition; multiple initialization".
So the question is, can I reassign threads to a new task when they have completed their previous task? I always wait for the previous use of the thread to end before calling it again, but I don't know how to reassign the thread, and since it's in a loop, I can't make a new thread each time (or if I could, that seems wasteful and unnecessary, but I could be mistaken).
Thanks in advance
解决方案The easiest way is to use a waitable queue of std::function
objects. Like this:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <chrono>
class ThreadPool
{
public:
ThreadPool (int threads) : shutdown_ (false)
{
// Create the specified number of threads
threads_.reserve (threads);
for (int i = 0; i < threads; ++i)
threads_.emplace_back (std::bind (&ThreadPool::threadEntry, this, i));
}
~ThreadPool ()
{
{
// Unblock any threads and tell them to stop
std::unique_lock <std::mutex> l (lock_);
shutdown_ = true;
condVar_.notify_all();
}
// Wait for all threads to stop
std::cerr << "Joining threads" << std::endl;
for (auto& thread : threads_)
thread.join();
}
void doJob (std::function <void (void)> func)
{
// Place a job on the queu and unblock a thread
std::unique_lock <std::mutex> l (lock_);
jobs_.emplace (std::move (func));
condVar_.notify_one();
}
protected:
void threadEntry (int i)
{
std::function <void (void)> job;
while (1)
{
{
std::unique_lock <std::mutex> l (lock_);
while (! shutdown_ && jobs_.empty())
condVar_.wait (l);
if (jobs_.empty ())
{
// No jobs to do and we are shutting down
std::cerr << "Thread " << i << " terminates" << std::endl;
return;
}
std::cerr << "Thread " << i << " does a job" << std::endl;
job = std::move (jobs_.front ());
jobs_.pop();
}
// Do the job without holding any locks
job ();
}
}
std::mutex lock_;
std::condition_variable condVar_;
bool shutdown_;
std::queue <std::function <void (void)>> jobs_;
std::vector <std::thread> threads_;
};
void silly (int n)
{
// A silly job for demonstration purposes
std::cerr << "Sleeping for " << n << " seconds" << std::endl;
std::this_thread::sleep_for (std::chrono::seconds (n));
}
int main()
{
// Create two threads
ThreadPool p (2);
// Assign them 4 jobs
p.doJob (std::bind (silly, 1));
p.doJob (std::bind (silly, 2));
p.doJob (std::bind (silly, 3));
p.doJob (std::bind (silly, 4));
}
相关文章