在 Spring Integration 中处理异常时遇到问题

我是 Spring 集成的新手,对如何将错误消息发送到指定的错误队列感到困惑.我希望错误消息成为原始消息的标题并最终出现在单独的队列中.我读到这可以通过标题丰富器来完成,我尝试实现它,但错误队列中没有显示任何内容.

I'm new to spring integration and am confused about how to send error messages to a designated error queue. I want the error message to be a header on the original message and end up in a separate queue. I read that this can be done with a header enricher, which I tried to implement but nothing is showing up in the error queue.

另外,我是否需要一个单独的异常处理类才能将错误消息放入错误队列,或者我可以在我的转换方法中抛出异常吗?

Also, do I need a separate exception handling class in order for the error messages to make it to the error queue or can I just throw exceptions in my transforming methods?

这是我的 xml 配置:

Here is my xml config:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
   xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:context="http://www.springframework.org/schema/context"
   xsi:schemaLocation="http://www.springframework.org/schema/beans    
                        http://www.springframework.org/schema/beans/spring-beans.xsd    
                        http://www.springframework.org/schema/integration    
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/integration/amqp
                        http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
                        http://www.springframework.org/schema/rabbit
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

   <rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" />
   <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
   <rabbit:admin connection-factory="connectionFactory" />
   <rabbit:queue name="first" auto-delete="false" durable="true" />
   <rabbit:queue name="second" auto-delete="false" durable="true" />
   <rabbit:queue name="errorQueue" auto-delete="false" durable="true" />

   <int:poller default="true" fixed-rate="100"/>

   <rabbit:fanout-exchange name="second-exchange" auto-delete="true" durable="true">
     <rabbit:bindings>
        <rabbit:binding queue="second" />
     </rabbit:bindings>
   </rabbit:fanout-exchange>

   <rabbit:fanout-exchange name="error-exchange" auto-delete="true" durable="true">
      <rabbit:bindings>
         <rabbit:binding queue="errorQueue" />
      </rabbit:bindings>
   </rabbit:fanout-exchange>

  <int-amqp:outbound-channel-adapter channel="messageOutputChannel" exchange-name="second-exchange"  amqp-template="amqpTemplate" />

   <int-amqp:inbound-channel-adapter channel="messageInputChannel" error-channel="errorInputChannel" queue-names="first" connection-factory="connectionFactory" concurrent-consumers="20" />

   <int-amqp:outbound-channel-adapter channel="errorOutputChannel" exchange-name="error-exchange"  amqp-template="amqpTemplate" />

   <int:channel id="messageInputChannel" />
   <int:channel id="messageOutputChannel"/>
   <int:channel id="errorInputChannel"/>

<int:service-activator input-channel="errorInputChannel" output-channel= "errorOutputChannel" method = "handleError" >
   <bean class="firstAttempt.MessageErrorHandler"/>

   <int:chain input-channel="messageInputChannel" output-channel="messageOutputChannel">
     <int:header-enricher>
    <int:error-channel ref="errorInputChannel" />
       </int:header-enricher>
            <int:transformer method = "convert" >
                <bean class="firstAttempt.JsonObjectConverter" />
            </int:transformer>
        <int:service-activator method="transform">
             <bean class="firstAttempt.Transformer" />
        </int:service-activator>
     <int:object-to-string-transformer />
   </int:chain>

</beans>

错误类别:

public class ErrorHandler {
    public String errorHandle(MessageHandlingException exception) {
        return exception.getMessage();

QualityScorer 类(由转换器调用):

QualityScorer class (called by transformer):

public class QualityScorer {
    private Hashtable<String, String> table;
    private final static String csvFile = "C:\Users\john\Test.csv";

public QualityScorer() throws Exception {
    table = new Hashtable<String, String>();
    initializeTable();
}

private void initializeTable() throws Exception {
     BufferedReader br = null;
      String line = "";
    String cvsSplitBy = ",";
   try {
       br = new BufferedReader(new FileReader(csvFile));
        while ((line = br.readLine()) != null) {
               String[] data = line.split(cvsSplitBy);
            if(data.length > 6 && data[1].equals("1") && data[4].equals("0") && data[5].equals("1"))
                table.putIfAbsent(data[3], data[1]);
        }
   } catch (FileNotFoundException e) {
       throw new Exception("No file found");
  } catch (IOException e) {
            e.printStackTrace();
  } finally {
        if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public float getScore(JSONObject object) throws Exception {
        float score;
        if (object == null) {
            throw new IllegalArgumentException("object");
        }
        if (!object.has("source")) {
            throw new Exception("Object does not have a source");
        }
         if (!object.has("employer")) {
            throw new Exception("Object does not have an employer");
        }
        String source = object.getString("Source");
        String employer = object.getString("employer");
            if (table.containsKey(employer) && !source.equals("packageOne")) {
               score = 1;
          } else {
                score = -1;
        }
        return score;
    }
}

现在,正在加载的消息没有来源,所以程序应该将 MessagingException 抛出给 MessageErrorHandler.

Right now, the message being loaded has no source, so the program should be throwing the MessagingException to the MessageErrorHandler.

变压器代码:

public class Transformer {
 private QualityScorer qualityScorer;

 public Transformer() throws Exception {
  qualityScorer = new QualityScorer();
 }

 public JSONObject transform(JSONObject object) throws Exception {

     float score = qualityScorer.getScore(object);
     object.put("score", score);
    return object;
    }
}

总的来说,程序应该从队列接收预加载的消息,对其进行转换并将其发送到第二个队列,如果在预加载的消息中提供了源,它就会成功.我正在尝试处理错误并将其作为消息头发送到错误队列.这个问题困扰了我一段时间,非常感谢您的帮助!

All together, the program should receive a pre-loaded message from a queue, transform it and send it on to a second queue, which it does successfully if the source is provided in the pre-loaded message. I'm trying to handle errors and make it so they are sent to an error queue as a message header. This issue has been frustrating me for awhile, so help is greatly appreciated!

stacktrace 中当前显示的错误是:

The error currently being shown in the stacktrace is:

java.lang.NoSuchMethodError: org.springframework.messaging.MessageHandlingException: method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:110)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:56)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:246)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
at java.lang.Thread.run(Thread.java:748)

但是没有任何东西进入错误队列.

But nothing is going to the error queue.

推荐答案

当异常抛出时,与requestMessage一起包装到MessagingException.您自己的业务异常在 cause 中,您可以从 MessagingException.failedMessage 属性访问 requestMessage.

When the exception is thrown, it is wrapped together with the requestMessage to the MessagingException. Your own business exception is in the cause and you can get access to the requestMessage from the MessagingException.failedMessage property.

因此,您似乎拥有了用例所需的一切.只有在发送到 error-exchange 之前你真的应该在错误流中有一些 <transformer> 才能正确转换 MessagingException 的问题到正确的消息发送到 AMQP.

So, it looks like you have everything you need for your use-case. Only the problem that before sending to the error-exchange you really should have some <transformer> in the error flow to properly convert that MessagingException to the proper message to send to the AMQP.

相关文章