胡夕:Apache Kafka监控与调优

2020-05-22 00:00:00 消息 线程 分区 消费者 监控

胡夕,《Apache Kafka实战》作者,北航计算机硕士毕业,现任某互金公司计算平台总监,曾就职于IBM、搜狗、微博等公司。国内活跃的Kafka代码贡献者。


前言

虽然目前Apache Kafka已经全面进化成一个流处理平台,但大多数的用户依然使用的是其核心功能:消息队列。对于如何有效地监控和调优Kafka是一个大话题,很多用户都有这样的困扰,今天我们就来讨论一下。


一、Kafka综述


在讨论具体的监控与调优之前,我想用一张PPT图来简单说明一下当前Kafka生态系统的各个组件。就像我前面所说,Kafka目前已经进化成了一个流处理平台,除了核心的消息队列组件Kafka core之外,社区还新引入Kafka Connect和Kafka Streams两个新的组件:其中前者负责Kafka与外部系统的数据传输;后者则负责对数据进行实时流处理计算。下图罗列了一些关键的Kafka概念。


二、Kafka监控


我打算从五个维度来讨论Kafka的监控。首先是要监控Kafka集群所在的主机;第二是监控Kafka broker JVM的表现;第三点,我们要监控Kafka Broker的性能;第四,我们要监控Kafka客户端的性能。这里的所指的是广义的客户端——可能是指我们自己编写的生产者、消费者,也有可能是社区帮我们提供的生产者、消费者,比如说Connect的Sink/Source或Streams等;后我们需要监控服务器之间的交互行为。


1.主机监控

个人认为对于主机的监控是重要的。因为很多线上环境问题首先表现出来的症状就是主机的某些性能出现了明显的问题。此时通常是运维人员首先发现了它们然后告诉我们这台机器有什么问题,对于Kafka主机监控通常是发现问题的步。这一页列出了常见的指标,包括CPU、内存、带宽等数据。需要注意的是CPU使用率的统计。可能大家听过这样的提法:我的Kafka Broker CPU使用率是400%,怎么回事?对于这样的问题,我们首先要搞清楚这个使用率是怎么观测出来的? 很多人拿top命令中的vss或rss字段来表征CPU使用率,但实际上它们并不是真正的CPU使用率——那只是所有CPU共同作用于Kafka进程所花的时间片的比例。举个例子,如果机器上有16个CPU,那么只要这些值没有超过或接近1600, 那么你的CPU使用率实际上是很低的。因此要正确理解这些命令中各个字段的含义。

这页PPT右边给出了一本书,如果大家想监控主机性能的话,我个人建议这本《SystemsPerformance》就足够了。非常权威的一本书,推荐大家读一下。


2.监控JVM

Kafka本身是一个普通的Java进程,所以任何适用于JVM监控的方法对于监控Kafka都是相通的。步就是要先了解Kafka应用。比方说了解Kafka broker JVM的GC频率和延时都是多少,每次GC后存活对象的大小是怎样的等。了解了这些信息我们才能明确后面调优的方向。当然,我们毕竟不是特别的JVM专家,因此也不必过多追求繁复的JVM监控与调优。只需要关注大的方面即可。另外,如果大家时间很有限但又想快速掌握JVM监控与调优,推荐阅读《Java Performance》。


3.Per-Broker监控

首先要确保Broker进程是启动状态?这听起来好像有点搞笑,但我的确遇到过这样的情况。比如当把Kafka部署在Docker上时就容易出现进程启动但服务没有成功启动的情形。正常启动下,一个Kafka服务器起来的时候,应该有两个端口,一个端口是9092常规端口,会建一个TCP链接。还有一个端口是给JMX监控用的。当然有多台broker的话,那么controller机器会为每台broker都维护一个TCP连接。在实际监控时可以有意识地验证这一点。


对于Broker的监控,我们主要是通过JMS指标来做的。用过Kafka的人知道,Kafka社区提供了特别多的JMS指标,其中很多指标用处不大。我这里列了一些比较重要的:首先是broker机器每秒出入的字节数,就是类似于我可以监控网卡的流量,一定要把这个指标监控起来,并实时与你的网卡带宽进行比较——如果发现该值非常接近于带宽的话,就证明broker负载过高,要么增加新的broker机器,要么把该broker上的负载均衡到其他机器上。


另外还有两个线程池空闲使用率小关注,好确保它们的值都不要低于30%,否则说明Broker已经非常的繁忙。 此时需要调整线程池线程数。


