spring amqp rabbitmq MessageListener 不工作

我正在尝试使用 spring amqp 使用rabbitmq,下面是我的配置.

I am trying to use rabbitmq using spring amqp, below is my configuration.

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" />

<beans:bean id="importExchangeMessageListener"
    class="com.stockopedia.batch.foundation.ImportMessageListener" />

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" />
</rabbit:listener-container>

这是一个简单的消息监听类,

This is simple Message Listener class,

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class ImportMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("consumer output: " + message);
    }

}

这是producer(即spring batch的itemWriter),

This is producer (which is itemWriter of spring batch),

public class ImportItemWriter<T> implements ItemWriter<T> {

    private AmqpTemplate template;

    public AmqpTemplate getTemplate() {
        return template;
    }

    public void setTemplate(AmqpTemplate template) {
        this.template = template;
    }

    public void write(List<? extends T> items) throws Exception {
        for (T item : items) {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
        }
    }

}

当我运行我的 spring 批处理作业时, ImportItemWriter.write 被调用.但是 ImportMessageListener.onMessage 不起作用.它不打印消息.我得到控制台上所有项目的低于输出

When I run my spring batch job, ImportItemWriter.write gets called. But ImportMessageListener.onMessage does not work. It doesnt print the message. I get below output for all items on console

producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null

推荐答案

您的消费者没有发送结果...

Your consumer is not sending a result...

@Override
public void onMessage(Message message) {
    System.out.println("consumer output: " + message);
}

改成简单的POJO;容器的 MessageListenerAdapter 将为您处理转换,并发送结果.

Change it to a simple POJO; the container's MessageListenerAdapter will take care of the conversion for you, and send the result.

@Override
public String handleMessage(String message) {
    System.out.println("consumer output: " + message);
    return "result";
}

您还没有设置任何交换或路由到您的队列.如果您想使用默认交换/路由,请使用...

You also haven't set up any exchange or routing to your queue. If you want to use default exchange/routing, use...

convertSendAndReceive("", queueName, item.toString());

或者,将模板上的 routingKey 设置为队列名称,然后可以使用更简单的方法.

Or, set the routingKey on the template to the queue name and then you can use the simpler method.

...sendAndReceive() 方法适用于请求/回复场景,因此需要阻塞.要异步执行此操作,您必须使用 ...send() 方法之一,并连接您自己的 SimpleListenerContainer 以接收回复;你将不得不做你自己的相关性.使用

The ...sendAndReceive() methods are meant for request/reply scenarios so blocking is required. To do it asynchronously, you have to use one of the ...send() methods, and wire up your own SimpleListenerContainer to receive the replies; you will have to do your own correlation. Use

public void convertAndSend(Object message, MessagePostProcessor postProcessor)

在您的消息后处理器中,设置 replyTocorrelationId 标头...

and in your message post processor, set the replyTo and correlationId headers...

message.getMessageProperties().setReplyTo("foo");
message.getMessageProperties().setCorrelationId("bar");

或者,自己构建 Message 对象(例如 通过使用 MessageBuilder) 并使用 send 方法...

Or, build the Message object yourself (e.g by using the MessageBuilder) and use the send method...

template.send(MessageBuilder.withBody("foo".getBytes())
            .setReplyTo("bar")
            .setCorrelationId("baz".getBytes())
            .build());

每个请求都需要一个唯一的 correlationId,以便您可以关联响应.

Each request needs a unique correlationId so you can correlate the response.

相关文章