Python中如何实现基于队列的分布式消息传递

2023-04-11 00:00:00 分布式 队列 如何实现
  1. 首先需要安装Python的消息队列库——RabbitMQ,这个库可以在Python中使用pika模块来访问和使用。
pip install pika
  1. 创建一个队列:
import pika

# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='hello')

# 发送消息到队列中
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

# 关闭连接
connection.close()
  1. 消费者获取消息:
import pika

# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='hello')

# 回调函数来处理消息
def callback(ch, method, properties, body):
    print("Received %r" % body)

# 告诉RabbitMQ将我们的callback function接收到名为hello的队列中
channel.basic_consume(callback, queue='hello', no_ack=True)

print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
  1. 生产者和消费者以及他们之间的交互可以分别在不同的进程中运行,通过queue(队列)来进行对消息的控制。比如,生产者在队列中添加一些消息,消费者把这些消息取出来并执行相应的操作。
#生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

#消费者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

相关文章