Sharing a counter with multiprocessing.Pool
Problem description
I'd like to use multiprocessing.Value + multiprocessing.Lock to share a counter between separate processes. For example:
import itertools as it
import multiprocessing

def func(x, val, lock):
    for i in range(x):
        i ** 2
    with lock:
        val.value += 1
    print('counter incremented to:', val.value)

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    with multiprocessing.Pool() as pool:
        pool.starmap(func, ((i, v, lock) for i in range(25)))
    print(v.value)
This raises the following exception:

RuntimeError: Synchronized objects should only be shared between processes through inheritance
What I am most confused by is that a related (albeit not completely analogous) pattern works with multiprocessing.Process():
if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    procs = [multiprocessing.Process(target=func, args=(i, v, lock))
             for i in range(25)]
    for p in procs: p.start()
    for p in procs: p.join()
Now, I recognize that these are two markedly different things:

- the first example uses a number of worker processes equal to cpu_count(), and splits an iterable range(25) between them
- the second example creates 25 worker processes and tasks, each with one input
That said: how can I share an instance with pool.starmap() (or pool.map()) in this manner?
I've seen similar questions here, here, and here, but those approaches don't seem to be suited to .map()/.starmap(), regardless of whether Value uses ctypes.c_int.
I realize that this approach technically works:
def func(x):
    for i in range(x):
        i ** 2
    with lock:
        v.value += 1
    print('counter incremented to:', v.value)

v = None
lock = None

def set_global_counter_and_lock():
    """Egh ... """
    global v, lock
    if not any((v, lock)):
        v = multiprocessing.Value('i', 0)
        lock = multiprocessing.Lock()

if __name__ == '__main__':
    # Each worker process will call `initializer()` when it starts.
    with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
        pool.map(func, range(25))
Is this really the best-practices way of going about this?
Solution
The RuntimeError you get when using Pool arises because arguments for pool methods are pickled before being sent over a (pool-internal) queue to the worker processes. Which pool method you are trying to use is irrelevant here. This doesn't happen when you just use Process, because there is no queue involved. You can reproduce the error just with pickle.dumps(multiprocessing.Value('i', 0)).
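As a minimal sketch of that reproduction (no Pool involved; only the standard pickle module is assumed):

import pickle
import multiprocessing

# Pickling a Value outside of process spawning raises the same RuntimeError
# that the Pool hits internally when it tries to queue the argument.
try:
    pickle.dumps(multiprocessing.Value('i', 0))
except RuntimeError as e:
    print(e)
# -> Synchronized objects should only be shared between processes through inheritance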
Your last code snippet doesn't work the way you think it does. You are not sharing one Value; you are recreating independent counters for every child process.
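One way to see this (a sketch reusing the question's initializer): after the pool finishes, the counter in the parent process has never been touched, because every worker incremented its own private Value:

import multiprocessing

v = None
lock = None

def set_global_counter_and_lock():
    # Runs once per worker process, so every worker builds its *own* Value/Lock.
    global v, lock
    if not any((v, lock)):
        v = multiprocessing.Value('i', 0)
        lock = multiprocessing.Lock()

def func(x):
    with lock:
        v.value += 1

if __name__ == '__main__':
    with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
        pool.map(func, range(25))
    # The parent never ran the initializer, so its `v` is still None and the
    # 25 increments are scattered across the workers' private counters.
    print(v)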
If you are on Unix and use the default start method "fork", you are done by simply not passing the shared objects as arguments into the pool methods. Your child processes will inherit the globals through forking. With the process start methods "spawn" (the default on Windows, and on macOS since Python 3.8) or "forkserver", you have to use the initializer during Pool instantiation to let the child processes inherit the shared objects.
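Under "fork" that can look like the following sketch (assuming a Linux machine; the shared Value is simply a module-level global that the forked workers inherit):

import multiprocessing

# Created before the Pool forks, so every worker inherits the same shared memory.
cnt = multiprocessing.Value('i', 0)

def func(x):
    with cnt.get_lock():
        cnt.value += 1

if __name__ == '__main__':
    multiprocessing.set_start_method('fork')  # already the default on Linux
    with multiprocessing.Pool() as pool:
        pool.map(func, range(25))
    print(cnt.value)  # 25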
Note, you don't need an extra multiprocessing.Lock here, because multiprocessing.Value comes by default with an internal one you can use.
import os
from multiprocessing import Pool, Value #, set_start_method

def func(x):
    for i in range(x):
        assert i == i
        with cnt.get_lock():
            cnt.value += 1
    print(f'{os.getpid()} | counter incremented to: {cnt.value}\n')

def init_globals(counter):
    global cnt
    cnt = counter

if __name__ == '__main__':
    # set_start_method('spawn')
    cnt = Value('i', 0)
    iterable = [10000 for _ in range(10)]

    with Pool(initializer=init_globals, initargs=(cnt,)) as pool:
        pool.map(func, iterable)

    assert cnt.value == 100000
It's probably also worth noting that you don't need the counter to be shared in all cases. If you just need to keep track of how often something has happened in total, an option would be to keep separate worker-local counters during computation which you sum up at the end. This could result in a significant performance improvement for frequent counter updates for which you don't need synchronization during the parallel computation itself.
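A minimal sketch of that alternative (the task function and workload are hypothetical, not from the original answer): each task counts locally and returns its count, and the parent sums the results collected by pool.map:

from multiprocessing import Pool

def count_events(x):
    # Hypothetical task: count locally; no shared state or locking needed.
    local_count = 0
    for i in range(x):
        local_count += 1
    return local_count

if __name__ == '__main__':
    iterable = [10000 for _ in range(10)]
    with Pool() as pool:
        total = sum(pool.map(count_events, iterable))
    assert total == 100000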