如何使用 multiprocessing.Queue.get 方法?
问题描述
下面的代码将三个数字放在一个队列中.然后它尝试从队列中取回号码.但它永远不会.如何从队列中获取数据?
The code below places three numbers in a queue. Then it attempts to get the numbers back from the queue. But it never does. How to get the data from the queue?
import multiprocessing
queue = multiprocessing.Queue()
for i in range(3):
queue.put(i)
while not queue.empty():
print queue.get()
解决方案
我最初在阅读@Martijn Pieters 后删除了这个答案,因为他更详细和更早地描述了为什么这不起作用".然后我意识到,OP 示例中的用例不太适合
I originally deleted this answer after I read @Martijn Pieters', since he decribed the "why this doesn't work" in more detail and earlier. Then I realized, that the use case in OP's example doesn't quite fit to the canonical sounding title of
如何使用 multiprocessing.Queue.get 方法".
"How to use multiprocessing.Queue.get method".
那不是因为有演示中不涉及子进程,但是因为在实际应用程序中,几乎没有一个队列是预先填充的,并且只在之后读取,但是读取并且写入发生在中间的等待时间之间.Martijn 展示的扩展演示代码在通常情况下不起作用,因为当排队跟不上读取速度时,while 循环会过早中断.所以这里是重新加载的答案,它能够处理通常的交错提要和读取场景:
That's not because there's no child process involved for demonstration, but because in real applications hardly ever a queue is pre-filled and only read out after, but reading and writing happens interleaved with waiting times in between. The extended demonstration code Martijn showed, wouldn't work in the usual scenarios, because the while loop would break too soon when enqueuing doesn't keep up with reading. So here is the answer reloaded, which is able to deal with the usual interleaved feeds & reads scenarios:
不要依赖 queue.empty 检查同步.
Don't rely on queue.empty checks for synchronization.
在将对象放入空队列后,队列的 empty() 方法返回 False 并且 get_nowait() 可以在不引发 queue.Empty 的情况下返回之前,可能会有一个无限小的延迟....
After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty. ...
empty()
如果队列为空,则返回 True,否则返回 False.由于多线程/多处理语义,这是不可靠的.docs
Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable. docs
从队列中使用 for msg in iter(queue.get, sentinel):
到 .get()
,您可以通过传递一个哨兵值...iter(callable, sentinel)?
Either use for msg in iter(queue.get, sentinel):
to .get()
from the queue, where you break out of the loop by passing a sentinel value...iter(callable, sentinel)?
from multiprocessing import Queue
SENTINEL = None
if __name__ == '__main__':
queue = Queue()
for i in [*range(3), SENTINEL]:
queue.put(i)
for msg in iter(queue.get, SENTINEL):
print(msg)
...如果您需要非阻塞解决方案,请使用 get_nowait()
并处理可能的 queue.Empty
异常.
...or use get_nowait()
and handle a possible queue.Empty
exception if you need a non-blocking solution.
from multiprocessing import Queue
from queue import Empty
import time
SENTINEL = None
if __name__ == '__main__':
queue = Queue()
for i in [*range(3), SENTINEL]:
queue.put(i)
while True:
try:
msg = queue.get_nowait()
if msg == SENTINEL:
break
print(msg)
except Empty:
# do other stuff
time.sleep(0.1)
如果只有一个进程和该进程中的一个线程正在读取队列,也可以将最后一个代码片段交换为:
In case only one process and only one thread within this process is reading the queue, it would be also possible to exchange the last code snippet with:
while True:
if not queue.empty(): # this is not an atomic operation ...
msg = queue.get() # ... thread could be interrupted in between
if msg == SENTINEL:
break
print(msg)
else:
# do other stuff
time.sleep(0.1)
由于线程可能会在检查 if not queue 之间删除 GIL.empty()
和 queue.get()
,这不适用于进程中的多线程队列读取.如果多个进程正在从队列中读取,这同样适用.
Since a thread could drop the GIL in between checking if not queue.empty()
and queue.get()
, this wouldn't be suitable for multi-threaded queue-reads in a process. The same applies if multiple processes are reading from the queue.
对于单一生产者/单一消费者的场景,使用 multiprocessing.Pipe
而不是 multiprocessing.Queue
就足够了,而且性能更高.
For single-producer / single-consumer scenarios, using a multiprocessing.Pipe
instead of multiprocessing.Queue
would be sufficient and more performant, though.
相关文章