8.5. 聚合器

概述

通过 图 8.5 “聚合器模式” 中显示的 聚合器 模式,您可以将相关消息的批处理合并到单个消息中。

图 8.5. 聚合器模式

聚合器模式

要控制聚合器的行为,Apache Camel 允许您指定 Enterprise Integration Patterns 中描述的属性,如下所示:

  • correlation 表达式 criu- iwl 决定哪些消息应聚合在一起。每个传入消息上评估了关联表达式,以生成 关联密钥。然后,具有相同关联键的传入消息被分组到同一批处理中。例如,如果要将 所有传入 的消息聚合到一个消息中,您可以使用恒定表达式。
  • 当消息批处理完成后,completeness condition requiredness condition Determines.您可以将它指定为一个简单的大小限制,或者通常可以指定批处理完成后标记的 predicate 条件。
  • 聚合算法 criu-wagon 将单个关联密钥的消息交换组合到单个消息交换中。

例如,假设一个库存市场数据系统每秒接收 30,000 个消息。如果您的 GUI 工具无法应对此类大规模更新率,您可能希望减慢消息流。传入的库存引号可以通过选择最新的引号并丢弃较旧的价格而一起聚合。(如果您想要捕获一些历史记录,您可以应用 delta 处理算法。)

注意

聚合器现在使用包含更多信息的 ManagedAggregateProcessorMBean 在 JMX 中加入。它允许您使用聚合控制器来控制它。

聚合器的工作方式

图 8.6 “聚合器实施” 显示了聚合器的工作方式的概述,假设它是具有关联键(如 A、B、C 或 D)的交换流。

图 8.6. 聚合器实施

消息路由 02

图 8.6 “聚合器实施” 中显示的交换流按如下处理:

  1. correlator 负责根据关联密钥对交换进行排序。对于每个传入的交换,评估关联表达式,生成关联密钥。例如,对于 图 8.6 “聚合器实施” 中显示的交换,关联键评估为 A。
  2. 聚合策略 负责合并具有相同关联密钥的交换。当一个新交换时,会进入 A,聚合器会在 聚合存储库中查找对应的聚合交换 A,并将其与新交换合并。

    在特定的聚合周期完成前,传入的交换会与相应的聚合交换持续聚合。聚合周期持续到由其中一个完成机制终止为止。

    注意

    从 Camel 2.16,新的 XSLT Aggregation 策略允许您将两个消息与 XSLT 文件合并。您可以从 toolbox 访问 AggregationStrategies.xslt () 文件。

  3. 如果在聚合器上指定了 completion predicate,则会测试聚合交换,以确定它是否准备好发送到路由中的下一个处理器。处理继续如下:

    • 如果完成,则聚合交换由路由的后部分处理。有两种替代模型: 同步 (默认),这会导致调用线程阻止或 异步 (如果启用了并行处理),其中聚合交换将提交到 executor 线程池(如 图 8.6 “聚合器实施”所示)。
    • 如果没有完成,聚合交换会保存回聚合存储库。
  4. 与同步完成测试并行,可以通过启用 completionTimeout 选项或 completionInterval 选项来启用异步完成测试。这些完成测试在单独的线程中运行,并在满足完成测试时,对应的交换被标记为完成,并开始由路由的后部分处理(根据并行处理是启用并行处理还是异步处理)。
  5. 如果启用了并行处理,则线程池负责处理路由后者中的交换。默认情况下,这个线程池包含十个线程,但您可以选择自定义池(“线程选项”一节)。

Java DSL 示例

以下示例使用 UseLatestAggregationStrategy 聚合策略来聚合具有相同 StockSymbol 标头值的交换。对于给定 StockSymbol 值,如果收到该关联键的最后三秒以上,则聚合交换被视为完成并发送到 模拟 端点。

from("direct:start")
    .aggregate(header("id"), new UseLatestAggregationStrategy())
        .completionTimeout(3000)
    .to("mock:aggregated");

XML DSL 示例

以下示例演示了如何在 XML 中配置相同的路由:

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <aggregate strategyRef="aggregatorStrategy"
                   completionTimeout="3000">
            <correlationExpression>
                <simple>header.StockSymbol</simple>
            </correlationExpression>
            <to uri="mock:aggregated"/>
        </aggregate>
    </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>

指定关联表达式