接下来是监控broker服务器的日志。日志中包含了非常丰富的信息。这里所说的日志不仅是broker服务器的日志,还包括Kafka controller的日志。我们需要经常性地查看日志中是否出现了OOM错误抑或是时刻关注日志中抛出的ERROR信息。


我们还需要监控一些关键后台线程的运行状态。个人认为有两个比较重要的线程需要监控:一个Log Cleaner线程——该线程是执行数据压实操作的,如果该线程出问题了,用户通常无法感知到,然后会发现所有compact策略的topic会越来越大直到占满所有磁盘空间;另一个线程就是副本拉取线程,即follower broker使用该线程实时从leader处拉取数据。如果该线程“挂掉”了,用户通常也是不知道的,但会发现follower不再拉取数据了。因此我们一定要定期地查看这两个线程的状态,如果发现它们意味终止,则去找日志中寻找对应的报错信息。


4.Clients监控

客户端监控这块,我这边会分为两个,分别讨论对生产者和消费者的监控。生产者往Kafka发消息,在监控之前我们至少要了解一下客户端机器与Broker端机器之间的RTT是多少。对于那种跨数据中心或者是异地的情况来说,RTT本来就很大,如果不做特殊的调优,是不可能有太高的TPS的。目前Kafka producer是双线程的设计机制,分为用户主线程和Sender线程,当这个Sender线程挂了的时候,前端用户是不感知的,但表现为producer发送消息失败,所以用户好监控一下这个Sender线程的状态。

还有就是监控PRODUCE请求的处理延时。一条消息从生产者端发送到Kafka broker进行处理,之后返回给producer的总时间。整个链路中各个环节的耗时好要做到心中有数。因为很多情况下,如果你要提升生产者的TPS,了解整个链路中的瓶颈后才能做到有的放矢。后面PPT中我会讨论如何拆解这条链路。


现在说说消费者。这里的消费者说的是新版本的消费者,也就是java consumer。

社区已经非常不推荐再继续使用老版本的消费者了。新版本的消费者也是双线程设计,后面有一个心跳线程,如果这个线程挂掉的话,前台线程是不知情的。所以,用户好定期监控该心跳线程的存活情况。心跳线程定期发心跳请求给Kafka服务器,告诉Kafka,这个消费者实例还活着,以避免coordinator错误地认为此实例已“死掉”从而开启rebalance。Kafka提供了很多的JMX指标可以用于监控消费者,重要的消费进度滞后监控,也就是所谓的consumerlag。


假设producer生产了100条消息,消费者读取了80条,那么lag就是20。显然落后的越少越好,这表明消费者非常及时,用户也可以用工具行命令来查lag,甚至写Java的API来查。与lag对应的还有一个lead指标,它表征的是消费者领先条消息的进度。比如早的消费位移是1,如果消费者当前消费的消息是10,那么lead就是9。对于lead而言越大越好,否则表明此消费者可能处于停顿状态或者消费的非常慢,本质上lead和lag是一回事,之所以列出来是因为lead指标是我开发的,也算打个广告吧。


除了以上这些,我们还需要监控消费者组的分区分配情况,避免出现某个实例被分配了过多的分区,导致负载严重不平衡的情况出现。一般来说,如果组内所有消费者订阅的是相同的主题,那么通常不会出现明显的分配倾斜。一旦各个实例订阅的主题不相同且每个主题分区数参差不齐时就极易发生这种不平衡的情况。Kafka目前提供了3种策略来帮助用户完成分区分配,新的策略是黏性分配策略,它能保证的公平,大家可以去试一下。


后就是要监控rebalance的时间——目前来看,组内超多实例的rebalance性能很差,可能都是小时级别的。而且比较悲剧的是当前无较好的解决方案。所以,如果你的Consumer特别特别多的话,一定会有这个问题,你监控一下两个步骤所用的时间,看看是否满足需求,如果不能满足的话,看看能不能把消费者去除,尽量减少消费者数量。


5.Inter-Broker监控

后一个维度就是监控Broker之间的表现,主要是指副本拉取。Follower副本实时拉取leader处的数据,我们自然希望这个拉取过程越快越好。Kafka提供了一个特别重要的JMX指标,叫做备份不足的分区数,比如说我规定了这条消息,应该在三个Broker上面保存,假设只有一个或者两个Broker上保存该消息,那么这条消息所在的分区就被称为“备份不足”的分区。这种情况是特别关注的,因为有可能造成数据的丢失。《Kafka权威指南》一书中是这样说的:如果你只能监控一个Kafka JMX指标,那么就监控这个好了,确保在你的Kafka集群中该值是永远是0。一旦出现大于0的情形赶紧处理。

