Kafka/RocketMQ 多线程消费时如何保证消费顺序?

2020-05-28 00:00:00 消息 线程 队列 消费 顺序

上两篇文章都在讨论顺序消息的一些知识,看到有个读者的留言如下:

这个问题问得非常棒,由于在之前的文章中并没有提及到,因此我在这篇文章中单独讲解,本文将从消费顺序性这个问题出发,深度剖析 Kafka/RocketMQ 消费线程模型。

Kafka

kafka 的消费类 KafkaConsumer 是非线程安全的,因此用户无法在多线程中共享一个 KafkaConsumer 实例,且 KafkaConsumer 本身并没有实现多线程消费逻辑,如需多线程消费,还需要用户自行实现,在这里我会讲到 Kafka 两种多线程消费模型。

1、每个线程维护一个 KafkaConsumer

这样相当于一个进程内拥有多个消费者,也可以说消费组内成员是有多个线程内的 KafkaConsumer 组成的。

但其实这个消费模型是存在很大问题的,从消费消费模型可看出每个 KafkaConsumer 会负责固定的分区,因此无法提升单个分区的消费能力,如果一个主题分区数量很多,只能通过增加 KafkaConsumer 实例提高消费能力,这样一来线程数量过多,导致项目 Socket 连接开销巨大,项目中一般不用该线程模型去消费。

2、单 KafkaConsumer 实例 + 多 worker 线程

针对个线程模型的缺点,我们可采取 KafkaConsumer 实例与消息消费逻辑解耦,把消息消费逻辑放入单独的线程中去处理,线程模型如下:

从消费线程模型可看出,当 KafkaConsumer 实例与消息消费逻辑解耦后,我们不需要创建多个 KafkaConsumer 实例就可进行多线程消费,还可根据消费的负载情况动态调整 worker 线程,具有很强的独立扩展性,在公司内部使用的多线程消费模型就是用的单 KafkaConsumer 实例 + 多 worker 线程模型。

但这个消费模型由于消费逻辑是利用多线程进行消费的,因此并不能保证其消息的消费顺序,在这里我们可以引入阻塞队列的模型,一个 woker 线程对应一个阻塞队列,线程不断轮训从阻塞队列中获取消息进行消费,对具有相同 key 的消息进行取模,并放入相同的队列中,实现顺序消费, 消费模型如下:

但是以上两个消费线程模型,存在一个问题:

在消费过程中,如果 Kafka 消费组发生重平衡,此时的分区被分配给其它消费组了,如果拉取回来的消息没有被消费,虽然 Kakfa 可以实现 ConsumerRebalanceListener 接口,在新一轮重平衡前主动提交消费偏移量,但这貌似解决不了未消费的消息被打乱顺序的可能性?

因此在消费前,还需要主动进行判断此分区是否被分配给其它消费者处理,并且还需要锁定该分区在消费当中不能被分配到其它消费者中(但 kafka 目前做不到这一点)。

参考 RocketMQ 的做法:

在消费前主动调用 ProcessQueue#isDropped 方法判断队列是否已过期,并且对该队列进行加锁处理(向 broker 端请求该队列加锁)。

RocketMQ

RocketMQ 不像 Kafka 那么“原生”,RocketMQ 早已为你准备好了你的需求,它本身的消费模型就是单 consumer 实例 + 多 worker 线程模型,有兴趣的小伙伴可以从以下方法观摩 RocketMQ 的消费逻辑:

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

相关文章