在运行时向兔子侦听器动态添加队列

2022-01-11 00:00:00 rabbitmq java spring-amqp

我有一个项目,我们将在 rabbit 中拥有数百个(可能数千个)队列,并且每个队列都需要被一个消费者池消费.

I've got a project where we are going to have hundreds (potentially thousands) of queues in rabbit and each of these queues will need to be consumed by a pool of consumers.

在 rabbit(使用 spring-amqp)中,你有 rabbitlistener 注释,它允许我静态分配这个特定消费者将处理的队列.

In rabbit (using spring-amqp), you have the rabbitlistener annotation which allows me to statically assign the queues this particular consumer(s) will handle.

我的问题是 - 对于 rabbit 和 spring,我是否有一种干净的方式来获取一段队列(比如说以 ac 开头的队列),然后还监听在消费者运行时创建的任何队列.

My question is - with rabbit and spring, is there a clean way for me to grab a section of queues (lets say queues that start with a-c) and then also listen for any queues that are created while the consumer is running.

示例(开始时):

  • 蚂蚁队列
  • 苹果队列
  • 猫队列

消费者运行时:

  • 添加蝙蝠队列

这是我目前拥有的(非常简单的)代码:

Here is the (very simple) code I currently have:

    @Component
    public class MessageConsumer {

        public MessageConsumer() {
            // ideally grab a section of queues here, initialize a parameter and give to the rabbitlistener annotation
        }

        @RabbitListener(queues= {"ant-queue", "apple-queue", "cat-queue"})
        public void processQueues(String messageAsJson) {
            < how do I update the queues declared in rabbit listener above ? >
        }
    }

我应该补充一下 - 我已经浏览了我在网上找到的 spring amqp 文档,除了静态(硬编码或通过属性)声明队列之外,我没有找到任何东西

I should add - I've gone through the spring amqp documentation I found online and I haven't found anything outside of statically (either hardcoded or via properties) declaring the queues

推荐答案

  • 注入(@Autowired 或其他方式)RabbitListenerEndpointRegistry.

    获取对监听器容器的引用(使用注解上的id属性给它一个已知的id)(registry.getListenerContainer(id)).

    Get a reference to the listener container (use the id attribute on the annotation to give it a known id) (registry.getListenerContainer(id)).

    将容器转换为 AbstractMessageListenerContainer 并调用 addQueues()addQueueNames().

    Cast the container to an AbstractMessageListenerContainer and call addQueues() or addQueueNames().

    请注意,动态添加队列时使用 DirectMessageListenerContainer 效率更高;使用 SimpleMessageListenerContainer 消费者会停止并重新启动.使用直接容器,每个队列都有自己的消费者.

    Note that is more efficient to use a DirectMessageListenerContainer when adding queues dynamically; with a SimpleMessageListenerContainer the consumer(s) are stopped and restarted. With the direct container, each queue gets its own consumer(s).

    请参阅选择容器.

相关文章