Kafka 如何做到 1 秒发布百万条消息

2020-05-28 00:00:00 数据 多个 消息 缓存 磁盘

Kafka 是分布式发布-订阅消息系统,是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。

现在被广泛地应用于构建实时数据管道和流应用的场景中,具有横向扩展,容错,快等优点,并已经运行在众多大中型公司的生产环境中,成功应用于大数据领域,本文分享一下我所了解的 Kafka。

Kafka 高吞吐率性能揭秘

Kafka 的个突出特定就是“快”,而且是那种变态的“快”,在普通廉价的虚拟机器上,比如一般 SAS 盘做的虚拟机上,据 LINDEDIN 统计,新的数据是每天利用 Kafka 处理的消息超过1万亿条,在峰值时每秒钟会发布超过百万条消息,就算是在内存和 CPU 都不高的情况下,Kafka 的速度高可以达到每秒十万条数据,并且还能持久化存储。

作为消息队列,要承接读跟写两块的功能,首先是写,就是消息日志写入 Kafka,那么,Kafka 在“写”上是怎么做到写变态快呢?

Kafka 让代码飞起来之写得快

首先,可以使用 Kafka 提供的生产端 API 发布消息到 1 个或多个 Topic(主题)的一个(保证数据的顺序)或者多个分区(并行处理,但不一定保证数据顺序)。Topic 可以简单理解成一个数据类别,是用来区分不同数据的。

Kafka 维护一个 Topic 中的分区 log,以顺序追加的方式向各个分区中写入消息,每个分区都是不可变的消息队列。分区中的消息都是以 k-v 形式存在。

  • k 表示 offset,称之为偏移量,一个 64 位整型的标识,offset 代表了 Topic 分区中所有消息流中该消息的起始字节位置。
  • v 就是实际的消息内容,每个分区中的每个 offset 都是存在的,所有分区的消息都是一次写入,在消息未过期之前都可以调整 offset 来实现多次读取。


以上提到 Kafka “快”的个因素:消息顺序写入磁盘。

我们知道现在的磁盘大多数都还是机械结构(SSD不在讨论的范围内),如果将消息以随机写的方式存入磁盘,就会按柱面、磁头、扇区的方式进行(寻址过程),缓慢的机械运动(相对内存)会消耗大量时间,导致磁盘的写入速度只能达到内存写入速度的几百万分之一,为了规避随机写带来的时间消耗,KAFKA采取顺序写的方式存储数据,如下图所示:






新来的消息只能追加到已有消息的末尾,并且已经生产的消息不支持随机删除以及随机访问,但是消费者可以通过重置 offset 的方式来访问已经消费过的数据。

即使顺序读写,过于频繁的大量小 I/O 操作一样会造成磁盘的瓶颈,所以 Kafka 在此处的处理是把这些消息集合在一起批量发送,这样减少对磁盘 IO 的过度读写,而不是一次发送单个消息。

另一个是率的字节复制,尤其是在负载比较高的情况下影响是显着的。为了避免这种情况,Kafka 采用由 Producer,broker 和 consumer 共享的标准化二进制消息格式,这样数据块就可以在它们之间自由传输,无需转换,降低了字节复制的成本开销。

同时,Kafka 采用了 MMAP(Memory Mapped Files,内存映射文件)技术。很多现代操作系统都大量使用主存做磁盘缓存,一个现代操作系统可以将内存中的所有剩余空间用作磁盘缓存,而当内存回收的时候几乎没有性能损失。

由于 Kafka 是基于 JVM 的,并且任何与 Java 内存使用打过交道的人都知道两件事:

▪ 对象的内存开销非常高,通常是实际要存储数据大小的两倍;

▪ 随着数据的增加,java的垃圾收集也会越来越频繁并且缓慢。

基于此,使用文件系统,同时依赖页面缓存就比使用其他数据结构和维护内存缓存更有吸引力:

▪ 不使用进程内缓存,就腾出了内存空间,可以用来存放页面缓存的空间几乎可以翻倍。

▪ 如果 Kafka 重启,进行内缓存就会丢失,但是使用操作系统的页面缓存依然可以继续使用。

可能有人会问 Kafka 如此频繁利用页面缓存,如果内存大小不够了怎么办?

Kafka 会将数据写入到持久化日志中而不是刷新到磁盘。实际上它只是转移到了内核的页面缓存。

利用文件系统并且依靠页缓存比维护一个内存缓存或者其他结构要好,它可以直接利用操作系统的页缓存来实现文件到物理内存的直接映射。完成映射之后对物理内存的操作在适当时候会被同步到硬盘上。

Kafka 让代码飞起来之读得快

Kafka 除了接收数据时写得快,另外一个特点就是推送数据时发得快。

Kafka 这种消息队列在生产端和消费端分别采取的 push 和 pull 的方式,也就是你生产端可以认为 Kafka 是个无底洞,有多少数据可以使劲往里面推送,消费端则是根据自己的消费能力,需要多少数据,你自己过来 Kafka 这里拉取,Kafka 能保证只要这里有数据,消费端需要多少,都尽可以自己过来拿。

▲零拷贝

