Spring AMQP StatefulRetryOperationsInterceptor not used
I am trying to configure Spring AMQP to retry a message only a defined number of times. Currently, a message that fails, e.g. because of a DataIntegrityViolationException, is redelivered indefinitely.
Based on the documentation, I came up with the following configuration:
@Bean
public StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
    return RetryInterceptorBuilder.stateful()
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .maxAttempts(3)
            .messageKeyGenerator(message -> UUID.randomUUID().toString())
            .build();
}
This does not seem to be applied: the messages are still retried indefinitely.
It feels like I am missing something here.
Here is the rest of my AMQP configuration:
@Bean
Queue testEventSubscriberQueue() {
    final boolean durable = true;
    return new Queue("testEventSubscriberQueue", durable);
}

@Bean
Binding binding(TopicExchange topicExchange) {
    return BindingBuilder.bind(testEventSubscriberQueue()).to(topicExchange).with("payload.event-create");
}

@Bean
SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(testEventSubscriberQueue().getName());
    container.setMessageListener(listenerAdapter);
    container.setChannelTransacted(true);
    return container;
}

@Bean
MessageListenerAdapter listenerAdapter(MessageConverter messageConverter, SubscriberHandler subscriberHandler) {
    MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(subscriberHandler);
    listenerAdapter.setMessageConverter(messageConverter);
    return listenerAdapter;
}

@Bean
public MessageConverter messageConverter(ObjectMapper objectMapper) {
    final Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
    jsonMessageConverter.setJsonObjectMapper(objectMapper);
    DefaultClassMapper defaultClassMapper = new DefaultClassMapper();
    defaultClassMapper.setDefaultType(EventPayload.class);
    jsonMessageConverter.setClassMapper(defaultClassMapper);
    final ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter(jsonMessageConverter);
    messageConverter.addDelgate(MessageProperties.CONTENT_TYPE_JSON, jsonMessageConverter);
    return messageConverter;
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(messageConverter);
    //rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}

@Bean
public TopicExchange testExchange() {
    final boolean durable = true;
    final boolean autoDelete = false;
    return new TopicExchange(EXCHANGE_NAME, durable, autoDelete);
}
I am using spring-amqp 1.5.1.RELEASE.
Any help is appreciated.
Recommended answer
You need to configure the container to add the interceptor to its advice chain:
container.setAdviceChain(new Advice[] { statefulRetryOperationsInterceptor() });
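For context, here is a minimal sketch of how that line might fit into the container bean from the question. The class name RetryContainerConfig is hypothetical, and the MessageListenerAdapter bean is assumed to be defined elsewhere, as in the question. The sketch also swaps the random UUID key for the message id, which is an assumption on my part and not something the answer above states: stateful retry tracks attempts per message key, so a key that changes on every delivery would probably never reach maxAttempts. Using the message id only works if the publisher actually sets one.

```java
import org.aopalliance.aop.Advice;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;

@Configuration
public class RetryContainerConfig { // hypothetical class name, for illustration only

    @Bean
    public StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
        return RetryInterceptorBuilder.stateful()
                .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
                .maxAttempts(3)
                // Assumption: the publisher sets a messageId, so the key is the
                // same on every redelivery of the same message.
                .messageKeyGenerator(message -> message.getMessageProperties().getMessageId())
                .build();
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(
            ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("testEventSubscriberQueue"); // queue name from the question
        container.setMessageListener(listenerAdapter);
        container.setChannelTransacted(true);
        // The step the question was missing: without this, the interceptor bean
        // exists but is never applied to message delivery.
        container.setAdviceChain(new Advice[] { statefulRetryOperationsInterceptor() });
        return container;
    }
}
```

Declaring the interceptor as a bean is not enough on its own; the container only applies advice that is explicitly added to its advice chain.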