python pool apply_async 和 map_async 不会阻塞完整队列
问题描述
我对 python 还很陌生.我正在使用多处理模块读取标准输入上的文本行,以某种方式转换它们并将它们写入数据库.这是我的代码片段:
I am fairly new to python. I am using the multiprocessing module for reading lines of text on stdin, converting them in some way and writing them into a database. Here's a snippet of my code:
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
batch.append(content)
if len(batch) >= 10000:
pool.apply_async(insert, args=(batch,i+1))
batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()
现在一切正常,直到我开始处理巨大的输入文件(数亿行),然后通过管道传输到我的 python 程序中.在某些时候,当我的数据库变慢时,我会看到内存已满.
Now that all works fine, until I get to process huge input files (hundreds of millions of lines) that i pipe into my python program. At some point, when my database gets slower, I see the memory getting full.
玩了一会儿,发现 pool.apply_async 和 pool.map_async 从来没有阻塞过,所以要处理的调用队列越来越大.
After some playing, it turned out that pool.apply_async as well as pool.map_async never ever block, so that the queue of the calls to be processed grows bigger and bigger.
解决我的问题的正确方法是什么?我希望我可以设置一个参数,一旦达到某个队列长度,它将阻止 pool.apply_async 调用.Java 中的 AFAIR 可以为此目的为 ThreadPoolExecutor 提供一个具有固定长度的 BlockingQueue.
What is the correct approach to my problem? I would expect a parameter that I can set, that will block the pool.apply_async call, as soon as a certain queue length has been reached. AFAIR in Java one can give the ThreadPoolExecutor a BlockingQueue with a fixed length for that purpose.
谢谢!
解决方案
apply_async
和 map_async
函数旨在不阻塞主进程.为了做到这一点,Pool
维护了一个内部 Queue
,遗憾的是它的大小无法更改.
The apply_async
and map_async
functions are designed not to block the main process. In order to do so, the Pool
maintains an internal Queue
which size is unfortunately impossible to change.
解决问题的方法是使用 Semaphore
以您希望队列的大小进行初始化.在为池提供数据之前以及在工作人员完成任务之后获取和释放信号量.
The way the problem can be solved is by using a Semaphore
initialized with the size you want the queue to be. You acquire and release the semaphore before feeding the pool and after a worker has completed the task.
这是一个使用 Python 2.6 或更高版本的示例.
Here's an example working with Python 2.6 or greater.
from threading import Semaphore
from multiprocessing import Pool
def task_wrapper(f):
"""Python2 does not allow a callback for method raising exceptions,
this wrapper ensures the code run into the worker will be exception free.
"""
try:
return f()
except:
return None
class TaskManager(object):
def __init__(self, processes, queue_size):
self.pool = Pool(processes=processes)
self.workers = Semaphore(processes + queue_size)
def new_task(self, f):
"""Start a new task, blocks if queue is full."""
self.workers.acquire()
self.pool.apply_async(task_wrapper, args=(f, ), callback=self.task_done))
def task_done(self):
"""Called once task is done, releases the queue is blocked."""
self.workers.release()
另一个使用 concurrent.futures
池实现的示例.
Another example using concurrent.futures
pools implementation.
相关文章