具体到消息的落地保存,broker 维护的消息日志本身就是文件的目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。维护这个公共的格式并允许优化重要的操作:网络传输持久性日志块。 现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法:FileChannel.transferTo API。

要理解 senfile 的影响,重要的是要了解将数据从文件传输到 socket 的公共数据路径,如下图所示,数据从磁盘传输到 socket 要经过以下几个步骤:

▪ 操作系统将数据从磁盘读入到内核空间的页缓存

▪ 应用程序将数据从内核空间读入到用户空间缓存中

▪ 应用程序将数据写回到内核空间到 socket 缓存中

▪ 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出

这里有四次拷贝,两次系统调用,这是非常低效的做法。如果使用 sendfile,只需要一次拷贝就行:允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有后一步将数据拷贝到网卡缓存中是需要的。

常规文件传输和 zeroCopy 方式的性能对比:





假设一个 Topic 有多个消费者的情况, 并使用上面的零拷贝优化,数据被复制到页缓存中一次,并在每个消费上重复使用,而不是存储在存储器中,也不在每次读取时复制到用户空间。这使得以接近网络连接限制的速度消费消息。

这种页缓存和 sendfile 组合,意味着 Kafka 集群的消费者大多数都完全从缓存消费消息,而磁盘没有任何读取活动。

▲批量压缩

在很多情况下,系统的瓶颈不是 CPU 或磁盘,而是网络带宽,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。所以数据压缩就很重要。可以每个消息都压缩,但是压缩率相对很低。所以 Kafka 使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩。

Kafka 允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩。

Kafka 支持 Gzip 和 Snappy 压缩协议。

Kafka 数据可靠性深度解读






Kafka 的消息保存在 Topic 中,Topic 可分为多个分区,为保证数据的安全性,每个分区又有多个 Replia。

▪ 多分区的设计的特点:

  1. 为了并发读写,加快读写速度;
  2. 是利用多分区的存储,利于数据的均衡;
  3. 是为了加快数据的恢复速率,一但某台机器挂了,整个集群只需要恢复一部分数据,可加快故障恢复的时间。

每个 Partition 分为多个 Segment,每个 Segment 有 .log 和 .index 两个文件,每个 log 文件承载具体的数据,每条消息都有一个递增的 offset,Index 文件是对 log 文件的索引,Consumer 查找 offset 时使用的是二分法根据文件名去定位到哪个 Segment,然后解析 msg,匹配到对应的 offset 的 msg。

<Partition recovery过程>

每个 Partition 会在磁盘记录一个 RecoveryPoint,,记录已经 flush 到磁盘的大 offset。当 broker 失败重启时,会进行 loadLogs。首先会读取该 Partition 的 RecoveryPoint,找到包含 RecoveryPoint 的 segment 及以后的 segment, 这些 segment 就是可能没有完全 flush 到磁盘 segments。然后调用 segment 的 recover,重新读取各个 segment 的 msg,并重建索引。每次重启 Kafka 的 broker 时,都可以在输出的日志看到重建各个索引的过程。

< 数据同步>

Producer 和 Consumer 都只与 Leader 交互,每个 Follower 从 Leader 拉取数据进行同步。





如上图所示,ISR 是所有不落后的 replica 集合,不落后有两层含义:距离上次 FetchRequest 的时间不大于某一个值或落后的消息数不大于某一个值,Leader失 败后会从 ISR 中随机选取一个 Follower 做 Leader,该过程对用户是透明的。

当 Producer 向 Broker 发送数据时,可以通过 request.required.acks 参数设置数据可靠性的级别。

此配置是表明当一次 Producer 请求被认为完成时的确认值。特别是,多少个其他 brokers 必须已经提交了数据到它们的 log 并且向它们的 Leader 确认了这些信息。

▪典型的值:

0: 表示 Producer 从来不等待来自 broker 的确认信息。这个选择提供了小的时延但同时风险大(因为当server宕机时,数据将会丢失)。

1:表示获得 Leader replica 已经接收了数据的确认信息。这个选择时延较小同时确保了 server 确认接收成功。

-1:Producer 会获得所有同步 replicas 都收到数据的确认。同时时延大,然而,这种方式并没有完全消除丢失消息的风险,因为同步 replicas 的数量可能是 1。如果你想确保某些 replicas 接收到数据,那么你应该在 Topic-level 设置中选项 min.insync.replicas 设置一下。

仅设置 acks= -1 也不能保证数据不丢失,当 ISR 列表中只有 Leader 时,同样有可能造成数据丢失。要保证数据不丢除了设置 acks=-1,还要保证 ISR 的大小大于等于2。

▪具体参数设置:

request.required.acks:设置为 -1 等待所有 ISR 列表中的 Replica 接收到消息后采算写成功。

min.insync.replicas:设置为 >=2,保证 ISR 中至少两个 Replica。

Producer:要在吞吐率和数据可靠性之间做一个权衡。

Kafka 作为现代消息中间件中的佼佼者,以其速度和高可靠性赢得了广大市场和用户青睐,其中的很多设计理念都是非常值得我们学习的,本文所介绍的也只是冰山一角,希望能够对大家了解 Kafka 有一定的作用。

欢迎工作一到五年的Java工程师朋友们加入Java程序员开发: 854393687

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

相关文章