还有一个比较重要的指标是表征controller个数的。整个集群中应该确保只能有一台机器的指标是1,其他全应该是0,如果你发现有一台机器是2或者是3,一定是出现脑裂了,此时应该去检查下是否出现了网络分区。Kafka本身是不能对抗脑裂的,完全依靠Zookeeper来做,但是如果真正出现网络分区的话,也是没有办法处理的,不如赶快fail fast掉。


三、监控工具


当前没有一款Kafka监控工具是公认比较的,每个都有自己的特点但也有些致命的缺陷。我们针对一些常见的监控工具逐个讨论下。


1.Kafka Manager

应该说在所有免费的监控框架中,Kafka Manager是受欢迎的。它早由雅虎开源,功能非常齐全,展示的数据非常丰富。另外,用户能够在界面上执行一些简单的集群管理操作。更加令人欣慰的是,该框架目前还在不断维护中,因此使用Kafka manager来监控Kafka是一个不错的选择。


2.Burrow

Burrow是去年下半年开源,专门监控消费者信息的框架。这个框架刚开始开源的时候,我还对它还是寄予厚望的,毕竟是Kafka社区committer亲自编写的。不过Burrow的问题在于没有UI界面,不方便运维操作。另外由于是Go语言写的,你要用的话,必须搭建Go语言环境,然后编译部署,总之用起来不是很方便。还有就是它的更新不是很频繁,已经有点半荒废的状态,大家不妨一试。


3.Kafka Monitor

严格来说,它不是监控工具,它是专门做Kafka集群系统性测试用的。待监控的指标可以由用户自己设定,主要是做一些端到端的测试。比如说你搭了一套Kafka集群,想测试端到端的性能怎样:从发消息到消费者读取消息这一整体流程的性能。该框架的优势也是由Kafka社区团队写的,质量有保障,但更新不是很频繁,目前好像几个月没有更新了。


4.Kafka Offset Monitor

KafkaOffsetMonitor是我用的早的一个Kafka监控工具,也是监控消费者位移,只不过那时候Kafka把位移保持在Zookeepr上。这个框架的界面非常漂亮,国内用的人很多。但是现在有一个问题,因为我们现在用了新版本的消费者,这个框架目前支持得的并不是特别好。而且还有一个问题就是它已经不再维护了,可能有1-2年没有任何更新了。


5.Kafka Eagle

这是国人自己开发的,我不知道具体是哪个大牛开发的,但是在Kafka QQ群里面很多人推崇,因为界面很干净漂亮,上面有很好的数据展现。


6.Confluent Control Center

Control Center是目前我能收集到的功能齐全的Kafka监控框架了,只不过只有购买了Confluent企业版也有的,也就是说是付费的。


综合来讲,如果你是Kafka集群运维操作人员,推荐先用Kafka Manager来做监控,后面再根据实际监控需求定制化开发特有的工具或框架。


四、系统调优


Kafka监控的一个主要的目的就是调优Kafka集群。这里罗列了一些常见的操作系统级的调优。


首先是保证页缓存的大小——至少要设置页缓存为一个日志段的大小。我们知道Kafka大量使用页缓存,只要保证页缓存足够大,那么消费者读取消息时就有大概率保证它能够直接命中页缓存中的数据而无需从底层磁盘中读取。故只要保证页缓存要满足一个日志段的大小。


第二是调优文件打开数。很多人对这个资源有点畏手畏脚。实际上这是一个很廉价的资源,设置一个比较大的初始值通常都是没有什么问题的。


第三是调优vm.max_map_count参数。主要适用于Kafka broker上的主题数超多的情况。Kafka日志段的索引文件是用映射文件的机制来做的,故如果有超多日志段的话,这种索引文件数必然是很多的,极易打爆这个资源限制,所以对于这种情况一般要适当调大这个参数。


第四是swap的设置。很多文章说把这个值设为0,就是完全禁止swap,我个人不建议这样,因为当你设置成为0的时候,一旦你的内存耗尽了,Linux会自动开启OOM killer然后随机找一个进程杀掉。这并不是我们希望的处理结果。相反,我建议设置该值为一个比较接近零的较小值,这样当我的内存快要耗尽的时候会尝试开启一小部分swap,虽然会导致broker变得非常慢,但至少给了用户发现问题并处理之的机会。