在 Java DSL 中,关联表达式始终作为第一个参数传递给 aggregate () DSL 命令。您不仅限于在此处使用简单表达式语言。您可以使用任何表达式语言或脚本语言(如 XPath、XQuery、SQL 等)指定关联表达式。

对于考试,要使用 XPath 表达式关联交换,您可以使用以下 Java DSL 路由:

from("direct:start")
    .aggregate(xpath("/stockQuote/@symbol"), new UseLatestAggregationStrategy())
        .completionTimeout(3000)
    .to("mock:aggregated");

如果无法在特定传入的交换上评估关联表达式,则聚合器会默认引发 CamelExchangeException。您可以通过设置 ignoreInvalidCorrelationKeys 选项来阻止此异常。例如,在 Java DSL 中:

from(...).aggregate(...).ignoreInvalidCorrelationKeys()

在 XML DSL 中,您可以将 ignoreInvalidCorrelationKeys 选项设置为属性,如下所示:

<aggregate strategyRef="aggregatorStrategy"
           ignoreInvalidCorrelationKeys="true"
           ...>
    ...
</aggregate>

指定聚合策略

在 Java DSL 中,您可以将聚合策略作为第二个参数传递给 aggregate () DSL 命令,或使用 aggregate Strategy () 子句指定它。例如,您可以使用 aggregationStrategy () 子句,如下所示:

from("direct:start")
    .aggregate(header("id"))
        .aggregationStrategy(new UseLatestAggregationStrategy())
        .completionTimeout(3000)
    .to("mock:aggregated");

Apache Camel 提供以下基本聚合策略(类属于 org.apache.camel.processor.aggregate Java 软件包):

UseLatestAggregationStrategy
返回给定关联密钥的最后一个交换,从而丢弃所有之前与此密钥的交换。例如,此策略对于从股票交易中节流源很有用,您只想知道特定库存符号的最新价格。
UseOriginalAggregationStrategy
返回给定关联密钥的第一个交换,丢弃所有之后与此密钥的交换。您必须通过调用 UseOriginalAggregationStrategy.setOriginal () 来设置第一个交换,然后才能使用此策略。
GroupedExchangeAggregationStrategy
将给定关联密钥 的所有 交换连接到列表中,该列表存储在 Exchange.GROUPED_EXCHANGE Exchange 属性中。请参阅 “分组交换”一节

实现自定义聚合策略

如果要应用不同的聚合策略,您可以实现以下聚合策略基本接口之一:

org.apache.camel.processor.aggregate.AggregationStrategy
基本聚合策略接口。
org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy

如果您希望实施在聚合周期超时时收到通知,请实施此接口。超时 通知方法有以下签名:

void timeout(Exchange oldExchange, int index, int total, long timeout)
org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy

如果您希望实施在聚合周期正常完成时收到通知,请实施此接口。通知方法有以下签名:

void onCompletion(Exchange exchange)

例如,以下代码显示了两个不同的自定义聚合策略,即 StringAggregationStrategyArrayListAggregationStrategy::

 //simply combines Exchange String body values using '' as a delimiter
 class StringAggregationStrategy implements AggregationStrategy {

     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         if (oldExchange == null) {
             return newExchange;
         }

         String oldBody = oldExchange.getIn().getBody(String.class);
         String newBody = newExchange.getIn().getBody(String.class);
         oldExchange.getIn().setBody(oldBody + "" + newBody);
         return oldExchange;
     }
 }

 //simply combines Exchange body values into an ArrayList<Object>
 class ArrayListAggregationStrategy implements AggregationStrategy {

     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
 	    Object newBody = newExchange.getIn().getBody();
     	ArrayList<Object> list = null;
         if (oldExchange == null) {
 		    list = new ArrayList<Object>();
 		    list.add(newBody);
 		    newExchange.getIn().setBody(list);
 		    return newExchange;
         } else {
 	        list = oldExchange.getIn().getBody(ArrayList.class);
 	    	list.add(newBody);
 		    return oldExchange;
 	    }
     }
 }
注意

从 Apache Camel 2.0 开始,为非常第一个交换调用 AggregationStrategy.aggregate () 回调方法。在聚合方法第一次调用中,oldExchange 参数为 nullnewExchange 参数包含第一个传入交换。

要使用自定义策略类 ArrayListAggregationStrategy 来聚合信息,请定义类似如下的路由:

from("direct:start")
    .aggregate(header("StockSymbol"), new ArrayListAggregationStrategy())
    .completionTimeout(3000)
    .to("mock:result");

