Kafka 笔记 02: 事务
Kafka 过去一直保障到 At Least Once 语义,用户生产的消息不会丢失,但是可能发生重复。比如,broker 接收 Producer 消息后崩溃,但是没来得及向 Producer 返回 ack,而使 Producer 重试而导致消息重复。为了避免 Producer 到 Broker 端的消息重复,Kafka 引进了 Idempotent Producer 特性,使每条消息携带 Producer ID 和递增的 sequence 实现去重,从而能够实现生产侧的 Exactly Once。
但是仅靠生产消息的 Exactly Once 语义仍不能满足 Kafka Stream 这种流式处理场景的需要。在流式处理场景中,大部分操作都属于从一个或者多个 Topic Partition 消费数据,经处理后写入另外的多个 Topic Partition,即 Consume-Transform-Produce 过程。这一过程涉及多轮读写操作,而系统随时可能崩溃,崩溃重试时不希望看到消息产生重复消费而导致处理结果不确定,会更希望有事务原子性,做到要么全成功,要么全没发生。
简而言之,Kafka 事务能保证向多个 Topic Partition 的原子写入。
至于 Consumer Group 的消费操作,在 Kafka 也是基于向内部 __consumer_offsets 这个 Topic 写入点位实现的 commit。因此,事务中的消费操作也可以视作向多个 Topic Partition 的原子写入之一。
事务设计
按《Transactional Messaging in Kafka》中的介绍,在 Kafka 事务的实现方面存在一些设计约束:
- 要使同一 Topic 做到同时支持事务性消息和普通的非事务性消息,并不影响非事务性消息的性能;
- 要使事务性消息的性能尽可能接近普通非事务性消息的性能;
- 尽量不增加客户端的复杂性;
事务的实现大致上是一个两阶段提交过程,一是增加 Transaction ID,二是增加事务控制消息,三是增加一个 Transaction Coordinator 组件。
事务控制消息
Kafka 事务在设计上要求同一 Topic 中既包含事务性消息,也包含非事务性消息。相比非事务性消息,事务性消息需要携带额外的事务信息,此外,还需要通过事务控制消息(Control Message)用于在两阶段提交中标记事务的 Commit 或者 Abort。
Transaction ID 与 Epoch
事务处理应用需要在遭遇重启时,能够偶从中断的地方进行事务恢复,将未完成的事务回滚或者继续完成。为此需要一个跨 session 的 ID 标识,即 Transaction ID。该 ID 由用户在 producer 的参数中指定,是个类似 Consumer Group ID 的字符串。
每个 Transaction ID 也对应着一个递增的 Epoch 号,用于防御 Zombie Writer 的写入,确保同一 Transaction ID 在同一时刻只允许一个 Producer 执行。
Transaction Coordinator
Transaction Coordinator 会将事务状态日志记录到 __transaction_control 这个内部 topic 内,这单与 Group Coordinator 有点类似,两者均依赖 Kafka 本身的日志持久性。Transaction Coordinator 的日志中只有事务的状态变化,不包含任何事务消息的数据内容,单纯从写入方面似乎不大容易成为瓶颈。
__transaction_control 这一 Topic 内部默认有 50 个 Partition,会根据 Transaction ID 作为分区 Key,并将目标分区 Leader 所在的 Broker 作为 Transaction Coordinator。
LSO(Last Stable Offset)
未提交的事务会落在 LSO 之后,类似 HW,如果 Consumer 配置了 Read Committed 隔离级别,那么只有当事务提交即收到 Commit 控制消息之后,才能读到事务内的消息。如果事务被 Abort,则忽略该事务相关的所有消息。
事务接口
事务的接口大致上长这样:
kafkaProducer.initTransactions()
kafkaProducer.beginTransaction()
ConsumerRecords<String, String> messages = consumer.poll(100)
try {
messages.forEach { message ->
val order = objectMapper.readValue<Order>(message.value())
if (order.meal == "Pizza") {
kafkaProducer.send(ProducerRecord(pizzaIncomeIncreased, order.value))
}
}
val consumedOffsets = getConsumedOffsets(kafkaConsumer)
kafkaProducer.sendOffsetsToTransaction(consumedOffsets, "kafka-transactions-group")
kafkaProducer.commitTransaction()
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
kafkaProducer.abortTransaction()
}
相关文章