Kafka Client: Three Message Consumption Modes
Introduction
Kafka consumption supports three delivery semantics: at-most-once, at-least-once, and exactly-once. These three modes exist because the two actions the client performs for every message, processing it and committing the offset back to Kafka (commit), are not atomic.
- 1. At-most-once: the client commits automatically after receiving a message but before processing it. Kafka then regards the message as consumed and advances the offset, so a message that fails during processing is never redelivered.
- 2. At-least-once: the client receives a message, processes it, and only then commits. If the network drops or the program crashes after processing but before the commit, Kafka still regards the message as unconsumed and delivers it again, producing duplicates.
- 3. Exactly-once: message processing and the offset commit are kept in one transaction, i.e. the two actions are atomic.
Starting from these points, this article describes in detail how to implement each of the three modes.
1. At-most-once
- Set `enable.auto.commit` to true.
- Set `auto.commit.interval.ms` to a small interval.
- Do not call `commitSync()` from the client; Kafka commits automatically at the configured interval.
- Example:
public void mostOnce() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-1");
    props.put("enable.auto.commit", "true");      // auto commit enabled
    props.put("auto.commit.interval.ms", "1000"); // commit at a small interval
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my-topic", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // offsets may already have been committed before processing finishes,
            // so a crash here can lose the record: at-most-once
            process(record);
        }
    }
}
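A stricter at-most-once variant is to commit synchronously right after `poll()` and before processing, so the offsets are guaranteed to be advanced even if processing later fails. A minimal sketch of that loop (not part of the original example; consumer setup as above):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    // commit first: if process() throws afterwards or the process dies,
    // these records are never redelivered (at-most-once)
    consumer.commitSync();
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
}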
2. At-least-once
Method 1
- Set `enable.auto.commit` to false.
- Have the client commit the offsets explicitly after processing, e.g. with `commitSync()`; the example below uses `commitAsync()`.
public void leastOnce() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-1");
    props.put("enable.auto.commit", "false"); // disable auto commit
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my-topic", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            process(record);
        consumer.commitAsync(); // commit the offsets only after processing
    }
}
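For reference, the consumer can also commit synchronously with explicit per-partition offsets, another common at-least-once pattern. A minimal sketch (not from the original article; consumer setup as above, with Collections, TopicPartition and OffsetAndMetadata imports assumed):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (TopicPartition partition : records.partitions()) {
        for (ConsumerRecord<String, String> record : records.records(partition)) {
            process(record);
        }
        // the committed offset must point at the *next* record to read, hence +1
        long lastOffset = records.records(partition).get(records.records(partition).size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
    }
}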
Method 2
- Set `enable.auto.commit` to true.
- Set `auto.commit.interval.ms` to a very large interval.
- Have the client call `commitSync()` (or, as below, `commitAsync()`) to advance the offsets itself.
- Example:
public void leastOnce() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "10.242.1.219:9092");
    props.put("group.id", "test-1");
    props.put("enable.auto.commit", "true");          // auto commit enabled...
    props.put("auto.commit.interval.ms", "99999999"); // ...but the interval is effectively never reached
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my-topic", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            process(record);
        consumer.commitAsync(); // commit the offsets only after processing
    }
}
3. Exactly-once
3.1 Approach
- To implement this mode you have to manage the offsets yourself: keep a record of the current offset, and make message processing and offset advancement happen in the same transaction. For example, within one transaction, store the result of processing the message in a MySQL database and update the stored offset at the same time.
3.2 Implementation
- Set `enable.auto.commit` to false.
- Save the offset carried by each `ConsumerRecord` to the database.
- When the partition assignment changes, a rebalance takes place. The following events trigger such a change:
  - 1 the number of partitions of a topic the consumer subscribes to changes
  - 2 a topic is created or deleted
  - 3 a member of the consumer's group dies
  - 4 a new consumer joins the group by calling join
  The consumer captures these events by implementing the `ConsumerRebalanceListener` interface and handles the offsets there.
- The consumer calls `seek(TopicPartition, long)` to move to the stored offset of a given partition (see the short sketch after this list).
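As a standalone illustration of `seek` (a hypothetical snippet, not from the original code, using manual assignment instead of the rebalance listener shown later):

// hypothetical example: resume a consumer from an offset recovered from external storage
TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(partition)); // manual assignment, no group rebalance
long storedOffset = 42L;                                // e.g. the last processed offset read from the database
consumer.seek(partition, storedOffset + 1);             // resume from the next record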
3.3 Experiment
- First, create a table that records the offset per topic and partition:
DROP TABLE IF EXISTS `tb_yx_message`;
CREATE TABLE `tb_yx_message` (
  `id`         bigint(20)   NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `topic`      varchar(128) NOT NULL DEFAULT ''  COMMENT 'topic',
  `kPartition` varchar(128) NOT NULL DEFAULT '0' COMMENT 'partition',
  `offset`     bigint(20)   NOT NULL DEFAULT '0' COMMENT 'offset',
  PRIMARY KEY (`id`),
  UNIQUE KEY `UNIQ_KEY` (`topic`, `kPartition`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='partition offset table';
- 2. Also create the MessageDao interface (MyBatis) that operates on this table:
public interface MessageDao {
    // insert a new offset record
    @Insert(" insert into tb_yx_message(topic,kPartition,offset) " +
            " values(#{topic},#{kPartition},#{offset})")
    public int insertWinner(MessageOffsetPO MessageOffset);

    // read the stored offset
    @Select(" select * from tb_yx_message " +
            " where topic=#{topic} and kPartition=#{kPartition} ")
    public MessageOffsetPO get(@Param("topic") String topic,
                               @Param("kPartition") String partition);

    // update the stored offset
    @Update(" update tb_yx_message set offset=#{offset}" +
            " where topic=#{topic} and kPartition=#{kPartition}")
    public int update(@Param("offset") long offset,
                      @Param("topic") String topic,
                      @Param("kPartition") String partition);
}
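The `MessageOffsetPO` object used above is not shown in the original article; a minimal sketch, assuming Lombok is on the classpath (the `builder()` calls in the listener below require `@Builder`):

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

// minimal sketch of the persistence object used by MessageDao; fields mirror the table columns
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageOffsetPO {
    private Long id;           // primary key
    private String topic;      // Kafka topic
    private String kPartition; // partition, stored as a string
    private Long offset;       // last processed offset
}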
3. Next, implement the `ConsumerRebalanceListener` interface. During a rebalance the callbacks are invoked in this order: first onPartitionsRevoked (telling the consumer its partitions are being taken away), then onPartitionsAssigned (telling the consumer which partitions it now owns).
4. So when we are told our partitions are being revoked, we save the corresponding offsets to the database; when new partitions are assigned (for example right after startup), we read the offsets of those partitions back from the database. The implementation is shown below.
@Slf4j
public class MyConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {

    private Consumer<String, String> consumer;
    private MessageDao messageDao;

    public MyConsumerRebalancerListener(Consumer<String, String> consumer, MessageDao messageDao) {
        this.consumer = consumer;
        this.messageDao = messageDao;
    }

    // partitions are being revoked: persist the current position of each one
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            long offset = consumer.position(partition);
            MessageOffsetPO build = MessageOffsetPO
                    .builder()
                    .offset(offset)
                    .kPartition(partition.partition() + "")
                    .topic(partition.topic())
                    .build();
            try {
                messageDao.insertWinner(build);
            } catch (Exception e) {
                // the row may already exist (unique key on topic + partition); ignore
            }
            log.info("onPartitionsRevoked topic:{},build:{}", partition.topic(), build);
        }
    }

    // partitions have been assigned: restore each position from the database
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            MessageOffsetPO messageOffsetPO = messageDao.get(partition.topic(), partition.partition() + "");
            if (messageOffsetPO == null) { // no record in the database, i.e. a topic we have never consumed
                consumer.seek(partition, 0);
            } else {
                consumer.seek(partition, messageOffsetPO.getOffset() + 1); // resume from the next offset, hence +1
            }
            log.info("onPartitionsAssigned topic:{},messageOffsetPO:{}", partition, messageOffsetPO);
        }
    }
}
- The client-side code can then be written like this:
/**
 * Exactly-once
 */
public void exactlyOnce() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-1");
    props.put("enable.auto.commit", "false"); // disable auto commit
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    MyConsumerRebalancerListener rebalancerListener = new MyConsumerRebalancerListener(consumer, messageDao);
    consumer.subscribe(Arrays.asList("test-new-topic-1", "new-topic"), rebalancerListener);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            boolean isException = false;
            try {
                log.info("consume record:{}", record);
                processService.process(record);
            } catch (Exception e) {
                e.printStackTrace();
                isException = true;
            }
            if (isException) {
                // processing failed, so the record was not consumed (the transaction rolled back);
                // seek back to this offset so the record is fetched and processed again
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                consumer.seek(topicPartition, record.offset());
                log.info("consume exception offset:{}", record.offset());
                break;
            }
        }
    }
}
- The call **processService.process(record);** consumes the message and records its offset at the same time. The ProcessService code is as follows:
@Service
@Slf4j
public class ProcessService {

    @Autowired
    MessageDao messageDao;

    @Transactional(rollbackFor = Exception.class)
    public void process(ConsumerRecord<String, String> record) {
        log.info(" record:{}", record);
        // process the message; here we simply print it
        System.out.println(">>>>>>>>>" + Thread.currentThread().getName() + "_" + record);
        // update the stored offset within the same transaction
        messageDao.update(record.offset(), record.topic(), record.partition() + "");
    }
}
3.4 Running the program
- Scenario 1: the consumer subscribes to the topic test-new-topic-1 (never subscribed to before), the producer then sends two messages, and the consumer receives and processes them without throwing an exception. The log output is:
2018-01-11 14:36:07,948 main INFO MyConsumerRebalancerListener.onPartitionsAssigned:53 -onPartitionsAssigned topic:test-new-topic-1-0,messageOffsetPO:null,offset:{ }
2018-01-11 14:37:15,156 main INFO KafkaConsumeTest.exactlyOnce:106 -consume record:ConsumerRecord(topic = test-new-topic-1, partition = 0, offset = 0, CreateTime = 1515652635056, serialized key size = 1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = message:0-test)
2018-01-11 14:37:15,183 main INFO ProcessService.process:23 - record:ConsumerRecord(topic = test-new-topic-1, partition = 0, offset = 0, CreateTime = 1515652635056, serialized key size = 1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = message:0-test)
2018-01-11 14:37:15,238 main INFO KafkaConsumeTest.exactlyOnce:106 -consume record:ConsumerRecord(topic = test-new-topic-1, partition = 0, offset = 1, CreateTime = 1515652635062, serialized key size = 1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = message:1-test)
2018-01-11 14:37:15,239 main INFO ProcessService.process:23 - record:ConsumerRecord(topic = test-new-topic-1, partition = 0, offset = 1, CreateTime = 1515652635062, serialized key size = 1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = message:1-test)
- Line 1: the consumer is assigned new partitions. Since the database has no record for this topic, messageOffsetPO is null and the consumer starts reading from offset 0.
- Lines 2-3: the consumer fetches and processes the message with offset = 0.
- Lines 4-5: the consumer fetches and processes the message with offset = 1.
- At this point the database holds offset = 1 for this topic and partition (the original screenshot of the table is omitted here).
- Scenario 2: the producer sends two more messages; the consumer receives and processes them, but processing throws an exception. The process method is changed to:
@Transactional(rollbackFor = Exception.class)
public void process(ConsumerRecord<String, String> record) {
    log.info(" record:{}", record);
    System.out.println(">>>>>>>>>" + Thread.currentThread().getName() + "_" + record);
    messageDao.update(record.offset(), record.topic(), record.partition() + "");
    throw new RuntimeException("error"); // throw so the transaction rolls back
}
- Running it produces the following output:
2018-01-11 14:56:48,357 main INFO KafkaConsumeTest.exactlyOnce:116 -consume exception offset:2
2018-01-11 14:56:48,866 main INFO KafkaConsumeTest.exactlyOnce:106 -consume record:ConsumerRecord(topic = test-new-topic-1, partition = 0, offset = 2, CreateTime = 1515653807138, serialized key size = 1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = message:0-test)
2018-01-11 14:56:48,867 main INFO ProcessService.process:23 - record:ConsumerRecord(topic = test-new-topic-1, partition = 0, offset = 2, CreateTime = 1515653807138, serialized key size = 1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = message:0-test)
.......
As you can see, the consumer keeps consuming the record at offset 2: processing always ends in an exception, so it keeps retrying offset 2. The database state stays the same as before (offset is 1), which matches expectations.
4. References
Kafka client official documentation
KafkaProducer official documentation
A blog post by a foreign author, who implements "exactly once" by reading and writing a file; I don't think the approach is great, but it was quite inspiring for me.
Original author: 乱在长安
Original article: https://blog.csdn.net/laojiaqi/article/details/79034798
This article is reposted from the web for knowledge sharing only; if it infringes any rights, please contact the blog owner to have it removed.