从0开始学Kafka(上)

2020-05-28 00:00:00 数据 集群 消息 节点 选举

引言

近冒着变秃的危险学习kafka,看起我的小视频,记起我的小笔记~
在此之前,只是听说过kafka,并没有系统的学习过。如果是小白,可以进来和我一起摸索;如果是大佬,欢迎复习并纠正我的错误~
有关分布式和消息队列共性的一点理论这里就不展示了,主要是看kafka相关的特性和知识。

文章导读

  • Kafka基础概述(设计目标,架构,概念)
  • Kafka的高可用机制(数据备份,ISR,Commit,fail-over容错机制)
  • Kafka的Leader Election

一、Kafka基础概述

Kafka是一种高性能跨语言的分布式发布/订阅消息系统。

1.1 Kafka的设计目标

高吞吐率
配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。Kafka在大数据领域比较有用武之地,学习Flink的时候必然离不开Kafka。这么牛的性能其实也依赖于顺序读写零拷贝充分利用页缓存批处理和压缩等。这些我在后续都会分析。

消息持久化
所有消息均被持久化到磁盘,无消息丢失,支持消息重放。

完全分布式
支持消息分区以及分布式消费,并保证分区内的消息顺序,还支持Producer,Broker,Consumer的水平扩展(这些概念后续会写,这里先混个眼熟)。

数据处理
同时满足适应在线流处理和离线批处理。

1.2 Kafka的架构

可以看出,Kafka的几个角色,其中Broker就是Kafka的Server,在这幅图中暂且不研究Broker里面的细节,从全局角度,能得到下面几个结论:

  1. Producer将消息Push给Broker,而Consumer是以主动Pull的方式获取消息,而不是Broker主动Push给Consumer。
  2. Producer并没有通过Zookeeper来获取集群的信息。
    注:0.8.2.1之后以这种架构方式。
  3. Consumer依赖于Zookeeper来感知集群信息。
  4. Broker与Broker之间的通信依赖于Zookeeper。

下面主要根据这几个结论解答几个小疑问。

Q1:为什么要设计成Consumer主动Pull消息的模式?

主要是两个,Broker不需要感知到哪些Consumer存在;如果是以Push的方式就可能导致应用程序处理不过来延迟甚至数据丢失

Q2:Producer如何感知Broker集群的存在的?

0.8版本后,Producer不需要依赖Zookeeper获取Broker集群的元信息。通过IP+Port的方式,只要能连上一个Kafka,就能获得整个集群的信息。

其中,Producer获取Broker集群元信息的方法有:

  • Producer发送消息失败后,主动触发刷新集群元信息;
  • 周期性地刷新缓存元信息(可配置)。

Q3:Producer将消息分发给Broker的策略?

通过指定topic和partition来发送消息。

1.3 Kafka的基本概念

Topic

图片来源于网络
  1. 逻辑概念,同一个Topic的消息可以分布在多个Broker(节点)上。
  2. 一个Topic包含一个或者多个Partition。
  3. 每条消息仅属于一个Topic。
  4. Producer发布数据时,必须指定将该消息发布到哪一个Topic。
  5. Consumer订阅消息时,也必须指定订阅哪个Topic的消息。
    注:为了便于理解,这里将replica-factor设置成1。

Partition

一个Topic的结构

图中描述了一个写的而过程,可以看出,数据是以Segment为单位存储在Partition中的,每次都以追加的方式插入。

  1. 物理概念,一个Partition只分布于一个Broker上(不考虑备份)。
    注:这很重要,也是Broker id能代替Leader id的原因。
  2. 一个Partition物理上对应一个文件夹
  3. 一个Partition包含多个Segment,一个Segment对应一个文件。Segment由一个个不可变记录组成。Segment对用户是透明的,用户并不需要感知Segment的存在。
  4. 记录只会被append到Segment中,不会被单独删除或者修改。删除的话是删除整个Segment。
  5. 清除过期日志时,直接删除一个或多个Segment。

