插曲:Kafka的生产者原理及重要参数说明

2020-05-26 00:00:00 数据 集群 消息 发送 设置

推荐阅读:

一只Tom猫:手撕分布式技术:限流、通讯、缓存,全部一锅端走送给你!zhuanlan.zhihu.com一只Tom猫:消息队列面试,你能顶得住面试官这波10大连环炮的攻势吗?zhuanlan.zhihu.com

一、Kafka的Producer原理

首先我们得先有个集群吧,然后集群中有若干台服务器,每个服务器我们管它叫Broker,其实就是一个个Kafka进程

如果大家还记得[篇]的内容,就不难猜出来,接下来肯定会有一个controller和多个follower,还有个zookeeper集群,一开始我们的Broker都会注册到我们的zookeeper集群上面。

然后controller也会监听zookeeper集群的变化,在集群产生变化时更改自己的元数据信息。并且follower也会去它们的老大controller那里去同步元数据信息,所以一个Kafka集群中所有服务器上的元数据信息都是一致的。

上述准备完成后,我们正式开始我们生产者的内容

① 名词1 --- ProducerRecord

生产者需要往集群发送消息前,要先把每一条消息封装成ProducerRecord对象,这是生产者内部完成的。之后会经历一个序列化的过程。之前好几篇专栏也是有提到过了,需要经过网络传输的数据都是二进制的一些字节数据,需要进行序列化才能传输。

此时就会有一个问题,我们需要把消息发送到一个Topic下的一个leader partition中,可是生产者是怎样get到这个topic下哪个分区才是leader partition呢?

可能有些小伙伴忘了,提醒一下,controller可以视作为broker的领导,负责管理集群的元数据,而leader partition是做负载均衡用的,它们会分布式地存储在不同的服务器上面。集群中生产数据也好,消费数据也好,都是针对leader partition而操作的。

② 名词2 --- partitioner

怎么知道哪个才是leader partition,只需要获取到元数据不就好了嘛。

说来要怎么获取元数据也不难,只要随便找到集群下某一台服务器就可以了(因为集群中的每一台服务器元数据都是一样的)

③ 名词3 --- 缓冲区

此时生产者不着急把消息发送出去,而是先放到一个缓冲区

④ 名词4 --- Sender

把消息放进缓冲区之后,与此同时会有一个独立线程Sender去把消息分批次包装成一个个Batch,不难想到如果Kafka真的是一条消息一条消息地传输,一条消息就是一个网络连接,那性能就会被拉得很差。为了提升吞吐量,所以采取了分批次的做法

整好一个个batch之后,就开始发送给对应的主机上面。此时经过篇所提到的Kakfa的网络设计中的模型,然后再写到os cache,再写到磁盘上面。

下图是当时我们已经说明过的Kafka网络设计模型

⑤ 生产者代码

1.设置参数部分

// 创建配置文件对象
Properties props = new Properties();

// 这个参数目的是为了获取kafka集群的元数据
// 写一台主机也行,多个更加保险
// 这里使用的是主机名,要根据server.properties来决定
// 使用主机名的情况需要配置电脑的hosts文件(重点)
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");  

// 这个就是负责把发送的key从字符串序列化为字节数组
// 我们可以给每个消息设置key,作用之后再阐述
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 这个就是负责把你发送的实际的message从字符串序列化为字节数组
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 以下属于调优,之后再解释
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);

相关文章