如何将@Transaction与@KafkaListener一起使用?

有没有可能将声明性TX管理(通过@Transaction)与@KafkaListener注释方法一起使用? 例如,我想使用它来为每个监听器定义单独的发送超时。 我的设置如下:

TransactionManager:

@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
                                                                                 org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
  return new ChainedKafkaTransactionManager<>(
    kafkaTransactionManager, 
    hibernateTransactionManager);
}

KafkaListener:

@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}

问题是-KafkaMessageListenerContainer在调用此类方法之前创建自己的事务-它使用自己的TransactionTemplate:

@Nullable
private TransactionTemplate determineTransactionTemplate() {
  return this.transactionManager != null
    ? new TransactionTemplate(this.transactionManager)
    : null;
}

未使用TransactionInterceptor。那么具体的@KafkaListener方法如何设置具体的TX超时时间呢?


解决方案

可以这样做,但有点复杂,因为您必须将消耗的偏移量发送到Kafka交易。

不使用ChainedKafkaTransactionManager,您可以为容器使用KafkaTransactionManager,为HibernateTransactionManager使用@Transactional

这将产生类似的结果,因为Hibernate Tx将在Kafka事务之前提交(如果Hibernate提交失败,则Kafka Tx将回滚)。

编辑

若要将不同的链式TM配置到每个侦听器容器中,可以执行以下操作。

@组件 类ContainerFactoryCustomizer{

ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
        ChainedKafkaTransactionManager<?, ?> chainedOne,
        ChainedKafkaTransactionManager<?, ?> chainedTwo) {
    factory.setContainerCustomizer(
            container -> {
                String groupId = container.getContainerProperties().getGroupId();
                if (groupId.equals("foo")) {
                    container.getContainerProperties().setTransactionManager(chainedOne);
                }
                else {
                    container.getContainerProperties().setTransactionManager(chainedTwo);
                }
            });
}

}


Where each chained TM has a Hibernate TM with a different default timeout.

The `groupid` is populated from the `@KafkaListener` `id` or `groupId` property.

相关文章