第五JVM堆大小。首先鉴于目前Kafka新版本已经不支持Java7了,而Java 8本身不更新了,甚至Java9其实都不做了,直接做Java10了,所以我建议Kafka至少搭配Java8来搭建。至于堆的大小,个人认为6-10G足矣。如果出现了堆溢出,就提jira给社区,让他们看到底是怎样的问题。因为这种情况下即使用户调大heap size,也只是延缓OOM而已,不太可能从根本上解决问题。

后,建议使用专属的多块磁盘来搭建Kafka集群。自1.1版本起Kafka正式支持JBOD,因此没必要在底层再使用一套RAID了。


五、Kafka调优的四个层面


Kafka调优通常可以从4个维度展开,分别是吞吐量、延迟、持久性和可用性。在具体展开这些方面之前,我想先建议用户保证客户端与服务器端版本一致。如果版本不一致,就会出现向下转化的问题。举个例子,服务器端保存高版本的消息,当低版本消费者请求数据时,服务器端就要做转化,先把高版本消息转成低版本再发送给消费者。这件事情本身就非常非常低效。很多文章都讨论过Kafka速度快的原因,其中就谈到了零拷贝技术——即数据不需要在页缓存和堆缓存中来回拷贝。


简单来说producer把生产的消息放到页缓存上,如果两边版本一致,可以直接把此消息推给Consumer,或者Consumer直接拉取,这个过程是不需要把消息再放到堆缓存。但是你要做向下转化或者版本不一致的话,就要额外把数据再堆上,然后再放回到Consumer上,速度特别慢。


1.Kafka调优 – 吞吐量

调优吞吐量就是我们想用更短的时间做更多的事情。这里列出了客户端需要调整的参数。前面说过了producer是把消息放在缓存区,后端Sender线程从缓存区拿出来发到broker。这里面涉及到一个打包的过程,它是批处理的操作,不是一条一条发送的。因此这个包的大小就和TPS息息相关。通常情况下调大这个值都会让TPS提升,但是也不会无限制的增加。不过调高此值的劣处在于消息延迟的增加。除了调整batch.size,设置压缩也可以提升TPS,它能够减少网络传输IO。当前Lz4的压缩效果是好的,如果客户端机器CPU资源很充足那么建议开启压缩。

对于消费者端而言,调优TPS并没有太好的办法,能够想到的就是调整fetch.min.bytes。适当地增加该参数的值能够提升consumer端的TPS。对于Broker端而言,通常的瓶颈在于副本拉取消息时间过长,因此可以适当地增加num.replica.fetcher值,利用多个线程同时拉取数据,可以加快这一进程。


2.Kafka调优 – 延时

所谓的延时就是指消息被处理的时间。某些情况下我们自然是希望越快越好。针对这方面的调优,consumer端能做的不多,简单保持fetch.min.bytes默认值即可,这样可以保证consumer能够立即返回读取到的数据。讲到这里,可能有人会有这样的疑问:TPS和延时不是一回事吗?假设发一条消息延时是2ms,TPS自然就是500了,因为一秒只能发500消息,其实这两者关系并不是简单的。因为我发一条消息2毫秒,但是如果把消息缓存起来统一发,TPS会提升很多。假设发一条消息依然是2ms,但是我先等8毫秒,在这8毫秒之内可能能收集到一万条消息,然后我再发。相当于你在10毫秒内发了一万条消息,大家可以算一下TPS是多少。事实上,Kafka producer在设计上就是这样的实现原理。


3.Kafka调优 –消息持久性

消息持久化本质上就是消息不丢失。Kafka对消息不丢失的承诺是有条件的。以前碰到很多人说我给Kafka发消息,发送失败,消息丢失了,怎么办?严格来说Kafka不认为这种情况属于消息丢失,因为此时消息没有放到Kafka里面。Kafka只对已经提交的消息做有条件的不丢失保障。


如果要调优持久性,对于producer而言,首先要设置重试以防止因为网络出现瞬时抖动造成消息发送失败。一旦开启了重试,还需要防止乱序的问题。比如说我发送消息1与2,消息2发送成功,消息1发送失败重试,这样消息1就在消息2之后进入Kafka,也就是造成乱序了。如果用户不允许出现这样的情况,那么还需要显式地设置max.in.flight.requests.per.connection为1。

本页PPT列出的其他参数都是很常规的参数,比如unclean.leader.election.enable参数,好还是将其设置成false,即不允许“脏”副本被选举为leader。


4.Kafka调优 –可用性

后是可用性,与刚才的持久性是相反的,我允许消息丢失,只要保证系统高可用性即可。因此我需要把consumer心跳超时设置为一个比较小的值,如果给定时间内消费者没有处理完消息,该实例可能就被踢出消费者组。我想要其他消费者更快地知道这个决定,因此调小这个参数的值。


