如何在Python中用多进程.pool创建全局锁/信号量?

2022-04-10 00:00:00 python python-multiprocessing

问题描述

我希望限制子进程中的资源访问。例如-限制http下载、磁盘io等。如何扩展此基本代码才能实现它?

请分享一些基本代码示例。

pool = multiprocessing.Pool(multiprocessing.cpu_count())
while job_queue.is_jobs_for_processing():
  for job in job_queue.pull_jobs_for_processing:
    pool.apply_async(do_job, callback = callback)
pool.close()
pool.join()

解决方案

创建池时使用初始值设定项和初始化参数,以便在所有子进程中定义全局。

例如:

from multiprocessing import Pool, Lock
from time import sleep

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1-(i/10.))
        return i

def init_child(lock_):
    global lock
    lock = lock_

def main():
    lock = Lock()
    poolsize = 4
    with Pool(poolsize, initializer=init_child, initargs=(lock,)) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()

此代码将按升序(提交作业的顺序)打印数字0-3,因为它使用锁。注释掉with lock:行,可以看到它以降序打印出数字。

此解决方案可在Windows和Unix上运行。然而,因为进程可以在Unix系统上派生,所以Unix只需要在模块范围内声明全局变量。子进程获取父进程内存的副本,其中包括仍在工作的lock对象。因此,不一定需要初始化器,但它可以帮助记录代码的工作方式。如果多处理能够通过派生创建进程,则以下方法也适用。

from multiprocessing import Pool, Lock
from time import sleep

lock = Lock()

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1-(i/10.))
        return i

def main():
    poolsize = 4
    with Pool(poolsize) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()

相关文章