使用 Spring AMQP 接收和发送 Java 对象

2022-01-11 00:00:00 rabbitmq spring java spring-amqp

我想实现 Spring AMQP 示例,以使用侦听器发送和接收 Java 对象.我试过这个:

I want to implement Spring AMQP example for sending and receiving Java Objects using listener. I tried this:

发送 Java 对象

ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false)).to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));              
AmqpTemplate template = new RabbitTemplate(connectionFactory);

TransactionsBean obj = new TransactionsBean();
obj.setId(Long.valueOf(111222333));

接收并发送回另一个 Java 对象:

Receive and send Back another Java Object:

ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false))
                .to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));
AmqpTemplate template = new RabbitTemplate(connectionFactory);

TransactionsBean obj = (TransactionsBean) template.receiveAndConvert(QUEUE_PROCESSING_TRANSACTION);
System.out.println(" !!!!!!! Received id " + obj.getTransaction_id());

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(new Queue(QUEUE_PROCESSING_TRANSACTION, false));

container.setMessageListener(new MessageListener() {
  @Override
  public void onMessage(Message message) {
    // Receive here Java object and send back another object
  }
});

你能告诉我如何在没有复杂注释的情况下扩展代码,只需要简单的侦听器吗?

Can you show me how to extend the code without complex annotations just simple listeners?

推荐答案

最简单的方法是使用 @RabbitListener - 使用 Spring Boot 时更容易,因为他会连接基础设施 bean(模板、管理员等).

The simplest way is to use a @RabbitListener - made even easier when using Spring Boot since he will wire up infrastructure beans (template, admin, etc).

@SpringBootApplication
public class So51009346Application {

    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
            System.out.println(reply);
        };
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }

    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(te()).with("rk");
    }

}

class RequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

class ReplyObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

@Component
class Listener {

    @RabbitListener(queues = So51009346Application.QUEUE_PROCESSING_TRANSACTION)
    public ReplyObject process(RequestObject ro) {
        return new ReplyObject();
    }

}

如果您出于某种原因不想使用该注释,您可以使用 MessageListenerAdapter 连接一个容器...

If you don't want to use that annotation for some reason, you can wire up a container using a MessageListenerAdapter...

@SpringBootApplication
public class So51009346Application {

    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
            System.out.println(reply);
        };
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf, Listener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames(QUEUE_PROCESSING_TRANSACTION);
        container.setMessageListener(new MessageListenerAdapter(listener, "process"));
        return container;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }

    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(te()).with("rk");
    }

}

class RequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

class ReplyObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

@Component
class Listener {

    public ReplyObject process(RequestObject ro) {
        return new ReplyObject();
    }

}

当然,您可以自己连接容器,就像在您的问题中一样,使用适配器,但通常最好让 Spring 将其作为 @Bean 进行管理,否则您会错过一些功能(例如失败的事件发布,空闲容器).适配器获取对您的请求/回复侦听器的引用以及要调用的方法名称.

You can, of course, wire up the container yourself, as in your question, using the adapter, but it's generally better to let Spring manage it as a @Bean or you will miss some functionality (e.g. event publishing for failures, idle container). The adapter gets a reference to your request/reply listener and the method name to call.

相关文章