Python multiprocessing.Queue vs. multiprocessing.Manager().Queue()
Problem description
I have a simple task like this:
import multiprocessing
import queue  # for queue.Empty (this was the Queue module in Python 2)

def worker(q):
    while True:
        try:
            _ = q.get_nowait()
        except queue.Empty:
            break

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    # q = multiprocessing.Queue()
    q = manager.Queue()
    for i in range(5):
        q.put(i)
    processes = []
    for i in range(2):
        proc = multiprocessing.Process(target=worker, args=(q,))
        processes.append(proc)
        proc.start()
    for proc in processes:
        proc.join()
It seems that multiprocessing.Queue can do everything I need, but on the other hand I see many examples of Manager().Queue() and can't understand what I really need. It looks like Manager().Queue() uses some sort of proxy objects, but I don't understand their purpose, because multiprocessing.Queue() does the same work without any proxy objects.
So, my questions are:
1) What is the real difference between multiprocessing.Queue and the object returned by multiprocessing.Manager().Queue()?
2) Which one should I use?
Solution
Though my understanding of this subject is limited, from what I have done I can tell there is one main difference between multiprocessing.Queue() and multiprocessing.Manager().Queue():
- multiprocessing.Queue() is an object, whereas multiprocessing.Manager().Queue() is an address (proxy) pointing to a shared queue managed by the multiprocessing.Manager() object.
- Therefore you can't pass a normal multiprocessing.Queue() object to Pool methods, because it can't be pickled.
- Moreover, the Python docs tell us to pay particular attention when using multiprocessing.Queue(), because it can have undesired effects.
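The pickling difference is easy to verify directly (a minimal sketch; the exact error message text may vary by Python version). Trying to pickle a plain multiprocessing.Queue() raises a RuntimeError, while the Manager().Queue() proxy pickles without complaint:

```python
import multiprocessing
import pickle

if __name__ == '__main__':
    plain_q = multiprocessing.Queue()
    try:
        pickle.dumps(plain_q)
    except RuntimeError as exc:
        # "Queue objects should only be shared between processes
        # through inheritance"
        print('plain Queue:', exc)

    manager = multiprocessing.Manager()
    proxy_q = manager.Queue()
    data = pickle.dumps(proxy_q)  # the proxy is picklable
    print('proxy pickled to', len(data), 'bytes')
    manager.shutdown()
```

This is exactly why Pool methods, which pickle their arguments before sending them to worker processes, reject a plain Queue but accept a manager proxy.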
Note When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager. After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising Queue.Empty. If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. However, objects enqueued by the same process will always be in the expected order with respect to each other.
Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. Note that a queue created using a manager does not have this issue.
There is a workaround to use multiprocessing.Queue() with a Pool: make the queue a global variable and assign it in every worker process at initialization:
import multiprocessing

queue = multiprocessing.Queue()

def initialize_shared(q):
    global queue
    queue = q

pool = multiprocessing.Pool(nb_process, initializer=initialize_shared, initargs=(queue,))
This will create pool processes with the correctly shared queue, but one can argue that multiprocessing.Queue() objects were not created for this use.
On the other hand, manager.Queue() can be shared between pool subprocesses by passing it as a normal argument of a function.
In my opinion, using multiprocessing.Manager().Queue() is fine in every case and less troublesome. There may be drawbacks to using a manager, but I'm not aware of any. (The main cost is performance: every operation on a manager queue is a round-trip to the separate manager process, so it is slower than a plain multiprocessing.Queue().)