Broker
就是一个Kafka节点。可以类比为一个Broker就是一个微信号,微信号里的微信群就像是一个个Topic,Partition就是微信群中的人。都由Broker来做同一的管理。由Broker来接收Consumer和Producer的请求,并把消息持久化到本地磁盘。

Offset
消息在Partition中的编号,编号顺序不跨Partition。

Replication
是消息的备份。Kafka是以Partition为单位对消息进行冗余备份的,某个Replica就是基于某个Partition的备份,每个Partition至少有一个备份。

Leader
每个Replication集合中的Partition都会选出一个的Leader,所有的读写请求都由leader处理。其他Replica从Leader处把请求数据更新同步到本地。

Producer
按消息真正发送的时机来划分,有两种Producer,Sync Producer和Aync Producer。

  • Sync Producer(同步)
    低延迟,低吞吐率,无数据丢失。
    发送消息失败后,进行重试,几次重试失败可以将消息放到磁盘。
  • Aync Producer(异步)
    高延迟,高吞吐率,有数据丢失。
    调用后将消息放入队列,后台线程从队列中获取消息,批量发送给对应的broker
    如果队列满了并且阻塞超过了一定的时间,kafka会将新的数据直接丢掉。

二、Kafka的高可用机制

2.1 数据备份

首先来看一下Kafka是如何通过Replica做数据备份的?

  1. 当某个Topic的replication-factor为N且N>1时,每个Partition都会有N个副本。
  2. Replica的个数必须小于等于Broker数,否则会报错。即对每个Partition而言,每个Broker上多只会有一个Replic,因此可用Broker ID表示某个Replica。
  3. 所有Partition的所有Replica默认情况会均匀分配到所有Broker上。

可以通过一张图来直观地感受数据备份的过程:

大概过程是:
数据先写入某个Leader,然后(由Follower周期性的发起复制)再复制到各个Follower中去。读取时,从Leader处读取。

不过,光有上面的过程还不够,需要一种机制去解决Follower和Leader的延迟问题,以及Leader宕机后对Follower的选举策略,那么,Kafka是如何保证数据终的一致性的?

2.2 Commit机制

为了保障数据一致性,就需要Commit机制来打辅助。
Kafka规定,只有Commit过的数据,Consumer才能读取。这个步骤就相当于Leader告诉Producer客户端写成功了,数据被Commit了。只要被Commit的数据就说明有备份了

对于Commit的数据,如果使用同步,则不会有消息丢失,一致性高,但会使可用性较差;如果使用异步,则可用性较高但消息丢失概率增大。

于是乎,Kafka采用了折衷的办法,使用ISR机制

ISR(In-Sync Replica)机制

这里先解释一下几个概念,分区中的所有副本统称为AR(Assigned Repllicas)。ISR是AR的子集。三句话解释ISR:

  • 每一个Leader都会维护一个与其基本保持同步的Replica列表,称为ISR列表。
  • 如果一个Follower比一个Leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其从ISR中移除
  • 当ISR中所有Replica都向Leader发送ACK时,Leader就Commit

Commit的具体策略

Commit的具体策略是可配置的,具体可同过Server,Topic,Producer3种角度如下配置:

Server

  • replica.lag.time.max.ms=10000
  • replica.lag.max.messages=4000

解释:超过10秒或滞后4000条数据时,将此Replica移除ISR。

Topic

min.insync.replicas=1

解释:ISR中的元素少为1个。

Producer

request.required.acks=0

解释:默认为0时,异步,不需要Leader等待ACK就返回。为1时,同步,Leader必须等待ACK返回。为-1时,才会触发ISR机制 。

2.3 Kafka的fail-over的过程

图片来源于网络

fail-over就是一种常见的容错机制之一,意为失效转移。见上图,典型的就是Leader宕机,大概过程是

  1. m3在Leader中,还未Commit。
  2. Leader宕机,此时选举B为Leader(具体选举过程下一章会讲,默认应该是ISR中个Replica)。
  3. 存活的B,C继续接收消息同步m4,m5。
  4. A回归集群,执行它宕机时的ISR。
  5. A执行B中维护的ISR,保持终的一致性。

