秒懂消息队列MQ,看这篇就够了!

2022-12-09 00:00:00 示例 消息 发送 队列 系统

前面介绍了分布式锁以及如何使用Redis实现分布式锁,接下来介绍分布式系统中另外一个非常重要的组件:消息队列。

消息队列是大型分布式系统不可缺少的中间件,也是高并发系统的基石中间件,所以掌握好消息队列MQ就变得极其重要。接下来我就将从零开始介绍什么是消息队列?消息队列的应用场景?如何进行选型?如何在Spring Boot项目中整合集成消息队列。


一、消息队列概述

消息队列(Message Queue,简称MQ)指保存消息的一个容器,其实本质就是一个保存数据的队列。

消息中间件是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的构建。

消息中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和终一致性的系统架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。


二、消息队列应用场景

消息中间件在互联网公司使用得越来越多,主要用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削峰和消息通讯四个场景。

2.1 异步处理

异步处理,就是将一些非核心的业务流程以异步并行的方式执行,从而减少请求响应时间,提高系统吞吐量。

以下单为例,用户下单后需要生成订单、赠送活动积分、赠送红包、发送下单成功通知等一系列业务处理。假设三个业务节点每个使用100毫秒钟,不考虑网络等其他开销,则串行方式的时间是400毫秒,并行的时间只需要200毫秒。这样就大大提高了系统的吞吐量。

2.2 应用解耦

应用解耦,顾名思义就是解除应用系统之间的耦合依赖。通过消息队列,使得每个应用系统不必受其他系统影响,可以更独立自主。

以电商系统为例,用户下单后,订单系统需要通知积分系统。一般的做法是:订单系统直接调用积分系统的接口。这就使得应用系统间的耦合特别紧密。如果积分系统无法访问,则积分处理失败,从而导致订单失败。

加入消息队列之后,用户下单后,订单系统完成下单业务后,将消息写入消息队列,返回用户订单下单成功。积分系统通过订阅下单消息的方式获取下单通知消息,从而进行积分操作。实现订单系统与库存系统的应用解耦。如果,在下单时积分系统系统异常,也不会影响用户正常下单。

2.3 流量削峰

流量削峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

以秒杀活动为例,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列,后台系统根据消息队列中的消息信息,进行秒杀业务处理。

如上图所示,服务器接收到用户的请求后,首先写入消息队列,后台系统根据消息队列中的请求信息,做后续业务处理。假如消息队列长度超过大数量,则直接抛弃用户请求或跳转到错误页面。

2.4 消息通讯

消息通讯是指应用间的数据通信。消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等点对点通讯。

以上实际是消息队列的两种消息模式,点对点或发布订阅模式。


三、如何选择合适的消息队列

目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ等。面对这么多的中消息队列中间件,如何选择适合我们自身业务的消息中间件呢?

3.1 衡量标准

虽然这些消息队列在功能和特性方面各有优劣,但我们在选型时要有基本衡量标准:

1、首先,是开源。开源意味着,如果有一天你使用的消息队列遇到了一个影响你系统业务的Bug,至少还有机会通过修改源代码来迅速修复或规避这个Bug,解决你的系统的问题,而不是等待开发者发布的下一个版本来解决。

2、其次,是社区活跃度。这个产品必须是近年来比较流行并且有一定社区活跃度的产品。我们知道,开源产品越流行 Bug 越少,因为大部分遇到的 Bug,其他人早就遇到并且修复了。而且在使用过程中遇到的问题,也比较容易在网上搜索到类似的问题并快速找到解决方案。同时,流行开源产品一般与周边生态系统会有一个比较好的集成和兼容。

3、后,作为一款及格的消息队列,必须具备的几个特性包括:

  • 消息的可靠传递:确保不丢消息;

  • 支持集群:确保不会因为某个节点宕机导致服务不可用,当然也不能丢消息;

  • 性能:具备足够好的性能,能满足绝大多数场景的性能要求。


3.2 选型对比

