启动 celery worker 并为广播队列启用它
问题描述
我正在尝试启动 celery worker,所以它只听单个队列.这不是问题,我可以这样做:
I'm trying to start celery worker so it only listens to single queue. This is not a problem, I can do this that way:
python -m celery worker -A my_module -Q my_queue -c 1
但现在我也希望这个 my_queue
队列成为广播队列,所以我在 celeryconfig 中这样做:
But now I also want this my_queue
queue to be a broadcast queue, so I do this in my celeryconfig:
from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)
但是一旦我这样做了,我就不能再启动我的工人了,我从 rabbitmq 收到错误:
But as soon as I do this I cannot start my worker anymore, I get error from rabbitmq:
amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'
如果我在没有 -Q
的情况下启动 worker(但如上所述将 Broadcast
留在 celeryconfig.py
中)并且我可以列出 rabbitmq 队列看到广播队列的创建和命名如下:
If I start worker without -Q
(but leaving Broadcast
in celeryconfig.py
as described above) and I list rabbitmq queues I can see broadcast queue is created and named like this:
bcast.43fecba7-786a-461c-a322-620039b29b8b
同样,如果我在 worker 中定义这个队列(使用上面提到的 -Q
)或在 celeryconfig.py
中作为简单的 Queue
这个:
And similarly if I define this queue within worker (using -Q
as mentioned above) or as simple Queue
in celeryconfig.py
like this:
from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)
我可以像这样在 rabbitmq 中看到这个队列:
I can see this queue in rabbitmq like this:
my_queue
在定义队列时,我在 Broadcast
调用中添加了什么并不重要 - 这似乎是内部 celery 名称,而不是传递给 rabbitmq.
It apperas it does not matter what I put into Broadcast
call when defining the queue - this seems to be internal celery name, not passed to rabbitmq.
所以我猜当工人开始时,然后 my_queue
被创建,一旦完成它就不能 Broadcast
.
So I'm guessing when worker is starting then my_queue
is created and once that's done it cannot be made Broadcast
.
我可以让一个工作人员监听任何队列(不仅是 my_queue
),我将从删除 -Q
参数开始.但是如果我能有一个进程只监听那个特定的队列,那就太好了,因为我在那里投入的任务很快,而且我想尽可能地降低延迟.
I can have a worker that listens to any queue (not only to my_queue
) which I would start by removing the -Q
argument. But it would be nice to be able to have a single process that only listens to that particular queue since my tasks I throw in there are fast and I'd like to bring latency down as much as possible.
--- 编辑 1 ---花了一些时间解决这个问题,上面提到的 bcast
队列似乎并不一致.重置 rabbitmq 并在没有 -Q
选项的情况下运行 celery 后,bcast
队列没有出现...
--- Edit 1 ---
Spent some time with this problem and it seems bcast
queue mentioned above does not appear consistently. After reseting rabbitmq and running celery without -Q
option bcast
queue did not appear...
解决方案
当使用代理发送消息时,客户端和工作人员必须就相同的配置值达成一致.如果您必须更改配置,则需要清除现有消息并重新启动所有内容以使它们同步.
When using a broker for sending messages, client and workers must agree on same configuration values. If you have to change config, you need to purge existing messages and restart everything so that they are in sync.
启动广播队列时,您可以设置交换类型并配置队列.
When starting a broadcast queue you can set exchange type and configure the queue.
from kombu.common import Broadcast
from kombu import Exchange
exchange = Exchange('custom_exchange', type='fanout')
CELERY_QUEUES = (
Broadcast(name='bcast', exchange=exchange),
)
现在你可以用
celery worker -l info -A tasks -Q bcast
相关文章