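Python: running multiple subprocesses with a pool/queue, collecting each one's output as soon as it finishes, and launching the next job in the queue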

Problem description

I'm currently launching a subprocess and parsing its stdout on the fly, without waiting for the process to finish.

for sample in all_samples:
    my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample),shell=True, stdout=subprocess.PIPE)
    line = True
    while line:
        # readline() returns b'' at EOF, which ends the loop
        line = my_tool_subprocess.stdout.readline()
        #here I parse stdout..
    my_tool_subprocess.wait()

In my script I perform this action multiple times, once per input sample.

The main problem is that every subprocess is a program/tool that uses 100% of one CPU while it is running. It also takes some time: maybe 20-40 minutes per input.

What I would like to achieve is to set up a pool or queue (I'm not sure of the exact terminology here) with at most N subprocess jobs running at the same time, so that I can maximize performance instead of proceeding sequentially.

So the execution flow, for example with a pool of at most 4 jobs, should be:

  • Launch 4 subprocesses.
  • When one of the jobs finishes, parse its stdout and launch the next one.
  • Repeat until all the jobs in the queue are finished.

Even if I can achieve this, I don't know how I would identify which sample's subprocess is the one that has finished. At the moment I don't need to identify them, since the subprocesses run sequentially and I parse stdout as each one prints it.

This is really important, since I need to identify the output of each subprocess and assign it to its corresponding input/sample.


Solution

ThreadPool could be a good fit for your problem: you set the number of worker threads and add jobs, and the threads work their way through all the tasks. Each worker thread spends its time blocked on the subprocess's stdout, so the CPU-heavy work still happens in the separate mytool processes.

from multiprocessing.pool import ThreadPool
import subprocess


def work(sample):
    my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample),shell=True, stdout=subprocess.PIPE)
    line = True
    while line:
        # readline() returns b'' at EOF, which ends the loop
        line = my_tool_subprocess.stdout.readline()
        #here I parse stdout..
    my_tool_subprocess.wait()


num = None  # set to the number of workers you want (it defaults to the cpu count of your machine)
tp = ThreadPool(num)
for sample in all_samples:
    tp.apply_async(work, (sample,))

tp.close()
tp.join()
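
To tie each output back to its sample, one option is to have the worker return the sample together with whatever it parsed, and to collect the results with imap_unordered, which yields each result as soon as its job finishes. The following is only a minimal sketch under some assumptions: build_command and run_all are hypothetical helpers, and a small Python child process stands in for mytool so the example runs anywhere. For the real tool, build_command would return something like ['mytool', sample].

```python
import subprocess
import sys
from multiprocessing.pool import ThreadPool


def build_command(sample):
    # Hypothetical stand-in for the real command (e.g. ['mytool', sample]):
    # a tiny Python child that prints two lines mentioning the sample.
    code = "import sys; print('start', sys.argv[1]); print('done', sys.argv[1])"
    return [sys.executable, "-c", code, sample]


def work(sample):
    """Run one job, parse its stdout as it arrives, return (sample, parsed)."""
    parsed = []
    with subprocess.Popen(build_command(sample), stdout=subprocess.PIPE, text=True) as proc:
        for line in proc.stdout:  # yields lines as they are printed, stops at EOF
            parsed.append(line.rstrip("\n"))  # placeholder for the real parsing
    return sample, parsed


def run_all(samples, max_jobs=4):
    """Run at most max_jobs subprocesses at a time; map each sample to its output."""
    results = {}
    with ThreadPool(max_jobs) as pool:
        # Results arrive in completion order, each one tagged with its sample.
        for sample, parsed in pool.imap_unordered(work, samples):
            results[sample] = parsed
    return results
```

Calling run_all(all_samples, max_jobs=4) would then return a dict keyed by sample. Because every result carries its own sample, the order in which the jobs finish does not matter.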
