Python and RabbitMQ - Best way to listen for consume events from multiple channels?

2022-01-11 00:00:00 python rabbitmq pika

Problem description

I have two separate RabbitMQ instances. I'm trying to find the best way to listen to events from both.

For example, I can consume events on one with the following:

credentials = pika.PlainCredentials(user, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue, exchange="my-exchange", routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
channel.start_consuming()

I have a second host, "host2", that I'd like to listen to as well. I thought about creating two separate threads to do this, but from what I've read, pika isn't thread safe. Is there a better way? Or would creating two separate threads, each listening to a different Rabbit instance (host1 and host2), be sufficient?


Solution

The answer to "what is the best way" depends heavily on how you use your queues and what you mean by "best". Since I can't comment on questions yet, I'll just try to suggest some possible solutions.

In each example I'm going to assume the exchange is already declared.
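
For reference, declaring such an exchange yourself could look roughly like the sketch below. A topic exchange is assumed here, since the bindings in the examples use a wildcard routing key; note that newer pika versions take exchange_type= while some older ones used type= instead.

import pika

credentials = pika.PlainCredentials("guest", "guest")

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="host1", credentials=credentials))

channel = connection.channel()

# Assumed to be a topic exchange, to match the "*.*.*.*.*" routing key
# used in the binding examples below.
channel.exchange_declare(exchange="my-exchange", exchange_type="topic")

connection.close()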

You can consume messages from two queues on separate hosts in a single process using pika.

You are right - as its own FAQ states, pika is not thread safe, but it can be used in a multi-threaded manner by creating a connection to a RabbitMQ host per thread. Making this example run in threads using the threading module looks as follows:

import pika
import threading


class ConsumerThread(threading.Thread):
    def __init__(self, host, *args, **kwargs):
        super(ConsumerThread, self).__init__(*args, **kwargs)

        self._host = host

    # Not necessarily a method.
    def callback_func(self, channel, method, properties, body):
        print("{} received '{}'".format(self.name, body))

    def run(self):
        credentials = pika.PlainCredentials("guest", "guest")

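        # One connection per thread - a pika connection must not be shared between threads.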
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self._host,
                                      credentials=credentials))

        channel = connection.channel()

        result = channel.queue_declare(exclusive=True)

        channel.queue_bind(result.method.queue,
                           exchange="my-exchange",
                           routing_key="*.*.*.*.*")

        channel.basic_consume(self.callback_func,
                              result.method.queue,
                              no_ack=True)

        channel.start_consuming()


if __name__ == "__main__":
    threads = [ConsumerThread("host1"), ConsumerThread("host2")]
    for thread in threads:
        thread.start()

I've declared callback_func as a method purely to use ConsumerThread.name while printing the message body. It could just as well be a function outside the ConsumerThread class.

Alternatively, you can always just run one process with the consumer code per queue you want to consume events from.

import pika
import sys


def callback_func(channel, method, properties, body):
    print(body)


if __name__ == "__main__":
    credentials = pika.PlainCredentials("guest", "guest")

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=sys.argv[1],
                                  credentials=credentials))

    channel = connection.channel()

    result = channel.queue_declare(exclusive=True)

    channel.queue_bind(result.method.queue,
                       exchange="my-exchange",
                       routing_key="*.*.*.*.*")

    channel.basic_consume(callback_func, result.method.queue, no_ack=True)

    channel.start_consuming()

And then run:

$ python single_consume.py host1
$ python single_consume.py host2  # e.g. on another console

If the work you're doing on messages from the queues is CPU-heavy, then as long as the number of cores in your CPU >= the number of consumers, it is generally better to use this approach - unless your queues are empty most of the time and the consumers won't utilize this CPU time*.
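
If you would rather start and supervise both consumers from one parent script instead of two consoles, roughly the same thing can be done with the standard multiprocessing module. This is only a sketch reusing the logic of single_consume.py above; the consume() helper and the hard-coded host names are placeholders.

import multiprocessing

import pika


def callback_func(channel, method, properties, body):
    print(body)


def consume(host):
    # Same consumer logic as single_consume.py; each process gets its own connection.
    credentials = pika.PlainCredentials("guest", "guest")

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=host, credentials=credentials))

    channel = connection.channel()

    result = channel.queue_declare(exclusive=True)

    channel.queue_bind(result.method.queue,
                       exchange="my-exchange",
                       routing_key="*.*.*.*.*")

    channel.basic_consume(callback_func, result.method.queue, no_ack=True)

    channel.start_consuming()


if __name__ == "__main__":
    processes = [multiprocessing.Process(target=consume, args=(host,))
                 for host in ("host1", "host2")]

    for process in processes:
        process.start()

    for process in processes:
        process.join()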

Another alternative is to involve some asynchronous framework (for example Twisted) and run the whole thing in a single thread.

You can no longer use BlockingConnection in asynchronous code; fortunately, pika has an adapter for Twisted:

from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log


class Consumer(object):
    def on_connected(self, connection):
        d = connection.channel()
        d.addCallback(self.got_channel)
        d.addCallback(self.queue_declared)
        d.addCallback(self.queue_bound)
        d.addCallback(self.handle_deliveries)
        d.addErrback(log.err)

    def got_channel(self, channel):
        self.channel = channel

        return self.channel.queue_declare(exclusive=True)

    def queue_declared(self, queue):
        self._queue_name = queue.method.queue

        self.channel.queue_bind(queue=self._queue_name,
                                exchange="my-exchange",
                                routing_key="*.*.*.*.*")

    def queue_bound(self, ignored):
        return self.channel.basic_consume(queue=self._queue_name)

    def handle_deliveries(self, queue_and_consumer_tag):
        queue, consumer_tag = queue_and_consumer_tag
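        # A LoopingCall with an interval of 0 keeps pulling messages; it waits for the
        # Deferred returned by consume_from_queue before scheduling the next call.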
        self.looping_call = task.LoopingCall(self.consume_from_queue, queue)

        return self.looping_call.start(0)

    def consume_from_queue(self, queue):
        d = queue.get()

        return d.addCallback(lambda result: self.handle_payload(*result))

    def handle_payload(self, channel, method, properties, body):
        print(body)


if __name__ == "__main__":
    consumer1 = Consumer()
    consumer2 = Consumer()

    parameters = ConnectionParameters()
    cc = protocol.ClientCreator(reactor,
                                TwistedProtocolConnection,
                                parameters)
    d1 = cc.connectTCP("host1", 5672)
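    # protocol.ready is a Deferred that fires once the AMQP connection is fully open.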
    d1.addCallback(lambda protocol: protocol.ready)
    d1.addCallback(consumer1.on_connected)
    d1.addErrback(log.err)

    d2 = cc.connectTCP("host2", 5672)
    d2.addCallback(lambda protocol: protocol.ready)
    d2.addCallback(consumer2.on_connected)
    d2.addErrback(log.err)

    reactor.run()

This approach works even better the more queues you consume from and the less CPU-bound the work performed by the consumers is*.

Since you've mentioned pika, I've restricted myself to Python 2.x-based solutions, because pika has not yet been ported to Python 3.

But in case you would want to move to Python >=3.3, one possible option is to use asyncio with one of the AMQP implementations (AMQP being the protocol you speak with RabbitMQ), e.g. asynqp or aioamqp.
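
For illustration only, here is a minimal sketch of that idea with asyncio and aioamqp (modern async/await syntax, so Python 3.5+). The exact aioamqp calls are assumed from its connect()/channel() API and are untested here; the exchange and binding mirror the examples above.

import asyncio

import aioamqp


async def callback(channel, body, envelope, properties):
    print(body)


async def consume(host):
    # One connection per host; aioamqp.connect() returns (transport, protocol).
    transport, protocol = await aioamqp.connect(host=host,
                                                login="guest",
                                                password="guest")

    channel = await protocol.channel()

    # queue_declare() returns a dict describing the server-named queue.
    result = await channel.queue_declare(exclusive=True)
    queue_name = result["queue"]

    await channel.queue_bind(queue_name=queue_name,
                             exchange_name="my-exchange",
                             routing_key="*.*.*.*.*")

    await channel.basic_consume(callback, queue_name=queue_name, no_ack=True)


async def main():
    # Set up both consumers, then keep the event loop alive forever.
    await asyncio.gather(consume("host1"), consume("host2"))
    await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())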

* - please note that these are very shallow tips - in most cases the choice is not that obvious; what will be best for you depends on queue "saturation" (messages/time), what work you do upon receiving these messages, what environment you run your consumers in, etc.; there's no way to be sure other than to benchmark all the implementations.
