Kafka事务原理剖析
一、事务概览
提起事务,我们印象可能就是ACID,需要满足原子性、一致性、事务隔离级别等概念,那kafka的事务能做到什么程度呢?我们首先看一下如何使用事务
Producer端代码如下
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();
ProducerRecord<String, String> kafkaMsg1 = new ProducerRecord<>(TOPIC1, "msg val");
producer.send(kafkaMsg1);
ProducerRecord<String, String> kafkaMsg2 = new ProducerRecord<>(TOPIC2, "msg val");
producer.send(kafkaMsg2);
producer.commitTransaction();
Consumer端不需要做特殊处理,跟消费普通消息一样
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
}
}
1.1、事务配置
那需要如何配置呢?
Producer | Consumer | ||
| 事务ID,类型为String字符串,默认为空,客户端自定义,例如"order_bus" |
| 事务隔离级别,默认为空,开启事务的话,需要将其设置为"read_committed" |
| 消息幂等开关,true/false,默认为false,当配置了transactional.id,此项一定要设置为true,否则会抛出客户端配置异常 | ||
| 事务超时时间,默认为10秒,长为15分钟 |
相关文章