Receiving and sending Java objects with Spring AMQP
I want to implement a Spring AMQP example that sends and receives Java objects using a listener. I tried this:
Send Java object:
ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false)).to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
TransactionsBean obj = new TransactionsBean();
obj.setId(Long.valueOf(111222333));
Receive and send back another Java object:
ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false))
        .to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
TransactionsBean obj = (TransactionsBean) template.receiveAndConvert(QUEUE_PROCESSING_TRANSACTION);
System.out.println(" !!!!!!! Received id " + obj.getTransaction_id());
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(new Queue(QUEUE_PROCESSING_TRANSACTION, false));
container.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        // Receive here Java object and send back another object
    }
});
Can you show me how to extend the code without complex annotations, using just simple listeners?
Recommended answer
The simplest way is to use a @RabbitListener, made even easier when using Spring Boot since it will wire up the infrastructure beans (template, admin, etc.).
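import java.io.Serializable;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So51009346Application {
    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";
    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }
    // Sends a request to exchange "ex" with routing key "rk" and waits for the reply.
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
            System.out.println(reply);
        };
    }
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }
    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(te()).with("rk");
    }
}
class RequestObject implements Serializable {
    private static final long serialVersionUID = 1L;
}
class ReplyObject implements Serializable {
    private static final long serialVersionUID = 1L;
}
@Component
class Listener {
    // The return value is sent back to the requester as the reply.
    @RabbitListener(queues = So51009346Application.QUEUE_PROCESSING_TRANSACTION)
    public ReplyObject process(RequestObject ro) {
        return new ReplyObject();
    }
}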
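A side note on why RequestObject and ReplyObject implement Serializable: with no other message converter configured, the template and the listener fall back, as far as I know, to plain Java serialization for payloads that are not Strings or byte arrays. The sketch below shows only that round trip with the JDK, without any broker or Spring classes. DemoRequest, SerializationSketch, toBytes and fromBytes are names made up for this illustration; they are not part of Spring AMQP.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical payload, similar to RequestObject but with one field for illustration.
class DemoRequest implements Serializable {
    private static final long serialVersionUID = 1L;
    final long id;

    DemoRequest(long id) {
        this.id = id;
    }
}

// Assumption: this only mimics the idea of a serialization-based converter,
// it is not the converter Spring AMQP actually uses.
class SerializationSketch {
    // Object -> bytes, roughly what has to happen before a message body is published.
    static byte[] toBytes(Serializable payload) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        }
        return buffer.toByteArray();
    }

    // Bytes -> object, roughly what has to happen before the listener method is called.
    static Object fromBytes(byte[] body) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] body = toBytes(new DemoRequest(111222333L));
        DemoRequest copy = (DemoRequest) fromBytes(body);
        System.out.println("Round-tripped id: " + copy.id);
    }
}
```

Both sides need the same class on the classpath for this to work, and newer Spring AMQP versions, as far as I know, restrict which classes may be deserialized this way, so check the converter settings if the listener rejects the payload.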
If you don't want to use that annotation for some reason, you can wire up a container using a MessageListenerAdapter:
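import java.io.Serializable;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So51009346Application {
    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";
    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }
    // Sends a request to exchange "ex" with routing key "rk" and waits for the reply.
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
            System.out.println(reply);
        };
    }
    // The adapter delegates each incoming message to Listener.process(...).
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf, Listener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames(QUEUE_PROCESSING_TRANSACTION);
        container.setMessageListener(new MessageListenerAdapter(listener, "process"));
        return container;
    }
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }
    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(te()).with("rk");
    }
}
class RequestObject implements Serializable {
    private static final long serialVersionUID = 1L;
}
class ReplyObject implements Serializable {
    private static final long serialVersionUID = 1L;
}
@Component
class Listener {
    // Plain method, no annotation; the adapter finds it by name.
    public ReplyObject process(RequestObject ro) {
        return new ReplyObject();
    }
}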
You can, of course, wire up the container yourself, as in your question, using the adapter, but it's generally better to let Spring manage it as a @Bean; otherwise you will miss some functionality (e.g. event publishing for failures and for idle containers). The adapter gets a reference to your request/reply listener and the name of the method to call.
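To make the "reference plus method name" idea concrete, here is a rough sketch of what such an adapter does conceptually, written with plain JDK reflection. It is not how MessageListenerAdapter is actually implemented (the real one also converts the message body and sends the return value to the reply-to address); AdapterSketch and the Demo* classes are hypothetical names used only for this illustration.

```java
import java.lang.reflect.Method;

// Hypothetical stand-ins for the RequestObject, ReplyObject and Listener classes.
class DemoRequestObject { }

class DemoReplyObject { }

class DemoListener {
    public DemoReplyObject process(DemoRequestObject ro) {
        return new DemoReplyObject();
    }
}

// Assumption: a simplified model of an adapter, holding a delegate and a method name.
class AdapterSketch {
    private final Object delegate;
    private final String methodName;

    AdapterSketch(Object delegate, String methodName) {
        this.delegate = delegate;
        this.methodName = methodName;
    }

    // Looks up a public one-argument method with the configured name whose
    // parameter type accepts the payload, invokes it, and returns the result.
    Object onMessage(Object payload) throws Exception {
        for (Method m : delegate.getClass().getMethods()) {
            if (m.getName().equals(methodName)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(payload)) {
                return m.invoke(delegate, payload);
            }
        }
        throw new NoSuchMethodException(methodName + " accepting " + payload.getClass().getName());
    }

    public static void main(String[] args) throws Exception {
        AdapterSketch adapter = new AdapterSketch(new DemoListener(), "process");
        Object reply = adapter.onMessage(new DemoRequestObject());
        System.out.println("Reply type: " + reply.getClass().getSimpleName());
    }
}
```

This is why the listener class in the adapter version needs no annotation or interface: the method is located by name at runtime, so a typo in "process" would only show up when the first message arrives.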