您还可以使用 XML 中的自定义聚合策略配置路由,如下所示:

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <aggregate strategyRef="aggregatorStrategy"
               completionTimeout="3000">
      <correlationExpression>
        <simple>header.StockSymbol</simple>
      </correlationExpression>
      <to uri="mock:aggregated"/>
    </aggregate>
  </route>
</camelContext>

<bean id="aggregatorStrategy" class="com.my_package_name.ArrayListAggregationStrategy"/>

控制自定义聚合策略的生命周期

您可以实施自定义聚合策略,以便其生命周期与控制它的企业集成模式的生命周期一致。这可用于确保聚合策略可以安全关闭。

要使用生命周期支持实施聚合策略,您必须实现 org.apache.camel.Service 接口(除 AggregationStrategy 接口之外),并提供 start ()stop () 生命周期方法的实现。例如,以下代码示例显示了具有生命周期支持的聚合策略概述:

// Java
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.Service;
import java.lang.Exception;
...
class MyAggStrategyWithLifecycleControl
       implements AggregationStrategy, Service {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // Implementation not shown...
        ...
    }

    public void start() throws Exception {
        // Actions to perform when the enclosing EIP starts up
        ...
    }

    public void stop() throws Exception {
        // Actions to perform when the enclosing EIP is stopping
        ...
    }
}

Exchange 属性

每个聚合交换上设置以下属性:

标头类型描述 Aggregated Exchange Properties

Exchange.AGGREGATED_SIZE

int

聚合到此交换的交换总数。

Exchange.AGGREGATED_COMPLETED_BY

字符串

指明负责完成聚合交换的机制。可能的值有: predicatesizetimeoutintervalconsumer

以下属性由 SQL 组件聚合存储库红色设置(请参阅 “持久性聚合存储库”一节):

标头类型描述 Redelivered Exchange Properties

Exchange.REDELIVERY_COUNTER

int

当前重新发送尝试的序列号(从 1开始)。

指定完成条件

必须至少 指定一个完成条件,这决定了聚合交换何时离开聚合器,并继续路由上的下一个节点。可以指定以下完成条件:

completionPredicate
聚合每个交换后评估 predicate,以确定完整性。值 true 表示聚合交换已完成。另外,您还可以定义实现 Predicate 接口的自定义 AggregationStrategy 而不是设置这个选项,在这种情况下,AggregationStrategy 将用作 completion predicate。
completionSize
在聚合指定数量的传入交换后完成聚合交换。
completionTimeout

(与 completionInterval兼容) 如果指定的超时内没有聚合交换,则完成聚合交换。

换句话说,超时机制会跟踪 每个 关联键值的超时时间。时钟在收到特定键值的最新交换后开始循环。如果没有在指定的超时时间内收到具有相同键值的另一个交换,则对应的聚合交换被标记为 complete,并发送到路由上的下一个节点。

completionInterval

(与 completionTimeout兼容) 在每次时间间隔后(指定长度) 完成所有 未完成的聚合交换。

没有 为每个聚合交换量身定制时间间隔。这种机制强制完成所有未完成的聚合交换。因此,在某些情况下,此机制可以在启动聚合后立即完成聚合交换。

completionFromBatchConsumer
当与支持 批处理消费者 机制的消费者端点结合使用时,此完成选项会根据它从消费者端点接收的信息,在当前批处理完成后自动找出出的。请参阅 “批处理消费者”一节
forceCompletionOnStop
启用此选项后,它会在当前路由上下文停止时强制完成所有未完成的聚合交换。

前面的完成条件可以任意组合使用,但 completionTimeoutcompletionInterval 条件除外,它们不能同时启用。当条件组合使用时,常规规则是要触发的第一个完成条件是有效的完成条件。

指定 completion predicate

您可以指定一个任意 predicate 表达式,来确定聚合交换完成后。评估 predicate 表达式的方法有两种:

  • 在最新的聚合交换上 ,是默认的行为。
  • 最新的传入交换 iwl-MIRROR this behavior 上 ,当您启用 eagerCheckCompletion 选项时,会选择此行为。

例如,如果要在每次收到 ALERT 消息时终止库存引号流(由最新传入的交换中的 MsgType 标头值表示),您可以定义一个类似如下的路由:

from("direct:start")
    .aggregate(
      header("id"),
      new UseLatestAggregationStrategy()
    )
        .completionPredicate(
          header("MsgType").isEqualTo("ALERT")
         )
        .eagerCheckCompletion()
    .to("mock:result");

以下示例演示了如何使用 XML 配置相同的路由:

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <aggregate strategyRef="aggregatorStrategy"
               eagerCheckCompletion="true">
      <correlationExpression>
          <simple>header.StockSymbol</simple>
      </correlationExpression>
      <completionPredicate>
          <simple>$MsgType = 'ALERT'</simple>
      </completionPredicate>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>

指定动态完成超时

可以指定 动态完成超时,其中为每个传入的交换重新计算超时值。例如,若要从每个传入交换中的 timeout 标头设置超时值,您可以定义路由,如下所示:

from("direct:start")
    .aggregate(header("StockSymbol"), new UseLatestAggregationStrategy())
        .completionTimeout(header("timeout"))
    .to("mock:aggregated");

您可以在 XML DSL 中配置相同的路由,如下所示:

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <aggregate strategyRef="aggregatorStrategy">
            <correlationExpression>
                <simple>header.StockSymbol</simple>
            </correlationExpression>
            <completionTimeout>
                <header>timeout</header>
            </completionTimeout>
            <to uri="mock:aggregated"/>
        </aggregate>
    </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.UseLatestAggregationStrategy"/>
注意

如果动态值为 null0, 您也可以添加固定的超时值,Apache Camel 将回退到使用这个值。

指定动态完成大小

可以指定 动态完成大小,每个传入的交换都会重新计算完成大小。例如,若要从每个传入交换中的 mySize 标头设置完成大小,您可以定义路由,如下所示:

from("direct:start")
    .aggregate(header("StockSymbol"), new UseLatestAggregationStrategy())
        .completionSize(header("mySize"))
    .to("mock:aggregated");

使用 Spring XML 的同一示例:

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <aggregate strategyRef="aggregatorStrategy">
            <correlationExpression>
                <simple>header.StockSymbol</simple>
            </correlationExpression>
            <completionSize>
                <header>mySize</header>
            </completionSize>
            <to uri="mock:aggregated"/>
        </aggregate>
    </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.UseLatestAggregationStrategy"/>
注意

如果动态值为 null0, 您也可以添加固定的大小值,Apache Camel 将回退到使用这个值。

在 AggregationStrategy 中强制完成单个组

如果您实施自定义 AggregationStrategy 类,可以通过将 Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP 交换属性设置为 true 来强制完成当前消息组。这个机制 只会影响 当前组:其他消息组(具有不同关联 ID) 不会被 强制完成。此机制会覆盖任何其他完成机制,如 predicate、大小、超时等。

例如,如果消息正文大小大于 5,则以下示例 AggregationStrategy 类完成当前的组:

// Java
public final class MyCompletionStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        String body = oldExchange.getIn().getBody(String.class) + "+"
            + newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(body);
        if (body.length() >= 5) {
            oldExchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
        }
        return oldExchange;
    }
}

强制完成带有特殊消息的所有组

通过将带有特殊标头的消息发送到路由,可以强制完成所有未完成的聚合消息。您可以使用两个替代的标头设置来强制完成:

Exchange.AGGREGATION_COMPLETE_ALL_GROUPS
设置为 true,以强制完成当前聚合周期。此消息仅充当信号,不包含在 任何聚合周期中。处理此信号消息后,消息的内容将被丢弃。
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE
设置为 true,以强制完成当前聚合周期。此消息 包含在 当前的聚合周期中。

使用 AggregateController

org.apache.camel.processor.aggregate.AggregateController 可让您在运行时使用 Java 或 JMX API 控制聚合。这可用于强制完成一组交换,或查询当前的运行时统计信息。

如果没有配置自定义,聚合器提供了一个默认实现,您可以使用 getAggregateController () 方法访问它。但是,使用 aggregateController 在路由中配置控制器。

private AggregateController controller = new DefaultAggregateController();

from("direct:start")
   .aggregate(header("id"), new MyAggregationStrategy()).completionSize(10).id("myAggregator")
      .aggregateController(controller)
      .to("mock:aggregated");

另外,您可以使用 AggregateController上的 API 来强制完成。例如,使用键 foo 完成组

int groups = controller.forceCompletionOfGroup("foo");

返回的数量将是完成的组数量。以下是完成所有组的 API:

 int groups = controller.forceCompletionOfAllGroups();

