Python中如何实现基于队列的并行任务处理

2023-04-11 00:00:00 并行 队列 如何实现

Python中可以使用多进程或多线程方式实现基于队列的并行任务处理。下面分别介绍这两种方式。

  1. 多进程方式实现基于队列的并行任务处理

下面是一个简单的多进程代码示例,用于实现基于队列的并行任务处理:

import multiprocessing as mp
import queue

# 定义任务函数
def task_func(queue):
    while True:
        task = queue.get()  # 从队列中获取任务
        if task is None:  # 如果队列中没有任务了,就退出循环
            break
        print("Task start: {}".format(task))
        # 这里可以放具体的任务代码,比如对 task 进行处理、调用其他函数等
        print("Task end: {}".format(task))

if __name__ == "__main__":
    # 创建进程池,数量为 CPU 核心数
    pool = mp.Pool(mp.cpu_count())
    # 创建队列
    q = mp.Manager().Queue()
    # 将任务放入队列中
    tasks = ["pidancode.com", "皮蛋编程"]
    for task in tasks:
        q.put(task)
    # 启动进程进行任务处理
    for i in range(mp.cpu_count()):
        pool.apply_async(task_func, args=(q,))
    # 等待所有进程执行完毕
    pool.close()
    pool.join()

上述代码中,首先定义了任务函数 task_func,该函数接收一个队列对象作为参数,在函数内部不断地从队列中获取任务,并进行任务处理。当队列为空时,函数退出。

在主程序中,首先创建了进程池,数量为 CPU 核心数,然后创建了一个队列,并将任务放入队列中。接下来启动多个进程,每个进程执行 task_func 函数,共同完成任务处理。最后等待所有进程执行完毕后,程序结束。

  1. 多线程方式实现基于队列的并行任务处理

下面是一个简单的多线程代码示例,用于实现基于队列的并行任务处理:

import queue
import threading

# 定义任务函数
def task_func(queue):
    while True:
        task = queue.get()  # 从队列中获取任务
        if task is None:  # 如果队列中没有任务了,就退出循环
            break
        print("Task start: {}".format(task))
        # 这里可以放具体的任务代码,比如对 task 进行处理、调用其他函数等
        print("Task end: {}".format(task))

if __name__ == "__main__":
    # 创建队列
    q = queue.Queue()
    # 将任务放入队列中
    tasks = ["pidancode.com", "皮蛋编程"]
    for task in tasks:
        q.put(task)
    # 创建多个线程进行任务处理
    threads = []
    for i in range(threading.active_count()):
        t = threading.Thread(target=task_func, args=(q,))
        t.start()
        threads.append(t)
    # 等待所有线程执行完毕
    for t in threads:
        t.join()

上述代码中,首先定义了任务函数 task_func,该函数接收一个队列对象作为参数,在函数内部不断地从队列中获取任务,并进行任务处理。当队列为空时,函数退出。

在主程序中,首先创建了一个队列,并将任务放入队列中。然后创建多个线程,每个线程执行 task_func 函数,共同完成任务处理。最后等待所有线程执行完毕后,程序结束。

相关文章