春云Kafka StreamsUncaughtExceptionHandler
我正在尝试将StreamsUncaughtExceptionHandler添加到我的Kafka流处理器中。该处理器是用Kafka函数编写的。我查看了suggestion provided by Artem Bilan以将StreamsUncaughtExceptionHandler包括到我的服务中,但我的异常从未被它捕获/处理。
配置Bean:
@Autowired
UnCaughtExceptionHandler exceptionHandler;
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
return new StreamsBuilderFactoryBeanConfigurer() {
@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {;
factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
}
@Override
public int getOrder() {
return Integer.MAX_VALUE;
}
};
}
自定义异常处理程序:
@Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
@Autowired
private StreamBridge streamBridge;
@Override
public StreamThreadExceptionResponse handle(Throwable exception) {
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
}
流处理函数:
@Autowired
private MyService service;
@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
return kStream -> kStream
.filter((key, value) -> value != null)
.filter((key, value) -> {
Optional<Output> outputResult = service.process(value);
if (outputResult.isPresent()) {
result.set(new KeyValue<>(key, outputResult.get()));
return true;
}
return false;
})
.map((messageKey, messageValue) -> result.get());
}
我希望UnCaughtExceptionHandler处理由service.process()方法引发的任何异常。但是异常永远不会进入Handle方法;相反,它们传播到根并杀死客户端。我也看过this solution,但我想以更独立的方式处理它。
问题:如何使用StreamsUncaughtExceptionHandler处理任何处理异常?
- Spring Boot版本:2.6.3
- 春云溪流版本:3.2.1
- Spring-Cloud-Stream-Binder-Kafka-Streams:3.2.1
- Kafka-Streams:3.0.0
可复制示例:spring-cloud-kafka-streams-exception
解决方案
以下是您可以尝试的几种方法。
尝试在
StreamsBuilderFactoryBean
中的this line处设置断点,并查看配置的值是什么。这应该会给出一些线索。我注意到您在配置的Impl中为订单设置了
Integer.MAX_VALUE
。默认情况下,StreamsBuilderFactoryBean
使用阶段值Integer.MAX_VALUE - 1000
,因此在工厂Bean准备启动时,配置器可能还不可用,因为Integer.MAX_VALUE
的优先级较低。您可以将订单更改为类似Integer.MAX_VALUE - 5000
的内容,以确保在启动工厂Bean之前完全实例化配置Bean。
相关文章