How to add multiprocessing to a consumer with pika (RabbitMQ) in Python

2022-01-11 00:00:00 python multiprocessing rabbitmq pika

Question

I have very basic producer-consumer code written with the pika library in Python. The problem is that the consumer side processes the messages in the queue too slowly. I ran some tests and found that I can speed up the workflow by up to 27 times with multiprocessing, but I don't know the right way to add multiprocessing to my code.

import pika
import json
from datetime import datetime
from functions import download_xmls


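def callback(ch, method, properties, body):
    print('Got something')
    # The payload is a JSON list: records with a 'cadnum' key, followed by
    # a final element that carries the object type.
    body = json.loads(body)
    obj_type = body[-1]['Type']  # renamed from `type` to avoid shadowing the builtin
    print('Object type in work currently ' + obj_type)
    cnums = [x['cadnum'] for x in body[:-1]]
    print('Got {} cnums to work with'.format(len(cnums)))

    date_start = datetime.now()
    download_xmls(obj_type, cnums)  # blocks the consumer until the download finishes
    date_end = datetime.now()
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('Download complete in {} seconds'.format((date_end-date_start).total_seconds()))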


def consume(queue_name='bot-test'):
    parameters = pika.URLParameters('server@address')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_qos(prefetch_count=1)
    # pika >= 1.0 signature; pika 0.x took the callback as the first argument
    channel.basic_consume(queue=queue_name, on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

How do I go about adding multiprocessing from here?


Answer

Pika has extensive example code that I recommend you check out. Note that this code is for example use only. If you do the work on threads, you will need a more intelligent way to manage those threads.

The goal is to avoid blocking the thread that runs Pika's IO loop, and to call back into the IO loop correctly from your worker threads. That's why add_callback_threadsafe exists and is used in that code.
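
As a rough illustration of that idea, here is a minimal sketch. It is not the Pika example itself: it swaps in a concurrent.futures pool to manage the workers, and it assumes pika >= 1.0. The names finish_message, on_done, on_message, consume, work and workers are made up for this sketch. The message callback only submits the body to the pool and returns, and the ack is scheduled back onto the IO loop thread with add_callback_threadsafe.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


def finish_message(channel, delivery_tag, ok):
    """Runs on the IO loop thread: ack on success, requeue on failure."""
    if not channel.is_open:
        return  # channel is gone; the broker redelivers unacked messages
    if ok:
        channel.basic_ack(delivery_tag=delivery_tag)
    else:
        channel.basic_nack(delivery_tag=delivery_tag, requeue=True)


def on_done(future, connection, channel, delivery_tag):
    """May run off the IO loop thread, so it must not touch the channel directly."""
    ok = future.exception() is None
    connection.add_callback_threadsafe(
        functools.partial(finish_message, channel, delivery_tag, ok)
    )


def on_message(channel, method, properties, body, connection, executor, work):
    """Runs on the IO loop thread: hand the body to the pool and return at once."""
    future = executor.submit(work, body)
    future.add_done_callback(
        functools.partial(on_done, connection=connection, channel=channel,
                          delivery_tag=method.delivery_tag)
    )


def consume(url, queue_name, work, workers=4):
    """Hypothetical entry point; `work` is whatever function handles one body."""
    import pika  # imported here so the helpers above can be tried without a broker

    connection = pika.BlockingConnection(pika.URLParameters(url))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    # Let the broker hand out at most one unacked message per worker.
    channel.basic_qos(prefetch_count=workers)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        channel.basic_consume(
            queue=queue_name,
            on_message_callback=functools.partial(
                on_message, connection=connection, executor=executor, work=work),
        )
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()
    # Messages whose ack did not go out before this point stay unacked
    # and are redelivered by the broker.
    connection.close()
```

For the question's code, work would be a function that parses the JSON body and calls download_xmls. If the work is CPU-bound, ProcessPoolExecutor can probably be used in place of ThreadPoolExecutor, provided work and the message body can be pickled; the done callback still runs in the consumer process, so the ack path stays the same.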

NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.
