Strange Queue.PriorityQueue behaviour with multiprocessing in Python 2.7.6

Problem description

As you can tell from the title, I'm trying to use PriorityQueue with multiprocessing. More precisely, I want to make a shared PriorityQueue; I wrote some code, but it doesn't run as I expected.

Here's the code:

import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue


def worker(queue):
    lock = Lock()
    with lock:
        for i in range(100):
            queue.put(i)

    print "worker", queue.qsize()


pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()

time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()

I got the following output:

worker 100
main 0

What's happening, and how do I do what I want the right way? Thank you.


Solution

The problem isn't that the queue is unpicklable in this case - if you're using a Unix-like platform, it can be passed to the child without pickling. (On Windows, though, I think you would get a pickling error here.) The root problem is that you're not using a process-safe queue. The only queues that can be used between processes are the Queue objects that live inside the multiprocessing module, and unfortunately there is no PriorityQueue implementation available there. However, you can easily create one by registering PriorityQueue with a multiprocessing manager (a SyncManager subclass), like this:

import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue


class MyManager(SyncManager):
    pass

MyManager.register("PriorityQueue", PriorityQueue)  # Register a shared PriorityQueue

def Manager():
    m = MyManager()
    m.start()  # Start the manager's server process
    return m

def worker(queue):
    print queue  # A proxy object that forwards calls to the queue in the manager process
    for i in range(100):
        queue.put(i)
    print "worker", queue.qsize()


m = Manager()
pr_queue = m.PriorityQueue()  # This is process-safe
worker_process = Process(target=worker, args=(pr_queue,))
worker_process.start()

time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()

Output:

worker 100
main 100
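
As a quick usage note - this is only a minimal sketch that reuses the MyManager/Manager() helpers above, and the jobs/task names are purely illustrative - the shared queue behaves like an ordinary PriorityQueue: put (priority, payload) tuples in, and get() hands them back lowest priority first:

# Minimal usage sketch, assuming the MyManager / Manager() helpers defined above.
# Items are (priority, payload) tuples; get() returns the lowest priority first.
m = Manager()
jobs = m.PriorityQueue()

jobs.put((5, "low priority task"))
jobs.put((1, "urgent task"))
jobs.put((3, "normal task"))

while not jobs.empty():       # fine here: only this one process is consuming
    priority, task = jobs.get()
    print priority, task      # prints 1, then 3, then 5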

Note that this probably won't perform quite as well as it would if it were a standard multiprocessing.Queue subclass. The Manager-based PriorityQueue is implemented by creating a Manager server process that actually contains a regular PriorityQueue, and then giving your main and worker processes Proxy objects that use IPC to read from and write to the queue in the server process. A regular multiprocessing.Queue just writes/reads data to/from a Pipe. If that's a concern, you could try implementing your own multiprocessing.PriorityQueue by subclassing or delegating to multiprocessing.Queue. It may not be worth the effort, though.
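
For completeness, here is one hedged sketch of that delegation idea. It is not a drop-in multiprocessing.PriorityQueue: it assumes a single consumer process, it only orders items that have already arrived over the pipe, and the class and method names are hypothetical:

import heapq
from Queue import Empty                  # the exception multiprocessing's queues reuse
from multiprocessing import Queue


class SingleConsumerPriorityQueue(object):
    # Hypothetical sketch: producers put (priority, item) tuples onto a plain
    # multiprocessing.Queue; the single consumer drains the pipe into a local
    # heap and pops the smallest item seen so far. Unlike the Manager version,
    # the heap itself is NOT shared between processes.

    def __init__(self):
        self._pipe = Queue()   # process-safe transport
        self._heap = []        # consumer-side heap, lives only in the consumer

    def put(self, item):
        # Safe to call from any producer process.
        self._pipe.put(item)

    def get(self, block=True, timeout=None):
        # Must only ever be called from the one consumer process.
        while True:            # move everything already in the pipe onto the heap
            try:
                heapq.heappush(self._heap, self._pipe.get_nowait())
            except Empty:
                break
        if not self._heap:     # nothing buffered yet: wait for at least one item
            heapq.heappush(self._heap, self._pipe.get(block, timeout))
        return heapq.heappop(self._heap)

Whether this actually beats the Manager proxy depends on your workload; as noted above, it may not be worth the effort.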
