Camel 2.11 批量聚合如何与单独的路由一起使用?

2022-01-19 00:00:00 aggregation java apache-camel

首先有一个类似的未回答问题将路由加入单个聚合器

First there is a similar unanswered question Joining routes into single aggregator

我们有一些消费者路由(ftp、file、smb)从远程系统读取文件.简化了直接路由的测试,但与批处理消费者的行为相似:

We have some consumer routes (ftp, file, smb) reading files from remote systems. Simplified for test with direct route, but similar behavior with batch consumers:

from("direct:"+routeId).id(routeId)
 .setProperty(AGGREGATION_PROPERTY, constant(routeId))
 .log(String.format("Sending (${body}) to %s", "direct:start1"))
 .to("direct:aggregate");

转换后,一次投票的所有结果将在单独的路由中按批次聚合:

After transformation all results from one poll are aggregated by batch in a separate route:

from("direct:aggregate")
  .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
  .completionFromBatchConsumer()
  .to("log:result", "mock:result");

如果每个消费者分开运行,一切正常.但如果多个消费者并行运行,聚合将拆分民意调查.例如,如果文件消费者轮询 500 条消息,并且第二条路由开始从 ftp 读取 6 个文件,则期望我们得到 2 个聚合,1 个来自文件的 500 条消息,1 个来自 ftp 的 6 个消息.

All works fine, if every consumer runs separated. But if multiple consumers runs in parallel, aggregation will split the polls. Example if file-consumer polls 500 messages and a second route starts to read 6 files from ftp the expections is that we get 2 aggregates 1 with 500 messages from file and 1 with 6 messages from ftp.

测试用例:

public void testAggregateByProperty() throws Exception {
    MockEndpoint result =  getMockEndpoint("mock:result");

    result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");

    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
    template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);

    assertMockEndpointsSatisfied();
}

结果是:A+A"、B"、A"、B"、A",而不是预期的A+A+A"、B+B"、A",Z".问题:

The result is: "A+A", "B", "A", "B", "A" and not the expected "A+A+A", "B+B", "A", "Z". Questions:

  1. 我们对聚合的假设是否错误?
  2. 我们如何才能实现预期的行为?
  3. 如果我们设置了completionTimeout,它接缝会从第一次交换发生超时 - 如果还有新的交换,则独立?

推荐答案

你几乎可以正常工作了.这是您需要的更改(稍后我会解释).

You almost have it working. Here is the change you need (and after I will explain).

from("direct:aggregate").id("aggregate")
    .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy())
    .completionSize(property(Exchange.BATCH_SIZE))
    .to("log:result", "mock:result")

结果将是:

Exchange received, body: A+A+A
Exchange received, body: B+B
Exchange received, body: A

注意:您不会收到 "Z" 的结果,因为批量大小为 7.

Note: You won't receive a result for the "Z" since the batch size is 7.

解释一下 - 正如您所读到的,聚合器是一个多功能的骆驼组件,正确定义的关键是:

To explain - as you have read, the Aggregator is a versatile camel component and the key things to define correctly are:

  • 聚合表达式
  • 补全规则

现在,在您的情况下,您正在聚合一个属性 AGGREGATION_PROPERTY,它将是 ABZ.此外,您正在指定批量大小.

Now in your case you are aggregating on a property AGGREGATION_PROPERTY which will be A, B or Z. In addition you are specifying a batch size.

但是,您没有在路线中表达 completionSize().相反,您使用的是 completionFromBatchConsumer - 它做了一些不同的事情(代码声明它查找 Exchange#BATCH_COMPLETE 属性),因此结果很奇怪.

However you aren't expressing a completionSize() in your route. Instead you were using completionFromBatchConsumer - which does something different (the code states that it looks for a Exchange#BATCH_COMPLETE property), thus the weird results.

无论如何,.completionSize(Exchange.BATCH_SIZE) 将使您的测试按需要运行.

Anyway, .completionSize(Exchange.BATCH_SIZE) will make your test run as desired.

祝你好运.

相关文章