Simple example of using multiprocessing Queue, Pool and Locking
Problem description
I tried to read the documentation at http://docs.python.org/dev/library/multiprocessing.html, but I'm still struggling with multiprocessing Queue, Pool and Locking. For now, I was able to build the example below.
Regarding Queue and Pool, I'm not sure I understood the concept correctly, so correct me if I'm wrong. What I'm trying to achieve is to process 2 requests at a time (the data list has 8 in this example). So, what should I use? A Pool to create 2 processes that can handle two different queues (2 at most), or should I just use a Queue to process 2 inputs each time? The lock would be there to print the outputs correctly.
import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_handler(var1):
    # Starts one process per item, so all 8 run at once
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()

def mp_worker(inputs, the_time):
    print(" Processs %s Waiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s DONE" % inputs)

if __name__ == '__main__':
    mp_handler(data)
Solution
The best solution for your problem is to use a Pool. Using Queues and having a separate "queue feeding" functionality is probably overkill.
Here's a slightly rearranged version of your program, this time with only 2 processes corralled in a Pool. I believe it's the easiest way to go, with minimal changes to the original code:
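import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker(args):
    # map() hands over each element of data as one argument; unpack it here
    # (tuple parameters in the signature are a syntax error in Python 3)
    inputs, the_time = args
    print(" Processs %s Waiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s DONE" % inputs)

def mp_handler():
    p = multiprocessing.Pool(2)  # at most 2 worker processes at a time
    p.map(mp_worker, data)       # blocks until every item has been processed

if __name__ == '__main__':
    mp_handler()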
Note that the mp_worker() function now accepts a single argument (a pair holding the two previous arguments), because map() passes each element of the input iterable to the worker function as a single argument. Here each element of data is a two-item list.
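If you would rather keep the original two-argument worker, Pool.starmap() (available since Python 3.3) unpacks each element into positional arguments for you. Below is a minimal sketch of that variation, not part of the original program: the sleep times are shortened for a quick run, and the worker returns a string instead of printing it so the results can be collected in input order.

```python
import multiprocessing
import time

# Same shape as the data above; shortened sleep times are an assumption for a quick demo
data = (['a', '0.2'], ['b', '0.4'], ['c', '0.1'], ['d', '0.3'])


def mp_worker(inputs, the_time):
    """Two-argument worker, with the same signature as in the question."""
    time.sleep(float(the_time))
    return "Process %s DONE" % inputs


def mp_handler():
    # starmap() unpacks each element of data into positional arguments,
    # so every call is mp_worker(inputs, the_time).
    with multiprocessing.Pool(2) as pool:
        return pool.starmap(mp_worker, data)


if __name__ == '__main__':
    # Results come back in the same order as the input data
    for line in mp_handler():
        print(line)
```

Whether map() with a single packed argument or starmap() reads better is mostly a matter of taste; both still limit the work to 2 processes at a time.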
Output of the two-process Pool program:
Processs a Waiting 2 seconds
Processs b Waiting 4 seconds
Process a DONE
Processs c Waiting 6 seconds
Process b DONE
Processs d Waiting 8 seconds
Process c DONE
Processs e Waiting 1 seconds
Process e DONE
Processs f Waiting 3 seconds
Process d DONE
Processs g Waiting 5 seconds
Process f DONE
Processs h Waiting 7 seconds
Process g DONE
Process h DONE
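The question also mentions a lock to keep the printed output tidy. With only two workers and short messages the lines above rarely collide, but if you do want one, a common pattern is to hand a multiprocessing.Lock to every worker through the Pool initializer. The sketch below assumes that pattern; init_worker, print_lock and format_line are illustrative names, and the sleep times are shortened.

```python
import multiprocessing
import time

# Shortened sleep times are an assumption for a quick demo
data = (['a', '0.2'], ['b', '0.4'], ['c', '0.1'], ['d', '0.3'])

print_lock = None  # set in each worker process by init_worker()


def init_worker(lock):
    """Pool initializer: store the shared lock in a module-level global."""
    global print_lock
    print_lock = lock


def format_line(inputs, the_time):
    return " Process %s waiting %s seconds" % (inputs, the_time)


def mp_worker(args):
    inputs, the_time = args
    # Hold the lock only while printing, never while sleeping
    with print_lock:
        print(format_line(inputs, the_time), flush=True)
    time.sleep(float(the_time))
    with print_lock:
        print(" Process %s DONE" % inputs, flush=True)
    return inputs


def mp_handler():
    lock = multiprocessing.Lock()
    # The lock is passed to the workers when they are created
    with multiprocessing.Pool(2, initializer=init_worker, initargs=(lock,)) as pool:
        return pool.map(mp_worker, data)


if __name__ == '__main__':
    mp_handler()
```

The lock is passed at worker start-up rather than as a map() argument because a multiprocessing.Lock cannot be pickled into a task.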
Edit, following @Thales's comment below:
If you want "a lock for each pool limit" so that your processes run in tandem pairs, à la:
A waiting, B waiting | A done, B done | C waiting, D waiting | C done, D done | ...
then change the handler function to launch a pool (of 2 processes) for each pair of data:
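def mp_handler():
    # Pair up consecutive items: (a, b), (c, d), ...
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))  # returns only when both tasks are done
        p.close()  # release this pair's worker processes
        p.join()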
Now your output is:
Processs a Waiting 2 seconds
Processs b Waiting 4 seconds
Process a DONE
Process b DONE
Processs c Waiting 6 seconds
Processs d Waiting 8 seconds
Process c DONE
Process d DONE
Processs e Waiting 1 seconds
Processs f Waiting 3 seconds
Process e DONE
Process f DONE
Processs g Waiting 5 seconds
Processs h Waiting 7 seconds
Process g DONE
Process h DONE
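Since map() blocks until every task it was given has finished, the same pairwise behaviour should also be achievable with a single pool that is reused for each pair, which avoids starting new worker processes every time. This is a sketch of that variation rather than the handler shown above; pairs() is an illustrative helper and the sleep times are shortened.

```python
import multiprocessing
import time

# Shortened sleep times are an assumption for a quick demo
data = (['a', '0.2'], ['b', '0.4'], ['c', '0.1'], ['d', '0.3'])


def pairs(items):
    """Group items into consecutive pairs: (a, b), (c, d), ..."""
    return list(zip(items[0::2], items[1::2]))


def mp_worker(args):
    inputs, the_time = args
    print(" Process %s waiting %s seconds" % (inputs, the_time), flush=True)
    time.sleep(float(the_time))
    print(" Process %s DONE" % inputs, flush=True)


def mp_handler():
    # One pool for the whole run; each map() call acts as a barrier
    with multiprocessing.Pool(2) as pool:
        for pair in pairs(data):
            # Blocks until both tasks of this pair have finished
            pool.map(mp_worker, pair)


if __name__ == '__main__':
    mp_handler()
```

Note that pairs() silently drops a trailing item when the input has an odd length, just like the zip() call in the handler above.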