How to limit the number of running instances in C++

2021-12-24 · tags: c++, boost, boost-mutex

I have a C++ class that allocates a lot of memory. It does this by calling a third-party library that is designed to crash if it cannot allocate the memory, and sometimes my application creates several instances of my class in parallel threads. With too many threads, I get a crash. My best idea for a solution is to make sure that there are never, say, more than three instances running at the same time. (Is this a good idea?) My current best idea for implementing that is to use a boost mutex, something along the lines of the following pseudo-code:

MyClass::MyClass(){
  my_thread_number = -1; //this is a class variable
  while (my_thread_number == -1)
    for (int i=0; i < MAX_PROCESSES; i++)
      if(try_lock a mutex named i){
        my_thread_number = i;
        break;
      }
  //Now I know that my thread holds mutex number my_thread_number and is allowed to run
}

MyClass::~MyClass(){
    release mutex named my_thread_number
}

As you see, I am not quite sure of the exact syntax for mutexes here. So, summing up, my questions are:

  1. Am I on the right track in trying to fix a memory error by limiting the number of threads?
  2. If so, should I use a mutex, or something else?
  3. If so, is my algorithm sound?
  4. Is there a good example of how to use try_lock with boost mutexes? (see the sketch below)
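
To make question 4 concrete, this is roughly what I have in mind for the try_lock part, though I am not sure the syntax is right; MAX_PROCESSES and slot_mutex are just my placeholder names, and this version spins while it waits:

#include <boost/thread/mutex.hpp>

static const int MAX_PROCESSES = 3;
static boost::mutex slot_mutex[MAX_PROCESSES]; // one mutex per allowed running instance

class MyClass {
    int my_thread_number;
public:
    MyClass() : my_thread_number(-1) {
        // keep scanning the slots until one of the mutexes can be acquired
        while (my_thread_number == -1)
            for (int i = 0; i < MAX_PROCESSES; ++i)
                if (slot_mutex[i].try_lock()) { // non-blocking; true if the lock was taken
                    my_thread_number = i;
                    break;
                }
    }
    ~MyClass() {
        slot_mutex[my_thread_number].unlock(); // give the slot back on destruction
    }
};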

---

I realized I am talking about threads, not processes. I am involved in building an application that can run on both Linux and Windows...

Recommended answer

Here's a simplistic way to implement your own 'semaphore' (since I don't think the standard library or boost has one). It takes a 'cooperative' approach, where workers wait for each other:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/bind.hpp>
#include <iostream>

using namespace boost;
using namespace boost::phoenix::arg_names;

void the_work(int id)
{
    static int running = 0;
    std::cout << "worker " << id << " entered (" << running << " running)
";

    static mutex mx;
    static condition_variable cv;

    // synchronize here, waiting until we can begin work
    {
        unique_lock<mutex> lk(mx);
        cv.wait(lk, phoenix::cref(running) < 3);
        running += 1;
    }

    std::cout << "worker " << id << " start work
";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " done
";

    // signal one other worker, if waiting
    {
        lock_guard<mutex> lk(mx);
        running -= 1;
        cv.notify_one(); 
    }
}

int main()
{
    thread_group pool;

    for (int i = 0; i < 10; ++i)
        pool.create_thread(bind(the_work, i));

    pool.join_all();
}

Now, I'd say it's probably better to have a dedicated pool of n workers taking their work from a queue in turns:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
#include <boost/atomic.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <deque>
#include <iostream>

using namespace boost;
using namespace boost::phoenix::arg_names;

class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (auto job = q.dequeue())
              (*job)();
      }

  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }

      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(std::move(job));

          cv.notify_one();
      }

      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          auto job = std::move(_queue.front());
          _queue.pop_front();

          return std::move(job);
      }

      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
      }
};

void the_work(int id)
{
    std::cout << "worker " << id << " entered
";

    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work
";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " done
";
}

int main()
{
    thread_pool pool; // uses 1 thread per core

    for (int i = 0; i < 10; ++i)
        pool.enqueue(bind(the_work, i));
}
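
If you want the question's limit of three rather than one thread per core, the pool size could be made a constructor argument; this is a hypothetical variation on the constructor above, with the parameter name being my own:

// hypothetical alternative constructor: the caller chooses the number of workers
explicit thread_pool(unsigned size) : shutdown(false) {
    for (unsigned i = 0; i < size; ++i)
        pool.create_thread(bind(worker_thread, ref(*this)));
}

// usage: thread_pool pool(3); // never more than three jobs run at the same time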

PS. You can use C++11 lambdas instead of boost::phoenix there if you prefer.
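
For example, the two wait predicates could be written with lambdas instead of phoenix expressions; a sketch of just those lines, against the same code as above:

// in the_work(): 'running' has static storage duration, so no capture is needed
cv.wait(lk, []{ return running < 3; });

// in thread_pool::dequeue(): wake on shutdown or when a job has been queued
cv.wait(lk, [this]{ return shutdown || !_queue.empty(); });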
