春云Kafka StreamsUncaughtExceptionHandler

我正在尝试将StreamsUncaughtExceptionHandler添加到我的Kafka流处理器中。该处理器是用Kafka函数编写的。我查看了suggestion provided by Artem Bilan以将StreamsUncaughtExceptionHandler包括到我的服务中,但我的异常从未被它捕获/处理。

配置Bean:

@Autowired
UnCaughtExceptionHandler exceptionHandler;

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
    return new StreamsBuilderFactoryBeanConfigurer() {

        @Override
        public void configure(StreamsBuilderFactoryBean factoryBean) {;
            factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
        }

        @Override
        public int getOrder() {
            return Integer.MAX_VALUE;
        }

    };
}

自定义异常处理程序:

    @Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

  @Autowired
  private StreamBridge streamBridge;

  @Override
  public StreamThreadExceptionResponse handle(Throwable exception) {
    return StreamThreadExceptionResponse.REPLACE_THREAD;
  }
}

流处理函数:

@Autowired
private MyService service;

@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
    final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
    return kStream -> kStream
            .filter((key, value) -> value != null)
            .filter((key, value) -> {
                Optional<Output> outputResult = service.process(value);
                if (outputResult.isPresent()) {
                    result.set(new KeyValue<>(key, outputResult.get()));
                    return true;
                }
                return false;
            })
        .map((messageKey, messageValue) -> result.get());
}

我希望UnCaughtExceptionHandler处理由service.process()方法引发的任何异常。但是异常永远不会进入Handle方法;相反,它们传播到根并杀死客户端。我也看过this solution,但我想以更独立的方式处理它。

问题:如何使用StreamsUncaughtExceptionHandler处理任何处理异常?

  • Spring Boot版本:2.6.3
  • 春云溪流版本:3.2.1
  • Spring-Cloud-Stream-Binder-Kafka-Streams:3.2.1
  • Kafka-Streams:3.0.0

可复制示例:spring-cloud-kafka-streams-exception


解决方案

以下是您可以尝试的几种方法。

  1. 尝试在StreamsBuilderFactoryBean中的this line处设置断点,并查看配置的值是什么。这应该会给出一些线索。

  2. 我注意到您在配置的Impl中为订单设置了Integer.MAX_VALUE。默认情况下,StreamsBuilderFactoryBean使用阶段值Integer.MAX_VALUE - 1000,因此在工厂Bean准备启动时,配置器可能还不可用,因为Integer.MAX_VALUE的优先级较低。您可以将订单更改为类似Integer.MAX_VALUE - 5000的内容,以确保在启动工厂Bean之前完全实例化配置Bean。

从这些选项开始,查看它们是否为该问题提供了任何迹象。如果它仍然存在,请随时与我们分享一个可重复使用的小示例应用程序。

相关文章