强制唯一关联键

在一些聚合场景中,您可能想要强制执行相关的键对于每个批处理都是唯一的的条件。换句话说,当特定关联密钥的聚合交换完成时,您要确保不允许进一步聚合与该关联键的交换。例如,如果路由的后者部分需要处理具有唯一关联键值的交换,您可能希望强制执行此条件。

根据配置完成条件的方式,使用特定关联密钥生成多个聚合交换的风险。例如,虽然您可以定义一个 completion predicate,它旨在 等待所有 与特定关联键的交换都被接收,但您也可以定义完成超时,该超时可以在所有使用该键的交换之前触发。在这种情况下,较晚的交换可能会提高与具有相同关联键值 的第二个 聚合交换。

在这种情况下,您可以通过设置 closeCorrelationKeyOnCompletion 选项,将聚合器配置为阻止之前相关的键值的聚合交换。为了抑制重复关联键值,聚合器需要在缓存中记录以前的键值。此缓存的大小(缓存的关联密钥数)被指定为 closeCorrelationKeyOnCompletion () DSL 命令的参数。要指定无限大小的缓存,您可以传递值零或负整数。例如,指定 10000 键值的缓存大小:

from("direct:start")
    .aggregate(header("UniqueBatchID"), new MyConcatenateStrategy())
        .completionSize(header("mySize"))
        .closeCorrelationKeyOnCompletion(10000)
    .to("mock:aggregated");

如果聚合交换以重复的关联键值完成,则聚合器会引发 ClosedCorrelationKeyException 异常。

使用简单表达式进行基于流的处理

您可以在流传输模式中使用简单语言表达式作为令牌以及 tokenizeXML 子命令。使用简单语言表达式将启用对动态令牌的支持。

例如,要使用 Java 分割由标签 人员 减少的名称序列,您可以使用令牌化 XML bean 和简单语言令牌将文件分成 name 元素。

public void testTokenizeXMLPairSimple() throws Exception {
        Expression exp = TokenizeLanguage.tokenizeXML("${header.foo}", null);

获取由 < person > 分离的名称的输入字符串,并将 &lt ;person& gt; 设置为令牌。

        exchange.getIn().setHeader("foo", "<person>");
        exchange.getIn().setBody("<persons><person>James</person><person>Claus</person><person>Jonathan</person><person>Hadrian</person></persons>");

列出从输入中分离的名称。

        List<?> names = exp.evaluate(exchange, List.class);
        assertEquals(4, names.size());

        assertEquals("<person>James</person>", names.get(0));
        assertEquals("<person>Claus</person>", names.get(1));
        assertEquals("<person>Jonathan</person>", names.get(2));
        assertEquals("<person>Hadrian</person>", names.get(3));
    }

分组交换

您可以将传出批处理中的所有聚合交换组合成单个 org.apache.camel.impl.GroupedExchange holder 类。要启用分组交换,请指定 groupExchanges () 选项,如以下 Java DSL 路由所示:

from("direct:start")
    .aggregate(header("StockSymbol"))
        .completionTimeout(3000)
        .groupExchanges()
    .to("mock:result");

发送到 mock:result 的分组交换列表包含消息正文中聚合交换的列表。以下行显示后续处理器如何以列表的形式访问分组交换的内容:

// Java
List<Exchange> grouped = ex.getIn().getBody(List.class);
注意

当您启用分组交换功能时,不得 配置聚合策略(分组的交换功能本身是一个聚合策略)。

注意

从传出交换上的属性访问分组交换的旧方法现已弃用,并将在以后的发行版本中删除。

批处理消费者

聚合器可以与 批处理消费者 模式协同工作,以聚合批处理消费者报告的消息总数(批处理消费者端点设置 CamelBatchSizeCamelBatchIndexCamelBatchComplete 属性在传入交换上)。例如,要聚合文件消费者端点找到的所有文件,您可以使用如下路由:

from("file://inbox")
    .aggregate(xpath("//order/@customerId"), new AggregateCustomerOrderStrategy())
    .completionFromBatchConsumer()
    .to("bean:processOrder");

目前,以下端点支持批处理消费者机制:file、FTP、Mail、iBatis 和 JPA。

持久性聚合存储库

默认聚合器仅使用内存 聚合存储库。如果要永久存储待处理的聚合交换,您可以使用 SQL 组件作为 持久聚合存储库。SQL 组件包含一个 JdbcAggregationRepository,它会持续聚合消息,并确保您不会丢失任何消息。

