Multiprocessing - producer/consumer design

2022-01-12 00:00:00 python multiprocessing

Problem description

I'm using the multiprocessing module to split up a very large task. It works for the most part, but I must be missing something obvious in my design, because as it stands it's very hard for me to tell reliably when all of the data has been processed.

I have two separate tasks that run; one that feeds the other. I guess this is a producer/consumer problem. I use a shared Queue between all processes, where the producers fill up the queue, and the consumers read from the queue and do the processing. The problem is that there is a finite amount of data, so at some point everyone needs to know that all of the data has been processed so the system can shut down gracefully.

It would seem to make sense to use the map_async() function, but since the producers are filling up the queue, I don't know all of the items up front, so I have to go into a while loop and use apply_async() and try to detect when everything is done with some sort of timeout...ugly.

I feel like I'm missing something obvious. How can this be better designed?

Producer

import multiprocessing
import time
import Queue  # Python 2; the module is named "queue" in Python 3

class ProducerProcess(multiprocessing.Process):
    def __init__(self, item, consumer_queue):
        self.item = item
        self.consumer_queue = consumer_queue
        multiprocessing.Process.__init__(self)

    def run(self):
        # fetching the records is the slow part; each record is handed
        # to the consumers through the shared queue
        for record in get_records_for_item(self.item): # this takes time
            self.consumer_queue.put(record)

def start_producer_processes(producer_queue, consumer_queue, max_running):
    # keep at most max_running producer processes alive at any one time
    running = []

    while not producer_queue.empty():
        running = [r for r in running if r.is_alive()]
        if len(running) < max_running:
            producer_item = producer_queue.get()
            p = ProducerProcess(producer_item, consumer_queue)
            p.start()
            running.append(p)
        time.sleep(1)

Consumer

def process_consumer_chunk(queue, chunksize=10000):
    for i in xrange(0, chunksize):
        try:
            # don't wait too long for an item
            # if new records don't arrive in 10 seconds, process what you have
            # and let the next process pick up more items.

            record = queue.get(True, 10)
        except Queue.Empty:                
            break

        do_stuff_with_record(record)

Main

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024*1024)
    producer_queue = manager.Queue()

    producer_items = xrange(0,10)

    for item in producer_items:
        producer_queue.put(item)

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
    p.start()

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)

Here is where it gets cheesy. I can't use map, because the list to consume is being filled up at the same time. So I have to go into a while loop and try to detect a timeout. The consumer_queue can become empty while the producers are still trying to fill it up, so I can't just detect an empty queue and quit on that.

    chunksize = 10000
    timed_out = False
    timeout = 1800
    while 1:
        try:
            # apply_async() returns immediately; a Queue.Empty raised inside
            # a worker never propagates here, which is part of the problem
            result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,), dict(chunksize=chunksize))
            if timed_out:
                timed_out = False

        except Queue.Empty:
            if timed_out:
                break

            timed_out = True
            time.sleep(timeout)
        time.sleep(1)

    consumer_queue.join()
    consumer_pool.close()
    consumer_pool.join()

I thought that maybe I could get() the records in the main thread and pass those into the consumers instead of passing the queue in, but I think I'd end up with the same problem that way: I'd still have to run a while loop and use apply_async(). Thank you in advance for any advice!


Solution

You could use a manager.Event to signal the end of the work. The event can be shared between all of your processes, and when you set it from your main process the other workers can shut down gracefully.

while not event.is_set():
    ...rest of code...

因此,您的消费者将等待事件设置并在设置后处理清理.

So, your consumers would wait for the event to be set and handle the cleanup once it is set.
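For example, a consumer built around that idea could look something like this. This is a minimal sketch, not code from the question: the consume() name, the short get() timeout, and the loop structure are assumptions, and it reuses do_stuff_with_record() from the question. The key detail is to exit only when the event is set and the queue has been drained, so records still sitting in the queue aren't lost:

def consume(consumer_queue, all_done):
    # all_done is the shared manager.Event
    while True:
        try:
            # short timeout so the event is re-checked regularly
            record = consumer_queue.get(True, 1)
        except Queue.Empty:
            if all_done.is_set():
                break  # producers are finished and the queue is drained
            continue
        do_stuff_with_record(record)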

To determine when to set this flag, you can do a join on the producer processes, and when those are all complete you can set the event and then join on the consumer processes, as in the sketch below.
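Putting it together, the main process could be wired up roughly like this. Again a sketch rather than the original code: it starts the producers directly and runs a plain list of consumer processes executing the consume() sketch above in place of the Pool/apply_async loop, and it skips the max_running throttling from start_producer_processes:

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024 * 1024)
    all_done = manager.Event()

    # start the producers and keep references so they can be joined
    producers = [ProducerProcess(item, consumer_queue) for item in xrange(0, 10)]
    for p in producers:
        p.start()

    # start the consumers, passing in the shared event
    consumers = [multiprocessing.Process(target=consume, args=(consumer_queue, all_done))
                 for _ in xrange(16)]
    for c in consumers:
        c.start()

    # once every producer has exited, no more records can arrive,
    # so set the event and wait for the consumers to drain the queue
    for p in producers:
        p.join()
    all_done.set()
    for c in consumers:
        c.join()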
