SpringBoot整合RabbitMQ实战教程附死信交换机
前言
使用SpringBoot,实现以下功能,有两个队列1、2,往里面发送消息,如果处理失败发生异常,可以重试3次,重试3次均失败,那么就将消息发送到死信队列进行统一处理,例如记录数据库、报警等
完整demo项目代码https://gitee.com/daenmax/rabbit-MQ-demo
环境
windows10,idea,otp_win64_25.0,RabbitMQ-server-3.10.4
1.双击C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat启动MQ服务
2.然后访问Http://localhost:15672/,默认账号密码均为guest,
3.手动添加一个虚拟主机为admin_host,手动创建一个用户账号密码均为admin
pom.xml
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.0</version>
</dependency>
配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
passWord: admin
virtual-host: admin_host
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true #开启失败重试
max-attempts: 3 #最大重试次数
initial-interval: 1000 #重试间隔时间 毫秒
配置文件
RabbitConfig
package com.example.rabitmqdemo.mydemo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Component
public class RabbitConfig {
//业务交换机
public static final String EXCHANGE_PHCP = "phcp";
//业务队列1
public static final String QUEUE_COMPANY = "company";
//业务队列1的key
public static final String ROUTINGKEY_COMPANY = "companyKey";
//业务队列2
public static final String QUEUE_PROJECT = "project";
//业务队列2的key
public static final String ROUTINGKEY_PROJECT = "projecTKEy";
//死信交换机
public static final String EXCHANGE_PHCP_DEAD = "phcp_dead";
//死信队列1
public static final String QUEUE_COMPANY_DEAD = "company_dead";
//死信队列2
public static final String QUEUE_PROJECT_DEAD = "project_dead";
//死信队列1的key
public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead";
//死信队列2的key
public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";
//
// @Bean
// public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// factory.setConnectionFactory(connectionFactory);
// factory.setMessageConverter(new Jackson2JSONMessageConverter());
// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// return factory;
// }
@Bean("exchangePhcp")
public DirectExchange exchangePhcp() {
return new DirectExchange(EXCHANGE_PHCP);
}
* 声明死信交换机
@Bean("exchangePhcpDead")
public DirectExchange exchangePhcpDead() {
return new DirectExchange(EXCHANGE_PHCP_DEAD);
* 声明业务队列1
*
* @return
@Bean("queueCompany")
public Queue queueCompany() {
Map<String,Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);
//绑定该队列到死信交换机的队列1
arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD);
return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build();
* 声明业务队列2
@Bean("queueProject")
public Queue queueProject() {
//绑定该队列到死信交换机的队列2
arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD);
return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build();
* 声明死信队列1
@Bean("queueCompanyDead")
public Queue queueCompanyDead() {
return new Queue(QUEUE_COMPANY_DEAD);
* 声明死信队列2
@Bean("queueProjectDead")
public Queue queueProjectDead() {
return new Queue(QUEUE_PROJECT_DEAD);
* 绑定业务队列1和业务交换机
* @param queue
* @param directExchange
@Bean
public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY);
* 绑定业务队列2和业务交换机
public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT);
* 绑定死信队列1和死信交换机
public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD);
* 绑定死信队列2和死信交换机
public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);
}
生产者
RabbltProducer
package com.example.rabitmqdemo.mydemo.producer;
import com.example.rabitmqdemo.mydemo.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.NIO.charset.StandardCharsets;
import java.util.UUID;
@Component
@Slf4j
public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setMandatory(true);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送成功" + correlationData);
} else {
System.out.println("消息发送失败:" + cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
String str = new String(returnedMessage.getMessage().getBody());
System.out.println("消息发送失败:" + str);
}
public void sendCompany(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);
//也可以用下面的方式
//CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData);
}
public void sendProject(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);
//也可以用下面的方式
//CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData);
}
}
业务消费者
RabbitConsumer
package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class RabbitConsumer {
@RabbitListener(queues = "company")
public void company(Message message, Channel channel) throws IOException {
try{
System.out.println("次数" + message.getMessageProperties().getDeliveryTag());
channel.basicQos(1);
Thread.sleep(2000);
String s = new String(message.getBody());
log.info("处理消息"+s);
//下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
//String str = null;
//str.split("1");
//处理成功,确认应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("处理消息时发生异常:"+e.getMessage());
Boolean redelivered = message.getMessageProperties().getRedelivered();
if(redelivered){
log.error("异常重试次数已到达设置次数,将发送到死信交换机");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
log.error("消息即将返回队列处理重试");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
@RabbitListener(queues = "project")
public void project(Message message, Channel channel) throws IOException {
try{
System.out.println("次数" + message.getMessageProperties().getDeliveryTag());
channel.basicQos(1);
Thread.sleep(2000);
String s = new String(message.getBody());
log.info("处理消息"+s);
//下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
//String str = null;
//str.split("1");
//处理成功,确认应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("处理消息时发生异常:"+e.getMessage());
Boolean redelivered = message.getMessageProperties().getRedelivered();
if(redelivered){
log.error("异常重试次数已到达设置次数,将发送到死信交换机");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
log.error("消息即将返回队列处理重试");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
死信消费者
RabbitConsumer
package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class RabbitConsumerDead {
@RabbitListener(queues = "company_dead")
public void company_dead(Message message, Channel channel) throws IOException {
try{
channel.basicQos(1);
String s = new String(message.getBody());
log.info("处理死信"+s);
//在此处记录到数据库、报警之类的操作
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("接收异常:"+e.getMessage());
}
}
@RabbitListener(queues = "project_dead")
public void project_dead(Message message, Channel channel) throws IOException {
try{
channel.basicQos(1);
String s = new String(message.getBody());
log.info("处理死信"+s);
//在此处记录到数据库、报警之类的操作
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("接收异常:"+e.getMessage());
}
}
}
测试
MqController
package com.example.rabitmqdemo.mydemo.controller;
import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.WEB.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RequestMapping("/def")
@RestController
@Slf4j
public class MsGController {
@Resource
private RabbltProducer rabbltProducer;
@RequestMapping("/handleCompany")
public void handleCompany(@RequestBody String jsonStr){
rabbltProducer.sendCompany(jsonStr);
}
}
到此这篇关于SpringBoot整合RabbitMQ实战附加死信交换机的文章就介绍到这了,更多相关SpringBoot整合RabbitMQ死信交换机内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
相关文章