如何从多个进程递增共享计数器?

2022-02-23 00:00:00 python multiprocessing

问题描述

我在使用multiprocessing模块时遇到问题。我使用Pool个工作者及其map方法并发分析大量文件。每次处理文件时,我都希望更新计数器,这样我就可以跟踪还有多少文件需要处理。以下是示例代码:

import os
import multiprocessing

counter = 0


def analyze(file):
    # Analyze the file.
    global counter
    counter += 1
    print counter


if __name__ == '__main__':
    files = os.listdir('/some/directory')
    pool = multiprocessing.Pool(4)
    pool.map(analyze, files)

我找不到此问题的解决方案。


解决方案

问题在于counter变量不在您的进程之间共享:每个单独的进程都在创建自己的本地实例并递增该实例。

有关可用于在进程之间共享状态的一些技术,请参阅文档的this section。在您的情况下,您可能希望在您的员工之间共享Value实例

这里是您的示例的工作版本(带有一些虚拟输入数据)。请注意,它使用的是我在实践中确实会尽量避免的全局值:

from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print counter.value
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    #
    p = Pool(initializer = init, initargs = (counter, ))
    i = p.map_async(analyze_data, inputs, chunksize = 1)
    i.wait()
    print i.get()

相关文章