Python multiprocessing and a shared counter
Problem description
I'm having trouble with the multiprocessing module. I'm using a Pool of workers with its map method to load data from a large number of files, and for each of them I analyze the data with a custom function. Each time a file has been processed I would like to have a counter updated so that I can keep track of how many files remain to be processed. Here is sample code:
import os
from multiprocessing import Pool

def analyze_data(args):
    # do something
    counter += 1
    print counter

if __name__ == '__main__':
    list_of_files = os.listdir(some_directory)
    global counter
    counter = 0
    p = Pool()
    p.map(analyze_data, list_of_files)
I can't find a way to do this.
Solution
The problem is that the counter variable is not shared between your processes: each separate process is creating its own local instance and incrementing that.
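To make that behaviour concrete, here is a small, hypothetical sketch (not from the original question or answer): after the pool has finished, the counter in the parent process is still 0, because every worker only incremented its own per-process copy of the module-level variable.

from multiprocessing import Pool

counter = 0

def bump(_):
    # each worker process increments its own copy of the module-level counter
    global counter
    counter += 1
    return counter

if __name__ == '__main__':
    p = Pool(4)
    # the returned values are per-process running counts, e.g. [1, 1, 2, 1, ...]
    print(p.map(bump, range(8)))
    # still 0 in the parent: the children never touched this copy
    print(counter)
    p.close()
    p.join()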
See this section of the documentation for some techniques you can employ to share state between your processes. In your case you might want to share a Value instance between your workers.
Here's a working version of your example (with some dummy input data). Note that it uses global values, which I would really try to avoid in practice:
from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print counter.value
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)
    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]
    #
    # create the pool of workers, ensuring each one receives the counter
    # as it starts.
    #
    p = Pool(initializer=init, initargs=(counter,))
    i = p.map_async(analyze_data, inputs, chunksize=1)
    i.wait()
    print i.get()
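If you would rather not rely on a module-level global at all, one option is to pass the shared state to the worker explicitly. The sketch below is not from the original answer: it swaps in a Manager-backed counter and lock (the proxies are picklable, so they can be bound to the worker with functools.partial), and the function and variable names are just illustrative.

from functools import partial
from multiprocessing import Manager, Pool

def analyze_data(counter, lock, args):
    ''' increment the shared counter, do something with the input '''
    # the proxy's += is a read-modify-write, so guard it with the shared lock
    with lock:
        counter.value += 1
        print(counter.value)
    return args * 10

if __name__ == '__main__':
    manager = Manager()
    counter = manager.Value('i', 0)   # the counter lives in the manager process
    lock = manager.Lock()             # a shared lock, also managed
    inputs = [1, 2, 3, 4]
    p = Pool()
    # bind the shared objects as the first arguments of the worker function
    results = p.map(partial(analyze_data, counter, lock), inputs, chunksize=1)
    p.close()
    p.join()
    print(results)

A Manager is somewhat slower than a raw Value because every access goes through a proxy process, but it avoids the global and the initializer plumbing.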