成功处理交换后,在存储库上调用 confirm 方法时,它标记为 complete。这意味着,如果同一交换再次失败,它将被重试,直到成功为止。

添加对 camel-sql 的依赖

要使用 SQL 组件,您必须在项目中包含对 camel-sql 的依赖项。例如,如果您使用 Maven pom.xml 文件:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-sql</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

创建聚合数据库表

您必须创建单独的聚合和已完成的数据库表,以实现持久性。例如,以下查询会为名为 my_aggregation_repo 的数据库创建表:

CREATE TABLE my_aggregation_repo (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 constraint aggregation_pk PRIMARY KEY (id)
);

CREATE TABLE my_aggregation_repo_completed (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 constraint aggregation_completed_pk PRIMARY KEY (id)
);
}

配置聚合存储库

您还必须在框架 XML 文件中配置聚合存储库(如 Spring 或 Blueprint):

<bean id="my_repo"
    class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
    <property name="repositoryName" value="my_aggregation_repo"/>
    <property name="transactionManager" ref="my_tx_manager"/>
    <property name="dataSource" ref="my_data_source"/>
    ...
</bean>

需要 repositoryNametransactionManagerdataSource 属性。有关持久聚合存储库的更多详细信息,请参阅 Apache Camel 组件参考指南 中的 SQL 组件

线程选项

图 8.6 “聚合器实施” 所示,聚合器与路由的后者部分分离,其中发送到路由的交换由专用线程池处理。默认情况下,这个池仅包含一个线程。如果要指定多个线程的池,请启用 parallelProcessing 选项,如下所示:

from("direct:start")
    .aggregate(header("id"), new UseLatestAggregationStrategy())
        .completionTimeout(3000)
        .parallelProcessing()
    .to("mock:aggregated");

默认情况下,这会创建一个具有 10 个 worker 线程的池。

如果要对创建的线程池进行更多控制,请使用 executorService 选项指定自定义 java.util.concurrent.ExecutorService 实例(在这种情况下,不需要启用 parallelProcessing 选项)。

聚合到列表

常见的聚合场景涉及将一系列传入的消息聚合到 List 对象。为方便这种情况,Apache Camel 提供了 AbstractListAggregationStrategy 抽象类,您可以快速扩展为本例创建聚合策略。传入消息正文( T )被聚合到完成的交换中,消息正文为 List<T>

例如,要将一系列 Integer 消息正文聚合到一个 List<Integer& gt; 对象中,您可以使用定义的聚合策略:

import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
...
/**
 * Strategy to aggregate integers into a List<Integer>.
 */
public final class MyListOfNumbersStrategy extends AbstractListAggregationStrategy<Integer> {
 
    @Override
    public Integer getValue(Exchange exchange) {
        // the message body contains a number, so just return that as-is
        return exchange.getIn().getBody(Integer.class);
    }
}

聚合器选项

聚合器支持以下选项:

表 8.1. 聚合器选项

选项默认描述

correlationExpression

 

评估用于聚合的关联键的强制表达式。具有相同关联键的 Exchange 聚合在一起。如果无法评估关联密钥,则会抛出 Exception。您可以使用 ignoreBadCorrelationKeys 选项禁用此功能。

aggregationStrategy

 

强制 AggregationStrategy,用于将传入交换与现有合并交换合并。在第一次调用 oldExchange 参数时,是 null。在后续调用中,oldExchange 包含合并的交换,newExchange 是新的传入交换。从 Camel 2.9.2 开始,策略可以是 TimeoutAwareAggregationStrategy 实现,它支持超时回调。从 Camel 2.16 开始,策略也可以是 PreCompletionAwareAggregationStrategy 实现。它以 pre-completion 模式运行完成检查。

strategyRef

 

在 Registry 中查找 AggregationStrategy 的引用。

completionSize

 

聚合完成前聚合的消息数量。这个选项可以设置为固定值,或使用一个表达式来动态评估大小 - 将因此使用 Integer。如果两者都设置了 Camel,如果 Expression 结果为 null0, 则将回退到使用固定值。

completionTimeout

 

中间时间,聚合交换应在完成前不活跃。这个选项可以设置为固定值,或使用允许您动态评估超时的 Expression 进行设置 - 将使用 Long。如果两者都设置了 Camel,如果 Expression 结果为 null0, 则将回退到使用固定值。您不能将这个选项与 completionInterval 一起使用,只能使用其中一个。

