我可以在 Pool.imap 调用的函数中使用多处理队列吗?
问题描述
我正在使用 python 2.7,并尝试在自己的进程中运行一些 CPU 繁重的任务.我希望能够将消息发送回父进程,以使其了解进程的当前状态.多处理队列似乎很适合这个,但我不知道如何让它工作.
I'm using python 2.7, and trying to run some CPU heavy tasks in their own processes. I would like to be able to send messages back to the parent process to keep it informed of the current status of the process. The multiprocessing Queue seems perfect for this but I can't figure out how to get it work.
所以,这是我的基本工作示例减去队列的使用.
So, this is my basic working example minus the use of a Queue.
import multiprocessing as mp
import time
def f(x):
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print str(results.next())
pool.close()
pool.join()
if __name__ == '__main__':
main()
我尝试以多种方式传递队列,但它们收到错误消息RuntimeError:队列对象只能通过继承在进程之间共享".这是我根据我找到的早期答案尝试的方法之一.(我在尝试使用 Pool.map_async 和 Pool.imap 时遇到了同样的问题)
I've tried passing the Queue in several ways, and they get the error message "RuntimeError: Queue objects should only be shared between processes through inheritance". Here is one of the ways I tried based on an earlier answer I found. (I get the same problem trying to use Pool.map_async and Pool.imap)
import multiprocessing as mp
import time
def f(args):
x = args[0]
q = args[1]
q.put(str(x))
time.sleep(0.1)
return x*x
def main():
q = mp.Queue()
pool = mp.Pool()
results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))
print str(q.get())
pool.close()
pool.join()
if __name__ == '__main__':
main()
最后,0 适应度方法(使其成为全局)不会生成任何消息,它只是锁定.
Finally, the 0 fitness approach (make it global) doesn't generate any messages, it just locks up.
import multiprocessing as mp
import time
q = mp.Queue()
def f(x):
q.put(str(x))
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print q.get()
pool.close()
pool.join()
if __name__ == '__main__':
main()
我知道它可能会直接与 multiprocessing.Process 一起使用,并且还有其他库可以实现这一点,但我不想放弃非常适合的标准库函数,直到我确定它不是只是我缺乏知识使我无法利用它们.
I'm aware that it will probably work with multiprocessing.Process directly and that there are other libraries to accomplish this, but I hate to back away from the standard library functions that are a great fit until I'm sure it's not just my lack of knowledge keeping me from being able to exploit them.
谢谢.
解决方案
诀窍是将 Queue 作为参数传递给初始化程序.似乎适用于所有 Pool 调度方法.
The trick is to pass the Queue as an argument to the initializer. Appears to work with all the Pool dispatch methods.
import multiprocessing as mp
def f(x):
f.q.put('Doing: ' + str(x))
return x*x
def f_init(q):
f.q = q
def main():
jobs = range(1,6)
q = mp.Queue()
p = mp.Pool(None, f_init, [q])
results = p.imap(f, jobs)
p.close()
for i in range(len(jobs)):
print q.get()
print results.next()
if __name__ == '__main__':
main()
相关文章