接下来我们一起看一下有哪些符合上面这些条件,可供选择的开源消息队列产品。以下是关于各个消息队列中间件的选型对比:

特性 Kafka RocketMQ RabbitMQ ActiveMQ
单机吞吐量 10万级 10万级 万级 10万级
开发语言 Scala Java Erlang Java
高可用 分布式 分布式 主从 分布式
消息延迟 ms级 ms级 us级 ms级
消息丢失 理论上不会丢失 理论上不会丢失
消费模式 拉取 推拉 推拉
持久化
文件 内存,文件 内存,文件,数据库
支持协议 自定义协议 自定义协议 AMQP,XMPP, SMTP,STOMP AMQP,MQTT,OpenWire,STOMP
社区活跃度
管理界面
web console 一般
部署难度

部署方式 独立 独立 独立 独立,嵌入
成熟度 成熟 比较成熟 成熟 成熟
综合评价 优点:拥有强大的性能及吞吐量,兼容性很好。
   缺点:由于支持消息堆积,导致延迟比较高。
优点:性能好,稳定可靠,有活跃的中文社区,特点响应快。
   缺点:兼容性较差,但随着影响力的扩大,该问题会有改善。
优点:产品成熟,容易部署和使用,拥有灵活的路由配置。
   缺点:性能和吞吐量较差,不易进行二次开发。
优点:产品成熟,支持协议多,支持多种语言的客户端。
   缺点:社区不活跃,存在消息丢失的可能。

以上四种消息队列都有各自的优劣势,需要根据现有系统的情况,选择适合的消息队列。

总结起来,电商、金融等对事务性要求很高的,可以考虑RocketMQ;技术挑战不是特别高,用 RabbitMQ 是不错的选择;如果是大数据领域的实时计算、日志采集等场景可以考虑 Kafka。


四、Spring Boot整合RabbitMQ实现消息队列

Spring Boot提供了spring-bootstarter-amqp组件对消息队列进行支持,使用非常简单,仅需要非常少的配置即可实现完整的消息队列服务。

接下来介绍Spring Boot对RabbitMQ的支持。如何在SpringBoot项目中使用RabbitMQ?

4.1 Spring Boot集成RabbitMQ

Spring Boot提供了spring-boot-starter-amqp组件,只需要简单的配置即可与Spring Boot无缝集成。下面通过示例演示集成RabbitMQ实现消息的接收和发送。

步,配置pom包。

创建Spring Boot项目并在pom.xml文件中添加spring-bootstarter-amqp等相关组件依赖:

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-amqp</artifactId></dependency>

在上面的示例中,引入Spring Boot自带的amqp组件spring-bootstarter-amqp。


第二步,修改配置文件。

修改application.properties配置文件,配置rabbitmq的host地址、端口以及账户信息。

spring.rabbitmq.host=10.2.1.231spring.rabbitmq.port=5672spring.rabbitmq.username=zhangweizhongspring.rabbitmq.password=weizhong1988spring.rabbitmq.virtualHost=order

在上面的示例中,主要配置RabbitMQ服务的地址。RabbitMQ配置由spring.rabbitmq.* 配置属性控制。virtual-host配置项指定RabbitMQ服务创建的虚拟主机,不过这个配置项不是必需的。

第三步,创建消费者

消费者可以消费生产者发送的消息。接下来创建消费者类Consumer,并使用@RabbitListener注解来指定消息的处理方法。示例代码如下:

@Componentpublic class Consumer {  @RabbitHandler  @RabbitListener(queuesToDeclare = @Queue("rabbitmq_queue"))  public void process(String message) {    System.out.println("消费者消费消息111=====" + message);  }}

在上面的示例中,Consumer消费者通过@RabbitListener注解创建侦听器端点,绑定rabbitmq_queue队列。

(1)@RabbitListener注解提供了@QueueBinding、@Queue、@Exchange等对象,通过这个组合注解配置交换机、绑定路由并且配置监听功能等。

