Kafka系列2:深入理解Kafka生产者
Kafka系列2:深入理解Kafka消费者
上篇聊了Kafka概况,包含了Kafka的基本概念、设计原理,以及设计核心。本篇单独聊聊Kafka的生产者,包括如下内容:
- 生产者是如何生产消息
- 如何创建生产者
- 发送消息到Kafka
- 生产者配置
- 分区
生产者是如何生产消息的
首先来看一下Kafka生产者组件图
这里大家可以关注一下我的个人专栏《Java 进阶集中营》,每天会给大家即时分享一个新的java技术资讯,有的java技术内容,也欢迎分享在我的专栏。
JAVA 进阶集中营步,Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。第二步,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。
如何创建生产者
属性设置
在创建生产者对象的时候,要设置一些属性,有三个属性是必选的:
- bootstrap.servers:指定Broker的地址清单,地址格式为host:port。清单里不需要包含所有的Broker地址,生产者会从给定的Broker里查找到其他Broker的信息;不过建议至少要提供两个Broker的信息保证容错。
- key.serializer:指定键的序列化器。Broker希望接收到的消息的键和值都是字节数组。这个属性必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer,因此一般不需要实现自定义的序列化器。需要注意的是,key.serializer属性是必须设置的,即使只发送值内容。
- value.serializer:指定值的序列化器。如果键和值都是字符串,可以使用与key.serializer一样的序列化器,否则需要使用不同的序列化器。
项目依赖
以maven项目为例,要使用Kafka客户端,需要引入kafka-clients依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
相关文章