SpringBoot项目整合RabbitMQ三种交换机类型实例

2021-03-04 00:00:00 实例 交换机 三种

一、RabbitMQ介绍

       RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC 的调用等等。

二、相关概念

《SpringBoot项目整合RabbitMQ三种交换机类型实例》
通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
交换机(Exchange)
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout

Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
Topic:按规则转发消息(最灵活)
Headers:设置 header attribute 参数类型的交换机
Fanout:转发消息到所有绑定队列

三、SpringBoot集成RabbitMQ

1、添加jar包
在pom.xml文件中添加相关依赖

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-test</artifactId>
		</dependency>

2、配置RabbitMQ
在application.properties文件中添加相关配置信息:
《SpringBoot项目整合RabbitMQ三种交换机类型实例》

这里填写成你本地或者远程服务器搭好的RabbitMQ信息。
3、配置文件
我们这里写三种交换机类型的绑定实现:
a:direct类型:

package com.example.demo.configuration;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {
    @Bean
    public Queue Queue() {
        return new Queue("queue");
    }

}

a:topic类型:

package com.example.demo.configuration;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {
    @Bean(name="message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean(name="messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
    }
    
}

a:fanout类型:

package com.example.demo.configuration;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class FanoutConfig {
    @Bean(name="Amessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }


    @Bean(name="Bmessage")
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean(name="Cmessage")
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");//配置广播路由器
    }

    @Bean
    Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

}

4、发送者与接收者
发送者

package com.example.demo.rabbitmq;

import com.alibaba.fastjson.JSONObject;
import com.example.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class HelloSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(){
//        User user=new User();    //实现Serializable接口
//        user.setName("Lucy");
//        user.setPassword("123");
//        //1、使用JSONObject
//        String jsonUser = JSONObject.toJSONString(user);
        rabbitTemplate.convertAndSend("queue", "Hello,Rabbit!"); //direct类型
//        rabbitTemplate.convertAndSend("exchange", "topic.message", "Hello,Rabbit!"); //topic类型
//        rabbitTemplate.convertAndSend("fanoutExchange", "", "Hello,Rabbit!"); //fanout类型
    }

}  

三种类型的都在上面,测试其他类型的时候把注释去掉即可。

接收者

package com.example.demo.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component

public class HelloReceiver {

    @RabbitListener(queues="topic.message")
    public void process(String str) {
        System.out.println("ReceiverA  : " + str);
    }

    @RabbitListener(queues="topic.message")
    public void process2(String str) {
        System.out.println("ReceiverB  : " + str);
    }

    @RabbitListener(queues="queue")
    public void process1(String str) {
        System.out.println("Receiver  : " + str);
    }

    @RabbitListener(queues="fanout.A")
    public void process3(String str) {
        System.out.println("ReceiverA  : " + str);
    }

    @RabbitListener(queues="fanout.B")
    public void process4(String str) {
        System.out.println("ReceiverB  : " + str);
    }

    @RabbitListener(queues="fanout.C")
    public void process5(String str) {
        System.out.println("ReceiverC  : " + str);
    }

}

@RabbitListener
加上这个注解后,接收者就可以监听后面配置的queues里的消息了,如果queues里有消息,就会自动去拿,然后消费掉。

5、测试类

package com.example.demo.Test;

import com.example.demo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQHelloTest {
    @Autowired
    private HelloSender helloSender;


    @Test
    public void hello() {
        helloSender.send();
    }


}

启动测试类,看到运行结果有:
《SpringBoot项目整合RabbitMQ三种交换机类型实例》
《SpringBoot项目整合RabbitMQ三种交换机类型实例》
从接收者这个类中可以看到是这个进程打印的Receiver : Hello,Rabbit!
这个queue对应的就是direct:
《SpringBoot项目整合RabbitMQ三种交换机类型实例》
将发送者这个类改成相应topic的:
《SpringBoot项目整合RabbitMQ三种交换机类型实例》
然后在启动测试类,看到运行结果:
《SpringBoot项目整合RabbitMQ三种交换机类型实例》
再来测试fanout类型的:
《SpringBoot项目整合RabbitMQ三种交换机类型实例》
启动测试类:
《SpringBoot项目整合RabbitMQ三种交换机类型实例》
这里思考一下,为啥前两类消息都是被一个消费者消费,而这个却有3个呢?
原因是因为它是广播式的,给绑定到了不同的三个队列,所以消息会进入到这3个队列中,自然会被消费3次。《SpringBoot项目整合RabbitMQ三种交换机类型实例》
现在很多大公司都用了这个技术,小伙伴们可以好好研究一下~~

    原文作者:zeng_ll
    原文地址: https://blog.csdn.net/zeng_ll/article/details/90373882
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。

相关文章