Kafka Consumer消费能力较低时的解决方案

2020-05-27 00:00:00 数据 执行 消息 消费 提交

背景

随着业务的发展,项目组有大量的任务需要处理。

这些任务需要主要分为两种类型:

  • 通过接口调用, 后台执行任务
  • 通过调度系统定时执行

接口调用就需要执行任务不能阻塞, 不然系统的处理能力就会下降。任务调度系统需要在在一个小的检测粒度时间内,执行完所有任务。这两种情况都面临这样一个问题, 任务不能阻塞,不然会非常影响性能。所以需要引入消息中间件,将任务派发方和任务执行方分离出来。

在这种情况下, 我们选择了kafka作为了我们的消息中间件, 选择kafka主要基于以下几点:

  • 支持分布式, 避免单点问题
  • 技术方案成熟, 公司内部有上线项目
  • 性能优异, 能够持久化消息

遇到的问题

我们团队在kafka使用上面都没有经验, 其他同事说kafka consumer在消费超时后会掉线,导致重复消费,当时没有这个使用场景,不能理解这个概念。

次发现问题是在联调的时候,任务执行方发现consumer会打印出错误日志,重复消费,并且陷入循环。

当时很快定位到问题, consumer长时间没有发送心跳包, 导致触发rebalance操作, consumer被踢下线了。

对于这个问题,需要详细讲述一下kafka consumer相关的机制。

kafka为了保证partition分配的高效率, 使用了如下机制:

  1. 所有的consumer都要和coordinator连接
  2. coordinator选出一个consumer作为leader来分配partition
  3. leader分配完以后通知coordinator, 由coordinator来通知给其他consumer
  4. 如果一个consumer不能工作了, coordinator会触发rebalance机制,重新分配partition

coordinator判定一个consumer不能工作, 依靠的就是heartbeat机制。consumer的配置里面有一项是session_timeout,如果heartbeat不能在session_timeout时间内发出一次请求,coordinator就会触发一次rebalance操作,重新分配partition。

从上面这样看没什么问题,很多系统都是这么设计的,一个工作线程,一个心跳包线程。但是kafka consumer为了设计上的简单(或者是出于其他目的),他们只有一个线程,也就是说工作逻辑和心跳包逻辑是同步的。对于心跳包这种定时任务,他们使用了一种叫做delayed_task的方案。

delayed_task是Best-Effort的,为什么这么说呢,我们来看看delayed_task是在什么时候工作的:

  1. 取出一批数据
  2. 执行delayed_task
  3. 循环yield 这批数据
  4. 重复执行上述过程

前面我们也说过, consumer只有一个线程, 也就意味着,如果主逻辑消耗了大量时间,delayed_task中的任务就会延期执行。在这种情况下, delayed_task只能保证任务不会提前进行,不能保证任务准时执行。拿一个具体的场景来说, 如果主逻辑花费了60s, 那么delayed_task中的任务早也只能在60s之后执行,像heartbeat任务就直接超时了。


在提出解决方案之前, 我们需要考虑一下几个问题:

生产者速度大于消费者速度怎么处理

如果生产者速度大于消费者速度,消息就会积累。常规的解决方案是增加partition,增加消费者数量,但是在某一些场景下却不能这么实现。思考一下,如果生产者的速度不是恒定的,而是波动的,并且波峰和波谷差距比较大,大部分时间出于波谷,这样在波谷时其实资源是闲置的,并且会降低消费速度。另外对于消费的实时性比较高的场景,如果短时间内消息被积压,纵然后能够消费掉,但是已经过了有效期,这样的消费其实是的。

所以我们必须有能力知道两个数据,即当前队列剩余的消息的数量和当前消息产生的时间。

在消费速度不一致的情况下如何提交offset

kafka-consumer的offset的提交机制是定时向delayed_task里面加入一个AutoCommitTask。但是在消费者消费速度不均衡的情况下不能这么做,如果消费者消费速度比较快,定时提交offset的机制会使得一旦consumer宕机,会丢失一大批消费信息。
同时我们也不能单纯的以消费数量作为是否提交的阈值,在消费者比较消费速率比较慢的情况下,一旦consumer宕机,我们会耗费大量时间在无用的消费上面。
所以我们需要同时衡量数量和时间两个变量,作为我们是否提交的阈值

offset提交失败该怎么处理

consumer的offset提交是按照TopicPartition作为提交单元的。在consumer消费过程中,可能会发生reblance事件,如果当前consumer分配到的partition数量大于1个,可能这个partition会被分配给其他的consumer。在这个过程中,consumer已经消费了该条数据,那么在提交offset的时候,就会遇到CommitOffsetError,因为这个partition已经不属于自己了。
这种情况下该如何处理这些数据

解决方案

带着上面的一些问题,我们开始着手提出解决方案。

从上面的分析可以看出来, consumer掉线的主要问题就是delayed_task和主函数出于同一个工作线程中,那么直观的解决方法就是将这两个分离出来。

由于python GIL的限制,加上kafka consumer 是线程不安全的, 所以我们使用多进程来解决这个问题。

在consumer中,除了迭代器_message_generator之外,还提供了一个poll函数。这个函数和迭代器功能差不多,也能够获取消息,同时也会执行delayed_task。不同之处是, 这个函数会一次性返回一批数据,这样我们就有能力统计剩下的消息的数量。同时我们要求在producer发送消息的时候,一定要带上create_time这个字段,标注消息产生的时间。客户端现在同时能获取数量和时间两个参数,对于实时性要求比较高的场景,他就可以选择性的丢弃一批不满足要求的数据。

当消费者消费速度比较低的时候,我们需要停止获取数据,但是同时不能停下delayed_task。幸运的是,consumer提供了一个pause的函数,可以让我们停止对应的partition。一旦使用pause函数,poll函数将不会返回任何数据,单他依然会执行delayed_task。

由于我们使用poll函数一次性返回多个数据,加上在消费速度不均衡的情况下offset管理的问题。所以我们必须要手动管理offset, 保存我们上次提交offset的时间和未提交offset的数量,一旦其中某一个达到阈值,就真正的提交offset。

当我们提交offset失败的时候,我们需要清除对应的partition的所有数据,防止consumer做无用消费。


综合上面,我们就有能力构造出一个强健的consumer客户端,方便其他同学来使用。


核心代码

while True:
    topic_records = self.consumer.poll().values()
    if not topic_records:
        self.get_offset()
        time.sleep(self.config['idle_timeout'])
    self.consumer.pause(*self.consumer.assigment())
    paused = True
    for records in topic_records:
        remain = len(records)
        for record in records:
        while True:
            data = {"record":record, "remain": remain}
            try:
                self.task_queue.put(data, self.config['block_timeout'])
                remain -= 1
              break
            except Full:
                self.consumer.poll()
                self.get_offset()
    if self.task_queue.qsize < self.config['resumen_count'] and paused:
        partitions = self.consumer.paused()
        if partitions:
            self.consumer.resume(*partitions)

相关文章