Python 和 RabbitMQ - 聆听来自多个渠道的消费事件的最佳方式?
问题描述
我有两个独立的 RabbitMQ 实例.我正在尝试找到聆听两者事件的最佳方式.
I have two, separate RabbitMQ instances. I'm trying to find the best way to listen to events from both.
例如,我可以通过以下方式消费其中的事件:
For example, I can consume events on one with the following:
credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()
我还有第二个主机,host2",我也想听.我考虑过创建两个单独的线程来执行此操作,但根据我的阅读,pika 不是线程安全的.有没有更好的办法?或者创建两个单独的线程,每个线程都监听不同的 Rabbit 实例(host1 和 host2)就足够了?
I have a second host, "host2", that I'd like to listen to as well. I thought about creating two separate threads to do this, but from what I've read, pika isn't thread safe. Is there a better way? Or would creating two separate threads, each listening to a different Rabbit instance (host1, and host2) be sufficient?
解决方案
什么是最好的方式"的答案在很大程度上取决于您对队列的使用模式以及最好"的含义.由于我还不能对问题发表评论,所以我将尝试提出一些可能的解决方案.
The answer to "what is the best way" depends heavily on your usage pattern of queues and what you mean by "best". Since I can't comment on questions yet, I'll just try to suggest some possible solutions.
在每个示例中,我将假设 exchange 已经声明.
In each example I'm going to assume exchange is already declared.
您可以使用 pika
.
You can consume messages from two queues on separate hosts in single process using pika
.
你是对的 - 它自己的常见问题解答状态,pika
不是线程安全的,但它可以通过为每个线程创建到 RabbitMQ 主机的连接以多线程方式使用.使用 threading
模块使这个示例在线程中运行看起来如下:
You are right - as its own FAQ states, pika
is not thread safe, but it can be used in multi-threaded manner by creating connections to RabbitMQ hosts per thread. Making this example run in threads using threading
module looks as follows:
import pika
import threading
class ConsumerThread(threading.Thread):
def __init__(self, host, *args, **kwargs):
super(ConsumerThread, self).__init__(*args, **kwargs)
self._host = host
# Not necessarily a method.
def callback_func(self, channel, method, properties, body):
print("{} received '{}'".format(self.name, body))
def run(self):
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self._host,
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(self.callback_func,
result.method.queue,
no_ack=True)
channel.start_consuming()
if __name__ == "__main__":
threads = [ConsumerThread("host1"), ConsumerThread("host2")]
for thread in threads:
thread.start()
我已将 callback_func
声明为纯粹用于在打印消息正文时使用 ConsumerThread.name
的方法.它也可能是 ConsumerThread
类之外的一个函数.
I've declared callback_func
as a method purely to use ConsumerThread.name
while printing message body. It might as well be a function outside the ConsumerThread
class.
或者,您总是可以只运行一个带有消费者代码的进程,每个队列要消费事件.
Alternatively, you can always just run one process with consumer code per queue you want to consume events.
import pika
import sys
def callback_func(channel, method, properties, body):
print(body)
if __name__ == "__main__":
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=sys.argv[1],
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
channel.start_consuming()
然后运行:
$ python single_consume.py host1
$ python single_consume.py host2 # e.g. on another console
如果您对来自队列的消息所做的工作是 CPU-heavy 并且只要您的 CPU 中的内核数量 >= 消费者数量,通常最好使用这种方法 - 除非您的队列大部分时间都是空的,并且消费者不会使用此 CPU 时间*.
If the work you're doing on messages from queues is CPU-heavy and as long as number of cores in your CPU >= number of consumers, it is generally better to use this approach - unless your queues are empty most of the time and consumers won't utilize this CPU time*.
另一种选择是涉及一些异步框架(例如 Twisted
)并运行整个单线程中的东西.
Another alternative is to involve some asynchronous framework (for example Twisted
) and running whole thing in single thread.
你不能再在异步代码中使用BlockingConnection
;幸运的是,pika
有 Twisted
的适配器:
You can no longer use BlockingConnection
in asynchronous code; fortunately, pika
has adapter for Twisted
:
from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log
class Consumer(object):
def on_connected(self, connection):
d = connection.channel()
d.addCallback(self.got_channel)
d.addCallback(self.queue_declared)
d.addCallback(self.queue_bound)
d.addCallback(self.handle_deliveries)
d.addErrback(log.err)
def got_channel(self, channel):
self.channel = channel
return self.channel.queue_declare(exclusive=True)
def queue_declared(self, queue):
self._queue_name = queue.method.queue
self.channel.queue_bind(queue=self._queue_name,
exchange="my-exchange",
routing_key="*.*.*.*.*")
def queue_bound(self, ignored):
return self.channel.basic_consume(queue=self._queue_name)
def handle_deliveries(self, queue_and_consumer_tag):
queue, consumer_tag = queue_and_consumer_tag
self.looping_call = task.LoopingCall(self.consume_from_queue, queue)
return self.looping_call.start(0)
def consume_from_queue(self, queue):
d = queue.get()
return d.addCallback(lambda result: self.handle_payload(*result))
def handle_payload(self, channel, method, properties, body):
print(body)
if __name__ == "__main__":
consumer1 = Consumer()
consumer2 = Consumer()
parameters = ConnectionParameters()
cc = protocol.ClientCreator(reactor,
TwistedProtocolConnection,
parameters)
d1 = cc.connectTCP("host1", 5672)
d1.addCallback(lambda protocol: protocol.ready)
d1.addCallback(consumer1.on_connected)
d1.addErrback(log.err)
d2 = cc.connectTCP("host2", 5672)
d2.addCallback(lambda protocol: protocol.ready)
d2.addCallback(consumer2.on_connected)
d2.addErrback(log.err)
reactor.run()
这种方法会更好,您将使用的队列越多,消费者执行的工作对 CPU 的限制就越少*.
This approach would be even better, the more queues you would consume from and the less CPU-bound the work performing by consumers is*.
由于您提到了 pika
,我将自己限制为基于 Python 2.x 的解决方案,因为尚未移植 pika
.
Since you've mentioned pika
, I've restricted myself to Python 2.x-based solutions, because pika
is not yet ported.
但如果您想迁移到 >=3.3,一种可能的选择是使用 asyncio
使用 AMQP 协议之一(您使用 RabbitMQ 使用的协议),例如asynqp
或 aioamqp
.
But in case you would want to move to >=3.3, one possible option is to use asyncio
with one of AMQP protocol (the protocol you speak in with RabbitMQ) , e.g. asynqp
or aioamqp
.
* - 请注意,这些都是非常肤浅的提示 - 在大多数情况下,选择并不那么明显;什么对您最有利取决于队列饱和度"(消息/时间)、收到这些消息后您做了什么工作、您在什么环境中运行您的消费者等;除了对所有实现进行基准测试之外,没有其他方法可以确定
* - please note that these are very shallow tips - in most cases choice is not that obvious; what will be the best for you depends on queues "saturation" (messages/time), what work do you do upon receiving these messages, what environment you run your consumers in etc.; there's no way to be sure other than to benchmark all implementations
相关文章