Boost thread_group: maximum number of parallel threads

2021-12-24 00:00:00 multithreading c++ boost

I want to use boost::thread_group in my program with a maximum number of threads. For example:

int maxNumberOfThreads;
boost::thread_group group;
for (int i = 0; i < N; ++i)
    // create a new thread only if group.size() is smaller than the maximum number of threads
    group.create_thread(Worker);
group.join_all();

Does anyone have an idea how I can implement this?

Starting N threads at once would be very inefficient.

Thanks for your help.

Recommended answer

What you seem to want is a thread pool.

You can use boost::thread::hardware_concurrency() to determine the number of (logical) cores available on your particular system.
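A minimal sketch of combining that core count with the caller's own limit follows. It assumes std::thread::hardware_concurrency() from standard C++11, which behaves like the Boost function for this purpose; both may return 0 when the value cannot be determined. The helper name pick_worker_count is hypothetical, not part of Boost.

```cpp
#include <algorithm>
#include <thread>

// Hypothetical helper: how many workers to start, given a caller-chosen cap.
// A cap of 0 is treated here as "no cap, use one worker per logical core".
unsigned pick_worker_count(unsigned maxNumberOfThreads)
{
    // hardware_concurrency() may return 0 when the value cannot be
    // determined, so fall back to a single worker in that case.
    unsigned cores = std::max(1u, std::thread::hardware_concurrency());
    return maxNumberOfThreads == 0 ? cores
                                   : std::min(cores, maxNumberOfThreads);
}
```

With this, pick_worker_count(4) never yields more than 4 workers, and never more than the number of logical cores reported by the system.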

Here's one I rolled for an answer last week:

#include <deque>
#include <iostream>
#include <boost/atomic.hpp>
#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>

using namespace boost;
using namespace boost::phoenix::arg_names;

boost::atomic_size_t counter(0ul);

class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (optional<job_t> job = q.dequeue())
              (*job)();
      }

  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }

      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(job);

          cv.notify_one();
      }

      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          job_t job = _queue.front();
          _queue.pop_front();

          return job;
      }

      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
      }
};
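The pool above always starts one worker per logical core. Since the question asks for a caller-chosen maximum, here is a minimal sketch of the same queue-and-workers idea with the thread count passed in. It uses standard C++11 facilities instead of Boost so that it has no external dependencies. The names bounded_pool and peak_concurrency are made up for this illustration and are not part of Boost or of the code above.

```cpp
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical class: a pool whose number of worker threads is capped by the caller.
class bounded_pool
{
    std::mutex mx;
    std::condition_variable cv;
    std::deque<std::function<void()>> queue;
    std::vector<std::thread> workers;
    bool shutdown = false;               // guarded by mx

    void run()
    {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lk(mx);
                cv.wait(lk, [this] { return shutdown || !queue.empty(); });
                if (queue.empty())
                    return;              // shutdown requested and nothing left to do
                job = std::move(queue.front());
                queue.pop_front();
            }
            job();                       // run the job outside the lock
        }
    }

  public:
    explicit bounded_pool(unsigned maxNumberOfThreads)
    {
        unsigned n = std::max(1u, maxNumberOfThreads);
        for (unsigned i = 0; i < n; ++i)
            workers.emplace_back([this] { run(); });
    }

    void enqueue(std::function<void()> job)
    {
        {
            std::lock_guard<std::mutex> lk(mx);
            queue.push_back(std::move(job));
        }
        cv.notify_one();
    }

    ~bounded_pool()
    {
        {
            std::lock_guard<std::mutex> lk(mx);
            shutdown = true;
        }
        cv.notify_all();
        for (std::thread& t : workers)
            t.join();                    // workers drain the queue before exiting
    }
};

// Runs n_jobs short jobs on at most max_threads workers and returns the
// highest number of jobs observed running at the same time (-1 on failure).
int peak_concurrency(unsigned max_threads, int n_jobs)
{
    std::atomic<int> running(0), peak(0), completed(0);
    {
        bounded_pool pool(max_threads);
        for (int i = 0; i < n_jobs; ++i)
            pool.enqueue([&] {
                int now = ++running;
                int seen = peak.load();
                while (now > seen && !peak.compare_exchange_weak(seen, now)) {}
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
                --running;
                ++completed;
            });
    }   // the destructor finishes all queued jobs and joins the workers
    return completed == n_jobs ? peak.load() : -1;
}
```

However many jobs are enqueued, peak_concurrency(3, 50) should never report more than 3 jobs in flight, which is the behaviour the question asks for. As in the Boost version, the destructor lets the workers finish whatever is still queued before joining them.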

That answer also shows a typical way to use it:

static const size_t bignumber = 1 << 20;

class myClass 
{
    thread_pool pool; // uses 1 thread per core

  public:
    void launch_jobs()
    {
        std::cout << "enqueuing jobs... " << std::flush;
        for(size_t i=0; i<bignumber; ++i)
        {
            for(int j=0; j<2; ++j) {
                pool.enqueue(bind(&myClass::myFunction, this, j, i));
            }     
        }
        std::cout << "done\n";
    }

  private:
    void myFunction(int i, int j)
    {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
        counter += 1;
    }
};

int main()
{
    myClass instance;
    instance.launch_jobs();

    size_t last = 0;
    while (counter < (2*bignumber))
    {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
        if ((counter >> 4u) > last)
        {
            std::cout << "Progress: " << counter << "/" << (bignumber*2) << "\n";
            last = counter >> 4u;
        }
    }
}
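The main() above polls the counter every 100 ms to find out when the work is finished. A possible alternative is to block on a condition variable until the last job reports completion. The sketch below uses standard C++11 only. The names job_tracker and run_and_wait are hypothetical, and for brevity the demo function starts plain threads rather than the pool.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: counts outstanding jobs and lets one thread block
// until all of them have reported completion.
class job_tracker
{
    std::mutex mx;
    std::condition_variable cv;
    std::size_t pending;                 // guarded by mx

  public:
    explicit job_tracker(std::size_t total) : pending(total) {}

    // Called by each job when it finishes.
    void job_done()
    {
        std::lock_guard<std::mutex> lk(mx);
        if (pending > 0 && --pending == 0)
            cv.notify_all();
    }

    // Blocks until every expected job has called job_done().
    void wait()
    {
        std::unique_lock<std::mutex> lk(mx);
        cv.wait(lk, [this] { return pending == 0; });
    }

    std::size_t remaining()
    {
        std::lock_guard<std::mutex> lk(mx);
        return pending;
    }
};

// Demo: start `total` short-lived threads that each report once, wait for
// all of them, and return the number of jobs still outstanding.
std::size_t run_and_wait(std::size_t total)
{
    job_tracker tracker(total);
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < total; ++i)
        threads.emplace_back([&tracker] { tracker.job_done(); });
    tracker.wait();
    for (std::thread& t : threads)
        t.join();
    return tracker.remaining();
}
```

In the example above, each job would call job_done() at the end of myFunction, and main() would call wait() instead of looping on counter. The progress output would then need a separate mechanism.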

As a bonus, in the comments on another answer to that question, I also posted an equivalent solution based on a lock-free job queue implementation:

  • boost thread throwing exception "thread_resource_error: resource temporarily unavailable"
