多处理。作为参数排队到池工作进程中止执行工作进程

问题描述

我真的很难相信我遇到了这个问题,看起来这是一个很大的错误,在python多处理模块中。无论如何,我遇到的问题是,每当我将一个多进程.Queue传递给一个多进程.Pool Worker作为参数时,池Worker永远不会执行它的代码。我甚至能够在一个非常简单的测试上重现这个错误,该测试是在pythondocs中找到的示例代码的略微修改版本。

以下是队列示例代码的原始版本:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()

以下是我对队列示例代码的修改版本:

from multiprocessing import Queue, Pool

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    p.apply_async(f,args=(q,))
    print(q.get()) # prints "[42, None, 'hello']"
    p.close()
    p.join()

我所做的只是使p成为一个大小为1的进程池,而不是一个多进程.Process对象,结果是代码永远挂在print语句上,因为从来没有向队列写入任何内容!当然,我测试了它的原始形式,它运行良好。我的操作系统是WINDOWS 10,我的PYTHON版本是3.5.x,有人知道为什么会这样吗?

更新:仍然不知道为什么这个示例代码使用多进程.Process而不是多进程.Pool,但我找到了一个work around我很满意(Alex Martali的答案)。显然,您只需创建多进程的全局列表。队列并传递每个进程和索引以供使用,我将避免使用托管队列,因为它们速度较慢。感谢Guest向我显示链接。


解决方案

问题

当您调用apply_async时,它返回一个AsyncResult对象,并将工作负载分配留给一个单独的线程(另请参阅this answer)。这个线程遇到了Queue对象不能是pickled的问题,因此请求的工作不能分发(并最终执行)。我们可以通过调用AsyncResult.get

来查看
r = p.apply_async(f,args=(q,))
r.get()

这将引发RuntimeError

RuntimeError: Queue objects should only be shared between processes through inheritance

但是,此RuntimeError仅在您请求结果后才在主线程中引发,因为它实际上发生在不同的线程中(因此需要一种传输方式)。

那么当您这样做时会发生什么

p.apply_async(f,args=(q,))

是目标函数f永远不会被调用,因为它的一个参数(q)不能被Pickle。因此,q从不接收项并保持为空,因此主线程中对q.get的调用将永远停止。

解决方案

使用apply_async,您不必手动管理结果队列,但它们可以以AsyncResult对象的形式随时提供给您。因此,您可以将代码修改为仅从目标函数返回:

from multiprocessing import Queue, Pool

def f():
    return [42, None, 'hello']

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    result = p.apply_async(f)
    print(result.get())

相关文章