Python multiprocessing apply_async only uses one process
Problem description
I have a script that opens each file from a list and then does something to the text within that file. I'm using Python multiprocessing and Pool to try to parallelize this operation. An abstraction of the script is below:
```python
import os
from multiprocessing import Pool

results = []

def testFunc(files):
    for file in files:
        print("Working in Process #%d" % (os.getpid()))
        # This is just an illustration of some logic. This is not what I'm actually doing.
        for line in file:
            if 'dog' in line:
                results.append(line)

if __name__ == "__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.apply_async(testFunc, args=(files,))
    results2 = results.get()
```
When I run this, the printed process ID is the same for each iteration. Basically, what I'm trying to do is take each element of the input list and fork it out to a separate process, but it seems like one process is doing all of the work.
Solution
- `apply_async` farms out one task to the pool. Your script calls it once with the whole list, so a single worker processes every file. You would need to call `apply_async` many times (once per file) to exercise more processors.
- Don't allow both processes to try to write to the same list, `results`. Since the pool workers are separate processes, each one appends to its own copy of `results`, and the parent's list is never updated. One way to work around this is to use an output Queue. You could set it up yourself, or let `apply_async`'s callback do the collecting for you: `apply_async` calls the callback in the parent process once the function completes.
- You could use `map_async` instead of `apply_async`, but then you'd get a list of lists, which you'd then have to flatten.
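The third point can be sketched as follows. This is a minimal sketch, not the answer's own code: `find_dog_lines` and `search_files` are hypothetical names, the `'dog'` filter stands in for the real per-file logic, the demo writes two temporary files in place of the `/path/to/...` placeholders, and `itertools.chain.from_iterable` is just one way to do the flattening.

```python
import itertools
import multiprocessing as mp
import os
import tempfile


def find_dog_lines(path):
    """Return the lines of one file that contain 'dog' (illustrative logic)."""
    with open(path, "r") as f:
        return [line for line in f if "dog" in line]


def search_files(paths, processes=2):
    """Run find_dog_lines on every path in a pool and flatten the results."""
    with mp.Pool(processes=processes) as pool:
        # map_async submits one task per path; .get() waits and returns one
        # list per input file, in the same order as `paths` (a list of lists).
        per_file = pool.map_async(find_dog_lines, paths).get()
    # Flatten the list of lists into a single list of matching lines.
    return list(itertools.chain.from_iterable(per_file))


if __name__ == "__main__":
    # Hypothetical demo input: two temporary files standing in for the
    # '/path/to/file1.txt' and '/path/to/file2.txt' placeholders.
    tmpdir = tempfile.mkdtemp()
    paths = []
    for name, text in [("file1.txt", "a dog\na cat\n"), ("file2.txt", "hotdog\nbird\n")]:
        path = os.path.join(tmpdir, name)
        with open(path, "w") as f:
            f.write(text)
        paths.append(path)
    print(search_files(paths))
```

Because `map_async` preserves input order, the flattened list keeps the matches grouped by file in the order the files were listed.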
So, perhaps try something like this instead:
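```python
import os
import multiprocessing as mp

results = []

def testFunc(file):
    result = []
    print("Working in Process #%d" % (os.getpid()))
    # This is just an illustration of some logic. This is not what I'm
    # actually doing.
    with open(file, 'r') as f:
        for line in f:
            if 'dog' in line:
                result.append(line)
    # Return the matches; the pool sends them back to the parent process.
    return result

def collect_results(result):
    # Called in the parent process each time a task finishes, so it
    # extends the parent's own results list.
    results.extend(result)

if __name__ == "__main__":
    p = mp.Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    # One apply_async call per file, so each file becomes a separate task.
    for f in files:
        p.apply_async(testFunc, args=(f, ), callback=collect_results)
    p.close()  # no more tasks will be submitted
    p.join()   # wait for all workers to finish
    print(results)
```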
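The second point also mentions setting up the output queue yourself instead of relying on the callback. A minimal sketch of that variant, assuming a `multiprocessing.Manager` queue (a plain `multiprocessing.Queue` cannot be passed as an argument to pool tasks, but a manager's queue proxy can); `find_dog_lines`, `search_files` and the temporary demo files are hypothetical.

```python
import multiprocessing as mp
import os
import tempfile


def find_dog_lines(path, out_queue):
    """Put each line of one file that contains 'dog' on the shared queue."""
    with open(path, "r") as f:
        for line in f:
            if "dog" in line:
                out_queue.put(line)


def search_files(paths, processes=2):
    """Search every path in a pool, collecting matches through a managed queue."""
    with mp.Manager() as manager:
        # The queue lives in the manager process; workers get a picklable proxy.
        out_queue = manager.Queue()
        with mp.Pool(processes=processes) as pool:
            # One apply_async call per file, so tasks can run on different workers.
            pending = [pool.apply_async(find_dog_lines, (p, out_queue)) for p in paths]
            for r in pending:
                r.get()  # wait for the task and re-raise any worker exception
        # Every task has finished, so the queue now holds all matches; drain it.
        results = []
        while not out_queue.empty():
            results.append(out_queue.get())
    return results


if __name__ == "__main__":
    # Hypothetical demo input standing in for the '/path/to/...' placeholders.
    tmpdir = tempfile.mkdtemp()
    paths = []
    for name, text in [("file1.txt", "a dog\na cat\n"), ("file2.txt", "hotdog\nbird\n")]:
        path = os.path.join(tmpdir, name)
        with open(path, "w") as f:
            f.write(text)
        paths.append(path)
    print(sorted(search_files(paths)))
```

Unlike the callback or `map_async` versions, matches from different files can arrive on the queue in any order, which is why the demo sorts them.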