Python multiprocessing .join() deadlock depends on the worker function

Problem description

I am using the multiprocessing python library to spawn 4 Process() objects to parallelize a cpu intensive task. The task (inspiration and code from this great article) is to compute the prime factors for every integer in a list.

main.py:

import random
import multiprocessing
import sys

num_inputs  = 4000
num_procs   = 4
proc_inputs = num_inputs/num_procs
input_list  = [int(1000*random.random()) for i in xrange(num_inputs)]

output_queue = multiprocessing.Queue()
procs        = []
for p_i in xrange(num_procs):
  print "Process [%d]"%p_i
  proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
  print " - num inputs: [%d]"%len(proc_list)

  # Using target=worker1 HANGS on join
  p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
  # Using target=worker2 RETURNS with success
  #p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue))

  procs.append(p)
  p.start()

for p in procs:
  print "joining ", p, output_queue.qsize(), output_queue.full()
  p.join()
  print "joined  ", p, output_queue.qsize(), output_queue.full()

print "Processing complete."
ret_vals = []
while output_queue.empty() == False:
    ret_vals.append(output_queue.get())
print len(ret_vals)
print sys.getsizeof(ret_vals)

Observations:

  • If the target for each process is the function worker1, for an input list larger than 4000 elements the main thread gets stuck on .join(), waiting for the spawned processes to terminate and never returns.
  • If the target for each process is the function worker2, for the same input list the code works just fine and the main thread returns.

This is very confusing to me, as the only difference between worker1 and worker2 (see below) is that the former inserts individual lists in the Queue whereas the latter inserts a single list of lists for each process.

Why is there a deadlock with the worker1 target but not with the worker2 target? Shouldn't both (or neither) go beyond the multiprocessing Queue maxsize limit of 32767?

worker1 vs worker2:

def worker1(proc_num, proc_list, output_queue):
    '''worker function which deadlocks'''  
    for num in proc_list:
        output_queue.put(factorize_naive(num))

def worker2(proc_num, proc_list, output_queue):
    '''worker function that works'''
    workers_stuff = []

    for num in proc_list:
        workers_stuff.append(factorize_naive(num))
    output_queue.put(workers_stuff)


There are a lot of similar questions on SO, but I believe the core of this question is clearly distinct from all of them.

Related links:

  • https://sopython.com/canon/82/programs-using-multiprocessing-hang-deadlock-and-never-complete/
  • Python multiprocessing issue
  • Python multiprocessing - process hangs on join for large queue
  • Process.join() and queue don't work with large numbers
  • Python 3 multiprocessing queue deadlock when calling join before the queue is empty
  • Script using multiprocessing module does not terminate
  • Why does multiprocessing.Process.join() hang?
  • When to call .join() on a process?
  • What exactly is Python multiprocessing module's .join() method doing?

Solution

The documentation warns about exactly this:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
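
As an illustration only, here is a minimal, self-contained sketch of that failure mode (the sizes are hypothetical, and the exact point at which the pipe buffer fills varies by OS and Python version): the child puts far more data on the queue than the pipe can buffer, so joining it before draining the queue can block forever, while draining first lets it exit.

import multiprocessing

def flood(q):
    # Put far more data on the queue than the underlying pipe can buffer.
    for i in xrange(100000):
        q.put('x' * 100)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=flood, args=(q,))
    p.start()
    # Calling p.join() right here would likely deadlock: the child's feeder
    # thread is still blocked flushing buffered items into the pipe, so the
    # child process cannot terminate. Drain the queue first, then join.
    for i in xrange(100000):
        q.get()
    p.join()
    print "done"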

While a Queue appears to be unbounded, under the covers queued items are buffered in memory to avoid overloading inter-process pipes. A process cannot end normally before those memory buffers are flushed. Your worker1() puts a lot more items on the queue than your worker2() (with the code above, worker1() makes 1000 individual .put() calls per process, while worker2() makes just one), and that's all there is to it. Note that the number of items that can be queued before the implementation resorts to buffering in memory isn't defined: it can vary across OS and Python release.

As the docs suggest, the normal way to avoid this is to .get() all the items off the queue before you attempt to .join() the processes. As you've discovered, whether it's necessary to do so depends in an undefined way on how many items have been put on the queue by each worker process.
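
Applied to the code in the question, a minimal sketch of that fix (keeping the original Python 2 style and assuming the worker1 target, which puts exactly one result per input) is to drain the queue first and only then join:

# Drain the queue BEFORE joining.  With worker1 every input produces exactly
# one queued result, so we know how many items to expect.
ret_vals = []
for i in xrange(num_inputs):
    ret_vals.append(output_queue.get())

# All buffered items have now been flushed and consumed, so the workers can
# terminate and join() returns promptly.
for p in procs:
    p.join()

print "Processing complete."
print len(ret_vals)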