此时,问题来了,丢失的m3怎么办?
这个消息只能靠Producer Retry。超过固定次数失败后,就真的丢失了。

此时萌生一个恐怖的想法,如果Replica全部宕机,会怎么样?
对于Kafka来说,有两种策略:

  • 等待ISR中任意Replica恢复,并选举它为Leader。
    缺点:等待时间较长,降低可用性。如果ISR中所有Replica都无法恢复或者数据丢失,则该Partition将永不可用。
    优点:保证数据的完整性
  • 选择个恢复的Replica为新的Leader,无论它是否在ISR中。
    缺点:并未包含所有已被之前Leader Commit过的消息,因此会造成数据丢失
    优点:可用性高

注:默认配置是选个恢复的Replica为新的Leader。

三、Kafka的Leader Election

我们知道,Kafka中每一种Partition都由Replica构成,可以理解为Replica=Leader+Follower

所以Leader的选举有一个很容易象到的办法,就是以Partition为一个维度,每个Partition的多个Replica同时竞争Leader。意思是由Replica竞争,终选一个Leader,但是一个Partition中的那么多Replica都参与选举,势必会加重Zookeeper的负载。导致每一次的选举“粒度”都很高,延迟也较大。所以应该想一个延迟低,效率高,负载均衡,实时的更好的方法。

下面要说的,也是Kafka中的选举策略,基于Controller的Leader Election

按我的理解来说,解决的思想其实就是“整合”。整个集群中选举出一个Broker作为Controller Leader。把Leader Election的实现细节都交给这个Controller Leader完成,让它为所有Topic的所有Partition指定Leader及Follower,并且当ISR列表发生变化时,Controller通知集群中所有Broker更新对应的缓存。Controller与Leader和Follower之间借助Zookeeper,通过RPC通信。

选举的过程:

前提:Broker启动时,都会在Zookeeper路径下创建临时节点(/controller节点),只有个成功创建节点的Controller才是真正的Controller,并且在节点中写入当前Broker的信息。包括version,brokerid,timestamp。

当Controller故障后,利用Zookeeper的强一致性,所有的Broker都会监听该节点的变化,并且收到通知,再次竞争在该路径下创建节点,从而选举新的controller。

(1)Controller在Zookeeper中注册watch,一旦有Broker宕机。Broker在Zookeeper上的znode就会被自动删除。Zookeeper会fire Controller注册的watch,意思就是watch在fire之后会被取消,不会再关注该节点的变化。Controller读取新的幸存Broker的信息。

(2)Controller整理一个Set集合,这个集合包含了宕机的所有Broker上的所有Partition(也就是Partition上所有的Replica)。

(3)对Set中的每一个Partition,读取其当前的ISR。决定该Partition的新Leader。如果ISR中有一个Replica还幸存,就选择其中一个作为新Leader,新的ISR包含当前所有幸存的Replica。如果Partition的所有Replica都宕机,则将新Leader的id(对应于源码中的controllerId)设置为-1。
此时具体的选举策略有两种,上一章说过,即所有Replica宕机的情况。

(4)将新的Leader,ISR和新的leader_epoch及controller_epoch(旧的+1)写入新的controller节点的state。
注:leader_epoch和controller_epoch是Zookeeper中的概念,利用这个值来确定是否为新的Leader或Controller,因为每次选举都会加1。

(5)直接通过RPC向Set相关的Broker发送LeaderAndISRRequest命令。这个过程其实就是告知他们新的Leader都是谁,要听从它的指挥。

学习Kafka可能需要一些分布式的基础,以后有机会写一些。下一篇写Kafka的Consumer以及高性能的实现原理。

如果感觉写的还OK不妨点个赞,收藏比点赞高n倍,太气人啦~

参考文章:kafka leader选举机制原理

相关文章