鼠兔,停止消费不起作用
问题描述
我是 rabbitmq 和 pika 的新手,并且无法停止消费.
I'm new to rabbitmq and pika, and is having trouble with stopping consuming.
频道和队列设置:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)
消费者和生产者基本上是这样的:
Basically, consumer and producer are like this:
消费者:
def task(task_id):
def callback(channel, method, properties, body):
if body != "quit":
print(body)
else:
print(body)
channel.stop_consuming(task_id)
channel.basic_consume(callback, queue=task_id, no_ack=True)
channel.start_consuming()
print("finish")
return "finish"
制作人:
proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE)
while proc.returncode is None: # running
line = proc.stdout.readline()
if line:
channel.basic_publish(
exchange='',
routing_key=self.request.id,
body=line
)
else:
channel.basic_publish(
exchange='',
routing_key=self.request.id,
body="quit"
)
break
consumer task
给了我输出:
consumer task
gave me output:
# ... output from sample.sh, as expected
quit
�}q(UstatusqUSUCCESSqU tracebackqNUresultqNUtask_idqU
1419350416qUchildrenq]u.
但是,"finish"
没有被打印出来,所以我猜这是因为 channel.stop_sumption(task_id)
没有停止消费.如果是这样,正确的做法是什么?谢谢.
However, "finish"
didn't get printed, so I'm guessing it's because channel.stop_consuming(task_id)
didn't stop consuming. If so, what is the correct way to do? Thank you.
解决方案
我也遇到了同样的问题.这似乎是由于在内部,start_sumption
调用 self.connection.process_data_events(time_limit=None)
造成的.这个 time_limit=None
使它挂起.
I had the same problem. It seems to be caused by the fact that internally, start_consuming
calls self.connection.process_data_events(time_limit=None)
. This time_limit=None
makes it hang.
我设法通过将 channel.start_sumption()
的调用替换为它的实现来解决这个问题,被黑了:
I managed to workaround this problem by replacing the call to channel.start_consuming()
with its implemenation, hacked:
while channel._consumer_infos:
channel.connection.process_data_events(time_limit=1) # 1 second
相关文章