SpringBoot项目整合RabbitMQ三种交换机类型实例
一、RabbitMQ介绍
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC 的调用等等。
二、相关概念
通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
交换机(Exchange)
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout
Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
Topic:按规则转发消息(最灵活)
Headers:设置 header attribute 参数类型的交换机
Fanout:转发消息到所有绑定队列
三、SpringBoot集成RabbitMQ
1、添加jar包
在pom.xml文件中添加相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
2、配置RabbitMQ
在application.properties文件中添加相关配置信息:
这里填写成你本地或者远程服务器搭好的RabbitMQ信息。
3、配置文件
我们这里写三种交换机类型的绑定实现:
a:direct类型:
package com.example.demo.configuration;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
@Bean
public Queue Queue() {
return new Queue("queue");
}
}
a:topic类型:
package com.example.demo.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicConfig {
@Bean(name="message")
public Queue queueMessage() {
return new Queue("topic.message");
}
@Bean(name="messages")
public Queue queueMessages() {
return new Queue("topic.messages");
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
}
}
a:fanout类型:
package com.example.demo.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
@Bean(name="Amessage")
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean(name="Bmessage")
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean(name="Cmessage")
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");//配置广播路由器
}
@Bean
Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
4、发送者与接收者
发送者
package com.example.demo.rabbitmq;
import com.alibaba.fastjson.JSONObject;
import com.example.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(){
// User user=new User(); //实现Serializable接口
// user.setName("Lucy");
// user.setPassword("123");
// //1、使用JSONObject
// String jsonUser = JSONObject.toJSONString(user);
rabbitTemplate.convertAndSend("queue", "Hello,Rabbit!"); //direct类型
// rabbitTemplate.convertAndSend("exchange", "topic.message", "Hello,Rabbit!"); //topic类型
// rabbitTemplate.convertAndSend("fanoutExchange", "", "Hello,Rabbit!"); //fanout类型
}
}
三种类型的都在上面,测试其他类型的时候把注释去掉即可。
接收者
package com.example.demo.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class HelloReceiver {
@RabbitListener(queues="topic.message")
public void process(String str) {
System.out.println("ReceiverA : " + str);
}
@RabbitListener(queues="topic.message")
public void process2(String str) {
System.out.println("ReceiverB : " + str);
}
@RabbitListener(queues="queue")
public void process1(String str) {
System.out.println("Receiver : " + str);
}
@RabbitListener(queues="fanout.A")
public void process3(String str) {
System.out.println("ReceiverA : " + str);
}
@RabbitListener(queues="fanout.B")
public void process4(String str) {
System.out.println("ReceiverB : " + str);
}
@RabbitListener(queues="fanout.C")
public void process5(String str) {
System.out.println("ReceiverC : " + str);
}
}
@RabbitListener
加上这个注解后,接收者就可以监听后面配置的queues里的消息了,如果queues里有消息,就会自动去拿,然后消费掉。
5、测试类
package com.example.demo.Test;
import com.example.demo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQHelloTest {
@Autowired
private HelloSender helloSender;
@Test
public void hello() {
helloSender.send();
}
}
启动测试类,看到运行结果有:
从接收者这个类中可以看到是这个进程打印的Receiver : Hello,Rabbit!
这个queue对应的就是direct:
将发送者这个类改成相应topic的:
然后在启动测试类,看到运行结果:
再来测试fanout类型的:
启动测试类:
这里思考一下,为啥前两类消息都是被一个消费者消费,而这个却有3个呢?
原因是因为它是广播式的,给绑定到了不同的三个队列,所以消息会进入到这3个队列中,自然会被消费3次。
现在很多大公司都用了这个技术,小伙伴们可以好好研究一下~~
原文地址: https://blog.csdn.net/zeng_ll/article/details/90373882
本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
相关文章