六、定位性能瓶颈


下面就是性能瓶颈,严格来说这不是调优,这是解决性能问题。对于生产者来说,如果要定位发送消息的瓶颈很慢,我们需要拆解发送过程中的各个步骤。就像这张图表示的那样,消息的发送共有6步。步就是生产者把消息放到Broker,第二、三步就是Broker把消息拿到之后,写到本地磁盘上,第四步是follower broker从Leader拉取消息,第五步是创建response;第六步是发送回去,告诉我已经处理完了。

这六步当中你需要确定瓶颈在哪?怎么确定?——通过不同的JMX指标。比如说步骤1是慢的,可能你经常碰到超时,你如果在日志里面经常碰到request timeout,就表示1是很慢的,此时要适当增加超时的时间。如果2、3慢的情况下,则可能体现在磁盘IO非常高,导致往磁盘上写数据非常慢。倘若是步骤4慢的话,查看名为remote-time的JMX指标,此时可以增加fetcher线程的数量。如果5慢的话,表现为response在队列导致待的时间过长,这时可以增加网络线程池的大小。6与1是一样的,如果你发现1、6经常出问题的话,查一下你的网络。所以,就这样来分解整个的耗时。这是到底哪一步的瓶颈在哪,需要看看什么样的指标,做怎样的调优。


七、Java Consumer的调优


后说一下Consumer的调优。目前消费者有两种使用方式,一种是同一个线程里面就直接处理,另一种是我采用单独的线程,consumer线程只是做获取消息,消息真正的处理逻辑放到单独的线程池中做。这两种方式有不同的使用场景:种方法实现较简单,因为你的消息处理逻辑直接写在一个线程里面就可以了,但是它的缺陷在于TPS可能不会很高,特别是当你的客户端的机器非常强的时候,你用单线程处理的时候是很慢的,因为你没有充分利用线程上的CPU资源。第二种方法的优势是能够充分利用底层服务器的硬件资源,TPS可以做的很高,但是处理提交位移将会很难。


后说一下参数,也是网上问的多的,这几个参数到底是做什么的。个参数,就是控制consumer单次处理消息的大时间。比如说设定的是600s,那么consumer给你10分钟来处理。如果10分钟内consumer无法处理完成,那么coordinator就会认为此consumer已死,从而开启rebalance。


Coordinator是用来管理消费者组的协调者,协调者如何在有效的时间内,把消费者实例挂掉的消息传递给其他消费者,就靠心跳请求,因此可以设置heartbeat.interval.ms为一个较小的值,比如5s。


八、Q & A


Q1:胡老师在前面提到低版本与高版本有一个端口的问题,我想问一下高版本的、低版本的会有这个问题吗?

A1:会有。


Q2:两种模式,一个是Consumer怎么做到所有的partition,在里面做管理的。会有一个问题,某个Consumer的消费比较慢,因为所有的Partition的消费都是绑定在一个线程。一个消费比较慢,一个消费比较快,要等另一个。有没有一种方案,消费者比较慢的可以暂定,如果涉及到暂停的话,频繁的暂定耗费的时间,是不是会比较慢?

A2:一个线程处理所有的分区。如果从开销来讲并不大,但是的确会出现像你说的,如果一个消费者定了100个分区,目前我这边看到的效果,某段时间内有可能会造成某些分区的饿死,比如说某些分区长期得不到数据,可能有一些分区不停的有数据,这种情况下的确有可能情况。但是你说的两种方法本身开销不是很大,因为它就是内存当中的结构变更,就是定位信息,如果segment,就把定位信息先暂时关掉,不涉及到很复杂的数据结构的变更。

Q3:怎么决定顺序呢?

A3:这个事情现在在Broker端做的,简单会做轮询,比如说有100个分区,批随机给你一批分区,之后这些分区会排到整个队列的末尾,从其他的分区开始给你,做到尽量的公平。


Q4:消费的时候会出现数据倾斜的情况,这块如何理解?

A4:数据倾斜。这种情况下发生在每个消费者订阅信息不一样的情况下,特别容易出现数据倾斜。比如说我订阅主题123,我订阅主题456,我们又在同一个组里面这些主题分区数极不相同,很有可能出现我订阅了10个分区,你可能订阅2个分区。如果你用的是有粘性的分配策略,那种保证不会出现超过两个以上相差的情况。这个策略推出的时间也不算短了,是0.11版本推出来的。


点击这里,免费申请DataPipeline产品试用

相关文章