Apache Camel: Reply received for unknown correlationID

2022-01-19 00:00:00 concurrency java apache-camel activemq

A middleware sits between two other software components. In the middleware, I route Apache ActiveMQ messages with Apache Camel.

This is how it works:

  1. 1stComponent uses the middleware to send a message to 3rdComponent.
  2. 3rdComponent replies to the message and sends the reply back to 1stComponent (again through the middleware).

           1stComponent <<=>> Middleware <<=>> 3rdComponent

The problem:

I'm using concurrent consumers in the middleware.

While sending a lot of messages sequentially, the middleware suddenly stops all processing, with no exceptions and no error messages. For example, the first 100 of 500 messages are processed, and the rest stay in the queue as pending messages.

This warning is sometimes logged in the middle of the process:

[WARN ] TemporaryQueueReplyManager(Camel (camel-1) thread #11 - TemporaryQueueReplyManager[Q.MyQ]):91 - Reply received for unknown correlationID [c551c7aa061f501c]. The message will be ignored: ActiveMQMapMessage {commandId = 2161, responseRequired = true, messageId = ID:xxxxxxx, originalDestination = null, originalTransactionId = null, producerId = ID:xxxxxxx, destination = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, transactionId = null, expiration = 0, timestamp = 1389890272360, arrival = 0, brokerInTime = 1389890272360, brokerOutTime = 1389890272360, correlationId = c551c7aa061f501c, replyTo = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@19e19da, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID:xxxxxxxxxxxxxx, Title=300, CamelJmsDeliveryMode=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {} }

This is the middleware code:

private static class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("activemq:queue:Q.Middleware?concurrentConsumers=1&maxConcurrentConsumers=10")
            .threads(1, 100)
            .process(new Processor() {
                public void process(Exchange exchange) {
                    //some code
                }
            })
            .inOut("activemq2:queue:Q.3RD");
    }
}

3rdComponent:

private static class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() {
        from("activemq:queue:Q.3RD")
            .threads(1, 100)
            .process(new Processor() {
                public void process(Exchange exchange) {
                    //some code
                }
            });
    }
}

Recommended answer

Update:

My previous answer works, but it was not the complete solution.

The actual mistake was that I was producing non-unique correlation IDs (a bug in my random string generator). Simple! :|
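
For illustration, here is a minimal sketch of one way to avoid colliding correlation IDs: generate them with java.util.UUID instead of a hand-written random string generator. The class name CorrelationIds and its methods are hypothetical and not taken from the original code, and the sketch assumes the application sets the correlation ID itself.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper, not part of the original middleware code.
public class CorrelationIds {

    /** Returns a new correlation ID backed by a random (version 4) UUID. */
    public static String next() {
        return UUID.randomUUID().toString();
    }

    /** Generates n IDs and counts how many of them repeat an earlier ID. */
    public static int countDuplicates(int n) {
        Set<String> seen = new HashSet<>();
        int duplicates = 0;
        for (int i = 0; i < n; i++) {
            // Set.add returns false when the ID was already present.
            if (!seen.add(next())) {
                duplicates++;
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        System.out.println("Sample ID: " + next());
        System.out.println("Duplicates in 100000 IDs: " + countDuplicates(100_000));
    }
}
```

If the correlation ID is set on the message by application code (for example through the standard JMSCorrelationID header), a value from next() could be used there. A duplicate check like countDuplicates is also a quick way to test an existing generator for the kind of bug described above.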
