Apache Camel:收到未知相关 ID 的回复
在其他两个软件组件之间有一个middleware
.在 middleware
中,我通过 Apache Camel
路由 Apache ActiveMQ
消息.
There is a middleware
in between of two other software components. In the middleware
I'm routing Apache ActiveMQ
messages by Apache Camel
.
这就是它的工作原理:
1stComponent
使用middleware
向3rdComponent
发送消息3rdComponent
回复消息并将其发送回1st
(使用middleware
).
1stComponent
usesmiddleware
to send message to the3rdComponent
3rdComponent
replies the message and sends it back to the1st
(usingmiddleware
).
1stComponent <<=>> Middleware <<=>> 3rdComponent
问题:
我在中间件中使用 ConcurrentConsumers
.
在顺序发送大量消息的过程中,突然middleware
停止了所有进程!没有例外或消息!例如,500 条消息中的前 100 条已得到处理,其余消息作为待处理消息保留在队列中.
In the middle of sending a lot of messages sequentially, suddenly middleware
stops all the process!
there is no exceptions or messages!
for example, first 100 of 500 messages got processed and the remainders remain in the queue as pending messages.
有时会在进程中间记录此警告:
this warning is logged sometimes in the middle of the process:
[WARN ] TemporaryQueueReplyManager(Camel (camel-1) thread #11 - TemporaryQueueReplyManager[Q.MyQ]):91 - Reply received for unknown correlationID [c551c7aa061f501c]. The message will be ignored: ActiveMQMapMessage {commandId = 2161, responseRequired = true, messageId = ID:xxxxxxx, originalDestination = null, originalTransactionId = null, producerId = ID:xxxxxxx, destination = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, transactionId = null, expiration = 0, timestamp = 1389890272360, arrival = 0, brokerInTime = 1389890272360, brokerOutTime = 1389890272360, correlationId = c551c7aa061f501c, replyTo = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@19e19da, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID:xxxxxxxxxxxxxx, Title=300, CamelJmsDeliveryMode=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {} }
这是中间件
代码:
private static class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("activemq:queue:Q.Middleware?concurrentConsumers=1&maxConcurrentConsumers=10")
.threads(1, 100)
.process(new Processor() {
public void process(Exchange exchange) {
//some code
}
})
.inOut("activemq2:queue:Q.3RD")
;
}
}
和3rdComponent
:
private static class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() {
from("activemq:queue:Q.3RD")
.threads(1, 100)
.process(new Processor() {
public void process(Exchange exchange) {
//some code
}
})
;
}
}
推荐答案
更新:
我之前的回答工作正常,但不是完整的解决方案.
My previous answer is working correctly, but was not the complete solution.
错误是产生非唯一 CorrelationIDs
!(随机字符串生成器中的一个错误)简单!!!:|
The mistake was producing non-unique CorrelationIDs
! (a bug in random string generators) simple!!! :|
相关文章