Python中如何实现基于队列的分布式消息传递
- 首先需要安装Python的消息队列库——RabbitMQ,这个库可以在Python中使用pika模块来访问和使用。
pip install pika
- 创建一个队列:
import pika # 建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 创建队列 channel.queue_declare(queue='hello') # 发送消息到队列中 channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') # 关闭连接 connection.close()
- 消费者获取消息:
import pika # 建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 创建队列 channel.queue_declare(queue='hello') # 回调函数来处理消息 def callback(ch, method, properties, body): print("Received %r" % body) # 告诉RabbitMQ将我们的callback function接收到名为hello的队列中 channel.basic_consume(callback, queue='hello', no_ack=True) print('Waiting for messages. To exit press CTRL+C') channel.start_consuming()
- 生产者和消费者以及他们之间的交互可以分别在不同的进程中运行,通过queue(队列)来进行对消息的控制。比如,生产者在队列中添加一些消息,消费者把这些消息取出来并执行相应的操作。
#生产者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close() #消费者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
相关文章