Python 在进程之间共享锁
问题描述
我正在尝试使用部分函数,以便 pool.map() 可以针对具有多个参数的函数(在本例中为 Lock() 对象).
I am attempting to use a partial function so that pool.map() can target a function that has more than one parameter (in this case a Lock() object).
这里是示例代码(取自我上一个问题的答案):
Here is example code (taken from an answer to a previous question of mine):
from functools import partial
def target(lock, iterable_item):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
l = multiprocessing.Lock()
func = partial(target, l)
pool.map(func, iterable)
pool.close()
pool.join()
但是当我运行这段代码时,我得到了错误:
However when I run this code, I get the error:
Runtime Error: Lock objects should only be shared between processes through inheritance.
我在这里缺少什么?如何在我的子进程之间共享锁?
What am I missing here? How can I share the lock between my subprocesses?
解决方案
你不能将普通的 multiprocessing.Lock
对象传递给 Pool
方法,因为它们不能被腌制.有两种方法可以解决这个问题.一种是创建 Manager()
和传递一个 Manager.Lock()
:
You can't pass normal multiprocessing.Lock
objects to Pool
methods, because they can't be pickled. There are two ways to get around this. One is to create Manager()
and pass a Manager.Lock()
:
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
m = multiprocessing.Manager()
l = m.Lock()
func = partial(target, l)
pool.map(func, iterable)
pool.close()
pool.join()
不过,这有点重量级;使用 Manager
需要生成另一个进程来托管 Manager
服务器.并且所有对 acquire
/release
锁的调用都必须通过 IPC 发送到该服务器.
This is a little bit heavyweight, though; using a Manager
requires spawning another process to host the Manager
server. And all calls to acquire
/release
the lock have to be sent to that server via IPC.
另一个选项是在创建池时使用 initializer
kwarg 传递常规的 multiprocessing.Lock()
.这将使您的锁实例在所有子工作者中都是全局的:
The other option is to pass the regular multiprocessing.Lock()
at Pool creation time, using the initializer
kwarg. This will make your lock instance global in all the child workers:
def target(iterable_item):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
def init(l):
global lock
lock = l
def main():
iterable = [1, 2, 3, 4, 5]
l = multiprocessing.Lock()
pool = multiprocessing.Pool(initializer=init, initargs=(l,))
pool.map(target, iterable)
pool.close()
pool.join()
第二种解决方案的副作用是不再需要partial
.
The second solution has the side-effect of no longer requiring partial
.
相关文章