Kafka事务原理剖析

2022-11-23 00:00:00 消息 发送 事务 消费 提交

一、事务概览

提起事务,我们印象可能就是ACID,需要满足原子性、一致性、事务隔离级别等概念,那kafka的事务能做到什么程度呢?我们首先看一下如何使用事务

Producer端代码如下

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();

ProducerRecord<String, String> kafkaMsg1 = new ProducerRecord<>(TOPIC1, "msg val");
producer.send(kafkaMsg1);
ProducerRecord<String, String> kafkaMsg2 = new ProducerRecord<>(TOPIC2, "msg val");
producer.send(kafkaMsg2);

producer.commitTransaction();

Consumer端不需要做特殊处理,跟消费普通消息一样

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
    }
}

1.1、事务配置

那需要如何配置呢?

Producer

Consumer

transactional.id

事务ID,类型为String字符串,默认为空,客户端自定义,例如"order_bus"

isolation.level

事务隔离级别,默认为空,开启事务的话,需要将其设置为"read_committed"

enable.idempotence

消息幂等开关,true/false,默认为false,当配置了transactional.id,此项一定要设置为true,否则会抛出客户端配置异常

  

transaction.timeout.ms

事务超时时间,默认为10秒,长为15分钟

  

相关文章