Multiprocessing processes won't join() when putting complex dictionaries in the return queue

Problem description

Given a pretty standard read/write multiprocessing setup with an input Queue and an output Queue:

"worker done" is printed 8 times, but the join() statement never completes. However, if I replace `queue_out.put(r)` with `queue_out.put(1)`, it works.

This is melting my brain; it's probably something really stupid. Should I make a copy of my dictionary and put that in the return Queue? Did I make a stupid mistake somewhere?

The processing function

def reader(queue_in, queue_out, funktion):
    # Read from the queue until the sentinel arrives
    while True:
        r = queue_in.get()
        if r[1] == 'DONE':
            print "worker done" # <----- this happens
            return
        funktion(r[1]) # funktion adds additional keys to the dictionary
        queue_out.put(r) # <---- replacing r with 1 does let me join()

Filling the input queue

def writer(generator, queue):
    # Write to the queue
    for r in enumerate(generator):
        # r is an (index, dictionary) tuple; the dictionary is complex
        queue.put(r)    
    print "writer done"
    for _ in range(0, WORKERS):
        queue.put((-1, "DONE"))

The rest

WORKERS = 8

# init Queues
queue_in = Queue()
queue_out = Queue()

# Start processes, with input and output queues
readers = []
for _ in range(0, WORKERS):
    p = Process(target=reader, args=(queue_in, queue_out, funktion))
    p.daemon = True
    p.start()
    readers.append(p)

writer(generator, queue_in)

for p in readers:
    p.join()

print "joined"  # <---- this never happens

queue_in.close()

while not queue_out.empty():
    print queue_out.get()
queue_out.close()
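For reference, here is a minimal Python 3 sketch of the same pattern (the worker count and payloads are made up). Per the multiprocessing docs ("Joining processes that use queues"), a child that has put items on a queue keeps running until its buffered data has been flushed to the underlying pipe, so the parent should drain queue_out before calling join(). A tiny value like 1 fits in the pipe buffer immediately, which is one reason put(1) can appear to work while a large dictionary hangs.

```python
# Minimal sketch: drain the output queue *before* join() so that the
# workers' queue feeder threads can flush their buffered data.
from multiprocessing import Process, Queue

WORKERS = 2
JOBS = 10

def reader(queue_in, queue_out):
    while True:
        r = queue_in.get()
        if r == 'DONE':
            return
        r['processed'] = True          # stand-in for funktion(r)
        queue_out.put(r)

def main():
    queue_in, queue_out = Queue(), Queue()
    procs = [Process(target=reader, args=(queue_in, queue_out))
             for _ in range(WORKERS)]
    for p in procs:
        p.start()
    for i in range(JOBS):
        queue_in.put({'id': i, 'payload': 'x' * 100000})  # big-ish dict
    for _ in range(WORKERS):
        queue_in.put('DONE')
    results = [queue_out.get() for _ in range(JOBS)]  # drain first...
    for p in procs:
        p.join()                                      # ...then join
    return results

if __name__ == '__main__':
    print(len(main()))  # prints 10
```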


Solution

I think I have pieced this together from two sources, as I kept hitting the same problem. I think the important point is that this is on Windows.

A note from the documentation

Since Windows lacks os.fork() it has a few extra restrictions:
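On Python 3.4+ you can check which start method is actually in effect; on Windows it is always spawn, since fork is unavailable:

```python
# Inspect the multiprocessing start method for this platform.
# Windows always uses 'spawn' (no os.fork()); on Linux the default
# is 'fork'; macOS has defaulted to 'spawn' since Python 3.8.
import multiprocessing as mp

print(mp.get_start_method())
```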

Then read the answers here explaining that join() is for forked processes.

I have always managed to run multiprocessing in a similar fashion to yours without using join() and have not seen any errors; I'd be more than happy to see a counterexample explaining why it's needed. Indeed, removing it has corrected your issue.

And this article goes into more depth about how multiprocessing handles child processes differently across operating systems. I do think that the issue with join(), specifically, should be made more explicit in the documentation.