(2)@RabbitHandler注解为具体接收的方法。

第四步,创建生产者

生产者用来产生消息并进行发送,需要用到RabbitTemplate类。与之前的RedisTemplate类似,RabbitTemplate是实现发送消息的关键类。示例代码如下:

@Componentpublic class Producer {  @Autowired  private RabbitTemplate rabbitTemplate;    public void produce() {    String message = new Date() + "Beijing";    System.out.println("生产者产生消息=====" + message);    rabbitTemplate.convertAndSend("rabbitmq_queue", message);  }}

如上面的示例所示,RabbitTemplate提供了 convertAndSend方法发送消息。convertAndSend方法有routingKey和message两个参数:

(1)routingKey为要发送的路由地址。

(2)message为具体的消息内容。发送者和接收者的queuename必须一致,不然无法接收。


第五步,测试验证。

创建对应的测试类ApplicationTests,验证消息发送和接收是否成功。

@RunWith(SpringRunner.class)@SpringBootTestpublic class ApplicationTests {  @Autowired  Producer producer;  @Test  public void contextLoads() throws InterruptedException {    producer.produce();     Thread.sleep(1*1000);  }}

在上面的示例中,首先注入生产者对象,然后调用produce()方法来发送消息。

后,单击Run Test或在方法上右击,选择Run 'contextLoads()',运行单元测试程序,查看后台输出情况,结果如下图所示。

通过上面的程序输出日志可以看到,消费者已经收到了生产者发送的消息并进行了处理。这是常用的简单使用示例。


4.2 发送和接收实体对象

Spring Boot支持对象的发送和接收,且不需要额外的配置。下面通过一个例子来演示RabbitMQ发送和接收实体对象。


4.2.1 定义消息实体

首先,定义发送与接收的对象实体User类,代码如下

public class User implements Serializable {  public String name;  public String password;  // 省略get和set方法}

在上面的示例中,定义了普通的User实体对象。需要注意的是,实体类对象必须继承Serializable序列化接口,否则会报数据无法序列化的错误。

4.2.2 定义消费者

修改Consumer类,将参数换成User对象。示例代码如下:

@Componentpublic class Consumer {  @RabbitHandler  @RabbitListener(queuesToDeclare = @Queue("rabbitmq_queue_object"))  public void process(User user) {    System.out.println("消费者消费消息111user=====name:" + user.getName()+",password:"+user.getPassword());    }}

其实,消费者类和消息处理方法和之前的类似,只不过将参数换成了实体对象,监听rabbitmq_queue_object队列。


4.2.3 定义生产者

修改Producer类,定义User实体对象,并通过convertAndSend方法发送对象消息。示例代码如下:

@Componentpublic class Producer {  @Autowired  private RabbitTemplate rabbitTemplate;    public void produce() {     User user=new User();    user.setName("weiz");    user.setPassword("123456");    System.out.println("生产者生产消息111=====" + user);        rabbitTemplate.convertAndSend("rabbitmq_queue_object", user);  }}

在上面的示例中,还是调用convertAndSend()方法发送实体对象。convertAndSend()方法支持String、Integer、Object等基础的数据类型。


4.2.4 验证测试

创建单元测试类,注入生产者对象,然后调用produceObj()方法发送实体对象消息,从而验证消息能否被成功接收。

@RunWith(SpringRunner.class)@SpringBootTestpublic class ApplicationTests {  @Autowired  Producer producer;  @Test  public void testProduceObj() throws InterruptedException {    producer.produceObj();    Thread.sleep(1*1000);  }}

后,单击Run Test或在方法上右击,选择Run 'contextLoads()',运行单元测试程序,查看后台输出情况,运行结果如下图所示。

通过上面的示例成功实现了RabbitMQ发送和接收实体对象,使得消息的数据结构更加清晰,也更加贴合面向对象的编程思想。


以上,我们就把消息队列介绍完了。消息中间件在互联网公司使用得越来越多,希望大家能够熟悉其使用。


相关文章