completionInterval

 

在 millis 中重复周期,聚合器将完成所有当前的聚合交换。Camel 有一个后台任务,每个期间都会触发。您不能将这个选项与 completionTimeout 一同使用,只能使用其中一个选项。

completionPredicate

 

指定 predicate ( org.apache.camel.Predicate 类型),该类型在聚合的交换完成后发出信号。另外,您还可以定义实现 Predicate 接口的自定义 AggregationStrategy 而不是设置这个选项,在这种情况下,AggregationStrategy 将用作 completion predicate。

completionFromBatchConsumer

false

这个选项是交换来自 Batch Consumer。然后,当启用 第 8.5 节 “聚合器” 时,使用由消息标头 CamelBatchSize 中的 Batch Consumer 决定。请参阅 Batch Consumer。这可用于聚合来自给定轮询的 see File 端点使用的所有文件。

eagerCheckCompletion

false

在收到新的传入的交换时,是否会被强制检查是否有完成。这个选项会影响 completionPredicate 选项的行为,因为交换会相应地传递更改。如果为 false,在 Predicate 中传递的 Exchange 是 聚合的 Exchange,这意味着您可以为 Predicate 提供关于 AggregationStrategy 的聚合交换的任何信息。当 Predicate 传递的 Exchange 为 传入的 Exchange 时,这意味着您可以从传入交换访问数据。

forceCompletionOnStop

false

如果为 true,在当前路由上下文停止时完成所有聚合交换。

groupExchanges

false

如果启用,Camel 会将所有聚合的交换分组到一个组合的 org.apache.camel.impl.GroupedExchange holder 类,该类包含所有聚合交换。因此,只有一个交换才会从聚合器中发送。可用于将许多传入的 Exchange 组合为一个输出交换,而无需自行编码自定义 AggregationStrategy

ignoreInvalidCorrelationKeys

false

是否要忽略无法评估为值的关联键。默认情况下,Camel 将抛出例外,但您可以启用这个选项并忽略这种情况。

closeCorrelationKeyOnCompletion

 

是否应该 接受 相关的交换。您可以启用它来指示是否关联密钥已经完成,则拒绝具有相同关联密钥的任何新交换。然后 Camel 将抛出一个 closedCorrelationKeyException 异常。使用此选项时,您将传递 一个整数,它是 LRUCache 的数字,这会保留最后的 X 号关闭键。您可以传递 0 或负值来指示未绑定的缓存。通过传递数字,您可以保证,如果您使用不同关联键的日志,缓存将太大。

discardOnCompletionTimeout

false

Camel 2.5: 是否应该丢弃因为超时而完成的交换。如果启用,则当超时发生超时时,聚合的消息 不会 发出,而是被丢弃(丢弃)。

aggregationRepository

 

允许您自己自己实施 org.apache.camel.spi.AggregationRepository,跟踪当前的 inflight 聚合交换。Camel 默认使用基于内存的实现。

aggregationRepositoryRef

 

在 Registry 中查找 聚合存储库 的引用。

parallelProcessing

false

当聚合完成后,它们会从聚合器中发送。这个选项指示 Camel 是否应该将具有多个线程的线程池用于并发。如果没有指定自定义线程池,则 Camel 会创建一个具有 10 个并发线程的默认池。

executorService

 

如果使用 并行处理,您可以指定要使用的自定义线程池。实际上,如果您不使用 并行处理 这个自定义线程池,则也用于发送聚合的交换。

executorServiceRef

 

在 Registry 中查找 executorService 的引用

timeoutCheckerExecutorService

 

如果使用其中一个 completionTimeout,completionTimeoutExpression, 或 completionInterval 选项,则会创建一个后台线程来检查每个聚合器的完成状态。设置这个选项,以提供要使用的自定义线程池,而不是为每个聚合器创建新线程。

timeoutCheckerExecutorServiceRef

 

在 registry 中查找 timeoutCheckerExecutorService 的引用。

completeAllOnStop

 

当您停止聚合器时,这个选项允许它从聚合存储库完成所有待处理的交换。

optimisticLocking

false

打开 optimistic locking,它可与聚合存储库结合使用。

optimisticLockRetryPolicy

 

为 optimistic locking 配置重试策略。