Python中如何实现带超时的进程池

2023-04-11 00:00:00 进程 超时 如何实现

要实现带超时的进程池,需要使用Python中的multiprocessing模块和threading模块来实现。下面是详细的步骤:

  1. 定义进程池类:首先需要定义一个进程池类,用于管理多个进程的创建和执行。在该类中,需要定义进程池的大小,进程池的任务队列和执行结果队列。
from multiprocessing import Pool, Manager, TimeoutError
from threading import Thread

class TimeoutPool:
    def __init__(self, processes=1, timeout=10):
        self.processes = processes
        self.timeout = timeout
        self.pool = Pool(processes)
        self.manager = Manager()
        self.task_queue = self.manager.Queue()
        self.result_queue = self.manager.Queue()
  1. 定义进程池的工作方法:进程池的工作方法可以是任何需要执行的任务方法。在该方法中,需要将任务放入任务队列中,并等待执行结果队列中返回结果。
def worker(self, task):
    try:
        result = task()
        self.result_queue.put(result)
    except Exception as e:
        print('Worker failed: {}'.format(e))
  1. 定义进程池的执行方法:进程池的执行方法需要不断从任务队列中获取任务,并将其分配给进程池中的进程。在每个进程启动后,需要使用线程监视进程的执行时间,如果超时,则杀死进程并返回超时异常。
def run(self):
    while True:
        try:
            task = self.task_queue.get(timeout=1)
            p = self.pool.Process(target=self.worker, args=(task,))
            t = Thread(target=self.watchdog, args=(p,))
            t.daemon = True
            t.start()
            p.start()
            p.join()
        except Exception as e:
            print('Pool failed: {}'.format(e))
            break

def watchdog(self, process):
    timed_out = False
    timer = Thread(target=self.timing, args=(self.timeout, process))
    timer.start()
    while not timed_out and process.is_alive():
        timed_out = not timer.is_alive()
    if timed_out:
        print('Process timed out: {}'.format(process.pid))
        process.terminate()
        self.result_queue.put(TimeoutError())
    else:
        timer.join()

def timing(self, timeout, process):
    time.sleep(timeout)
  1. 定义进程池的接口方法:进程池的接口方法可以是任何需要执行的任务方法。在该方法中,需要将任务放入任务队列中,并等待执行结果队列中返回结果。
def apply_async(self, func, args=()):
    self.task_queue.put(lambda: func(*args))
    return self.result_queue.get()
  1. 测试超时进程池:使用字符串作为测试范例,将字符串转换为大写字母,并模拟任务执行时间超时的情况。
if __name__ == '__main__':
    pool = TimeoutPool(processes=4, timeout=3)

    def test_timeout():
        time.sleep(5)
        return 'Timeout'

    def test_success(string):
        time.sleep(1)
        return string.upper()

    results = [pool.apply_async(test_success, args=('pidancode.com',))]
    results += [pool.apply_async(test_timeout) for _ in range(3)]

    for result in results:
        try:
            print(result.get())
        except TimeoutError as e:
            print('Task timed out!')

参考资料:

  1. Python multiprocessing module documentation: https://docs.python.org/3/library/multiprocessing.html
  2. Python threading module documentation: https://docs.python.org/3/library/threading.html

相关文章