如何将@Transaction与@KafkaListener一起使用?
有没有可能将声明性TX管理(通过@Transaction)与@KafkaListener注释方法一起使用? 例如,我想使用它来为每个监听器定义单独的发送超时。 我的设置如下:
TransactionManager:
@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
return new ChainedKafkaTransactionManager<>(
kafkaTransactionManager,
hibernateTransactionManager);
}
KafkaListener:
@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}
问题是-KafkaMessageListenerContainer在调用此类方法之前创建自己的事务-它使用自己的TransactionTemplate:
@Nullable
private TransactionTemplate determineTransactionTemplate() {
return this.transactionManager != null
? new TransactionTemplate(this.transactionManager)
: null;
}
未使用TransactionInterceptor。那么具体的@KafkaListener方法如何设置具体的TX超时时间呢?
解决方案
可以这样做,但有点复杂,因为您必须将消耗的偏移量发送到Kafka交易。
不使用ChainedKafkaTransactionManager
,您可以为容器使用KafkaTransactionManager
,为HibernateTransactionManager
使用@Transactional
。
这将产生类似的结果,因为Hibernate Tx将在Kafka事务之前提交(如果Hibernate提交失败,则Kafka Tx将回滚)。
编辑
若要将不同的链式TM配置到每个侦听器容器中,可以执行以下操作。
@组件 类ContainerFactoryCustomizer{ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
ChainedKafkaTransactionManager<?, ?> chainedOne,
ChainedKafkaTransactionManager<?, ?> chainedTwo) {
factory.setContainerCustomizer(
container -> {
String groupId = container.getContainerProperties().getGroupId();
if (groupId.equals("foo")) {
container.getContainerProperties().setTransactionManager(chainedOne);
}
else {
container.getContainerProperties().setTransactionManager(chainedTwo);
}
});
}
}
Where each chained TM has a Hibernate TM with a different default timeout.
The `groupid` is populated from the `@KafkaListener` `id` or `groupId` property.
相关文章