从0开始学Kafka(上)
引言
近冒着变秃的危险学习kafka,看起我的小视频,记起我的小笔记~
在此之前,只是听说过kafka,并没有系统的学习过。如果是小白,可以进来和我一起摸索;如果是大佬,欢迎复习并纠正我的错误~
有关分布式和消息队列共性的一点理论这里就不展示了,主要是看kafka相关的特性和知识。
文章导读
- Kafka基础概述(设计目标,架构,概念)
- Kafka的高可用机制(数据备份,ISR,Commit,fail-over容错机制)
- Kafka的Leader Election
一、Kafka基础概述
Kafka是一种高性能跨语言的分布式发布/订阅消息系统。
1.1 Kafka的设计目标
高吞吐率
配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。Kafka在大数据领域比较有用武之地,学习Flink的时候必然离不开Kafka。这么牛的性能其实也依赖于顺序读写,零拷贝,充分利用页缓存,批处理和压缩等。这些我在后续都会分析。
消息持久化
所有消息均被持久化到磁盘,无消息丢失,支持消息重放。
完全分布式
支持消息分区以及分布式消费,并保证分区内的消息顺序,还支持Producer,Broker,Consumer的水平扩展(这些概念后续会写,这里先混个眼熟)。
数据处理
同时满足适应在线流处理和离线批处理。
1.2 Kafka的架构
可以看出,Kafka的几个角色,其中Broker就是Kafka的Server,在这幅图中暂且不研究Broker里面的细节,从全局角度,能得到下面几个结论:
- Producer将消息Push给Broker,而Consumer是以主动Pull的方式获取消息,而不是Broker主动Push给Consumer。
-
Producer并没有通过Zookeeper来获取集群的信息。
注:0.8.2.1之后以这种架构方式。 - Consumer依赖于Zookeeper来感知集群信息。
- Broker与Broker之间的通信依赖于Zookeeper。
下面主要根据这几个结论解答几个小疑问。
Q1:为什么要设计成Consumer主动Pull消息的模式?
主要是两个,Broker不需要感知到哪些Consumer存在;如果是以Push的方式就可能导致应用程序处理不过来,延迟甚至数据丢失。
Q2:Producer如何感知Broker集群的存在的?
0.8版本后,Producer不需要依赖Zookeeper获取Broker集群的元信息。通过IP+Port的方式,只要能连上一个Kafka,就能获得整个集群的信息。
其中,Producer获取Broker集群元信息的方法有:
- Producer发送消息失败后,主动触发刷新集群元信息;
- 周期性地刷新缓存元信息(可配置)。
Q3:Producer将消息分发给Broker的策略?
通过指定topic和partition来发送消息。
1.3 Kafka的基本概念
Topic
- 逻辑概念,同一个Topic的消息可以分布在多个Broker(节点)上。
- 一个Topic包含一个或者多个Partition。
- 每条消息仅属于一个Topic。
- Producer发布数据时,必须指定将该消息发布到哪一个Topic。
- Consumer订阅消息时,也必须指定订阅哪个Topic的消息。
注:为了便于理解,这里将replica-factor设置成1。
Partition
图中描述了一个写的而过程,可以看出,数据是以Segment为单位存储在Partition中的,每次都以追加的方式插入。
- 物理概念,一个Partition只分布于一个Broker上(不考虑备份)。
注:这很重要,也是Broker id能代替Leader id的原因。 - 一个Partition物理上对应一个文件夹。
- 一个Partition包含多个Segment,一个Segment对应一个文件。Segment由一个个不可变记录组成。Segment对用户是透明的,用户并不需要感知Segment的存在。
- 记录只会被append到Segment中,不会被单独删除或者修改。删除的话是删除整个Segment。
- 清除过期日志时,直接删除一个或多个Segment。
Broker
就是一个Kafka节点。可以类比为一个Broker就是一个微信号,微信号里的微信群就像是一个个Topic,Partition就是微信群中的人。都由Broker来做同一的管理。由Broker来接收Consumer和Producer的请求,并把消息持久化到本地磁盘。
Offset
消息在Partition中的编号,编号顺序不跨Partition。
Replication
是消息的备份。Kafka是以Partition为单位对消息进行冗余备份的,某个Replica就是基于某个Partition的备份,每个Partition至少有一个备份。
Leader
每个Replication集合中的Partition都会选出一个的Leader,所有的读写请求都由leader处理。其他Replica从Leader处把请求数据更新同步到本地。
Producer
按消息真正发送的时机来划分,有两种Producer,Sync Producer和Aync Producer。
- Sync Producer(同步)
低延迟,低吞吐率,无数据丢失。
发送消息失败后,进行重试,几次重试失败可以将消息放到磁盘。 - Aync Producer(异步)
高延迟,高吞吐率,有数据丢失。
调用后将消息放入队列,后台线程从队列中获取消息,批量发送给对应的broker
如果队列满了并且阻塞超过了一定的时间,kafka会将新的数据直接丢掉。
二、Kafka的高可用机制
2.1 数据备份
首先来看一下Kafka是如何通过Replica做数据备份的?
- 当某个Topic的replication-factor为N且N>1时,每个Partition都会有N个副本。
- Replica的个数必须小于等于Broker数,否则会报错。即对每个Partition而言,每个Broker上多只会有一个Replic,因此可用Broker ID表示某个Replica。
- 所有Partition的所有Replica默认情况会均匀分配到所有Broker上。
可以通过一张图来直观地感受数据备份的过程:
大概过程是:
数据先写入某个Leader,然后(由Follower周期性的发起复制)再复制到各个Follower中去。读取时,从Leader处读取。
不过,光有上面的过程还不够,需要一种机制去解决Follower和Leader的延迟问题,以及Leader宕机后对Follower的选举策略,那么,Kafka是如何保证数据终的一致性的?
2.2 Commit机制
为了保障数据一致性,就需要Commit机制来打辅助。
Kafka规定,只有Commit过的数据,Consumer才能读取。这个步骤就相当于Leader告诉Producer客户端写成功了,数据被Commit了。只要被Commit的数据就说明有备份了。
对于Commit的数据,如果使用同步,则不会有消息丢失,一致性高,但会使可用性较差;如果使用异步,则可用性较高但消息丢失概率增大。
于是乎,Kafka采用了折衷的办法,使用ISR机制。
ISR(In-Sync Replica)机制
这里先解释一下几个概念,分区中的所有副本统称为AR(Assigned Repllicas)。ISR是AR的子集。三句话解释ISR:
- 每一个Leader都会维护一个与其基本保持同步的Replica列表,称为ISR列表。
- 如果一个Follower比一个Leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其从ISR中移除。
- 当ISR中所有Replica都向Leader发送ACK时,Leader就Commit。
Commit的具体策略
Commit的具体策略是可配置的,具体可同过Server,Topic,Producer3种角度如下配置:
Server
- replica.lag.time.max.ms=10000
- replica.lag.max.messages=4000
解释:超过10秒或滞后4000条数据时,将此Replica移除ISR。
Topic
min.insync.replicas=1
解释:ISR中的元素少为1个。
Producer
request.required.acks=0
解释:默认为0时,异步,不需要Leader等待ACK就返回。为1时,同步,Leader必须等待ACK返回。为-1时,才会触发ISR机制 。
2.3 Kafka的fail-over的过程
fail-over就是一种常见的容错机制之一,意为失效转移。见上图,典型的就是Leader宕机,大概过程是:
- m3在Leader中,还未Commit。
- Leader宕机,此时选举B为Leader(具体选举过程下一章会讲,默认应该是ISR中个Replica)。
- 存活的B,C继续接收消息同步m4,m5。
- A回归集群,执行它宕机时的ISR。
- A执行B中维护的ISR,保持终的一致性。
此时,问题来了,丢失的m3怎么办?
这个消息只能靠Producer Retry。超过固定次数失败后,就真的丢失了。
此时萌生一个恐怖的想法,如果Replica全部宕机,会怎么样?
对于Kafka来说,有两种策略:
- 等待ISR中任意Replica恢复,并选举它为Leader。
缺点:等待时间较长,降低可用性。如果ISR中所有Replica都无法恢复或者数据丢失,则该Partition将永不可用。
优点:保证数据的完整性。 - 选择个恢复的Replica为新的Leader,无论它是否在ISR中。
缺点:并未包含所有已被之前Leader Commit过的消息,因此会造成数据丢失。
优点:可用性高。
注:默认配置是选个恢复的Replica为新的Leader。
三、Kafka的Leader Election
我们知道,Kafka中每一种Partition都由Replica构成,可以理解为Replica=Leader+Follower。
所以Leader的选举有一个很容易象到的办法,就是以Partition为一个维度,每个Partition的多个Replica同时竞争Leader。意思是由Replica竞争,终选一个Leader,但是一个Partition中的那么多Replica都参与选举,势必会加重Zookeeper的负载。导致每一次的选举“粒度”都很高,延迟也较大。所以应该想一个延迟低,效率高,负载均衡,实时的更好的方法。
下面要说的,也是Kafka中的选举策略,基于Controller的Leader Election。
按我的理解来说,解决的思想其实就是“整合”。整个集群中选举出一个Broker作为Controller Leader。把Leader Election的实现细节都交给这个Controller Leader完成,让它为所有Topic的所有Partition指定Leader及Follower,并且当ISR列表发生变化时,Controller通知集群中所有Broker更新对应的缓存。Controller与Leader和Follower之间借助Zookeeper,通过RPC通信。
选举的过程:
前提:Broker启动时,都会在Zookeeper路径下创建临时节点(/controller节点),只有个成功创建节点的Controller才是真正的Controller,并且在节点中写入当前Broker的信息。包括version,brokerid,timestamp。
当Controller故障后,利用Zookeeper的强一致性,所有的Broker都会监听该节点的变化,并且收到通知,再次竞争在该路径下创建节点,从而选举新的controller。
(1)Controller在Zookeeper中注册watch,一旦有Broker宕机。Broker在Zookeeper上的znode就会被自动删除。Zookeeper会fire Controller注册的watch,意思就是watch在fire之后会被取消,不会再关注该节点的变化。Controller读取新的幸存Broker的信息。
(2)Controller整理一个Set集合,这个集合包含了宕机的所有Broker上的所有Partition(也就是Partition上所有的Replica)。
(3)对Set中的每一个Partition,读取其当前的ISR。决定该Partition的新Leader。如果ISR中有一个Replica还幸存,就选择其中一个作为新Leader,新的ISR包含当前所有幸存的Replica。如果Partition的所有Replica都宕机,则将新Leader的id(对应于源码中的controllerId)设置为-1。
此时具体的选举策略有两种,上一章说过,即所有Replica宕机的情况。
(4)将新的Leader,ISR和新的leader_epoch及controller_epoch(旧的+1)写入新的controller节点的state。
注:leader_epoch和controller_epoch是Zookeeper中的概念,利用这个值来确定是否为新的Leader或Controller,因为每次选举都会加1。
(5)直接通过RPC向Set相关的Broker发送LeaderAndISRRequest命令。这个过程其实就是告知他们新的Leader都是谁,要听从它的指挥。
后
学习Kafka可能需要一些分布式的基础,以后有机会写一些。下一篇写Kafka的Consumer以及高性能的实现原理。
如果感觉写的还OK不妨点个赞,收藏比点赞高n倍,太气人啦~
参考文章:kafka leader选举机制原理
相关文章