2.2. 多个输入

概述

标准路由仅使用 Java DSL 中的 from(EndpointURL) 语法从单一端点中获取其输入。但是,如果您需要为路由定义多个输入,该怎么办?Apache Camel 提供了一些替代方法来指定多个到路由的输入。采取的方法取决于您是否希望相互独立处理交换,还是您希望以某种方式组合使用不同输入的交换(在这种情况下,您应该使用 “内容增强器模式”一节)。

多个独立输入

指定多个输入的最简单方法是使用 from() DSL 命令的多参数形式,例如:

from("URI1", "URI2", "URI3").to("DestinationUri");

或者您可以使用以下等效语法:

from("URI1").from("URI2").from("URI3").to("DestinationUri");

在这两个示例中,从每个输入端点、URI1、URI 2URI3 中交换交换,分别相互处理,并在单独的线程中处理。实际上,您可以认为前面的路由等同于以下三个独立的路由:

from("URI1").to("DestinationUri");
from("URI2").to("DestinationUri");
from("URI3").to("DestinationUri");

分段路由

例如,您可能希望合并来自两个不同的消息传递系统的传入信息,并使用同一路由处理它们。在大多数情况下,您可以通过将路由分成片段来处理多个输入,如 图 2.5 “使用分段路由处理多个输入” 所示。

图 2.5. 使用分段路由处理多个输入

使用分段路由处理多个输入

路由的初始片段从某些外部队列 3.10.0--XDP 中获取其输入,例如 activemq:Nyseactivemq:Nasdaq WWPN,并将传入的交换发送到内部端点 InternalUrl。第二个路由片段合并传入的交换,从内部端点中提取,并将它们发送到目标队列 activemq:USTxnInternalUrl 是端点的 URL,仅用于在路由器应用程序中使用。以下类型的端点适合内部使用:

这些端点的主要目的是您可以将路由的不同片段粘滞在一起。它们都提供了将多个输入合并到一个路由的有效方法。

直接端点

直接组件提供了将路由链接最简单的机制。直接组件的事件模型是 同步的,因此后续的路由片段在与第一网段相同的线程中运行。直接 URL 的一般格式是 direct:EndpointID,其中端点 ID 为 EndpointID,它只是标识端点实例的唯一字母数字字符串。

例如,如果要从两个消息队列中使用输入 located :Nyseactivemq:Nasdaq,并将它们合并到一个消息队列中 Foo:USTxn,您可以通过定义以下组路由来执行此操作:

from("activemq:Nyse").to("direct:mergeTxns");
from("activemq:Nasdaq").to("direct:mergeTxns");

from("direct:mergeTxns").to("activemq:USTxn");

其中前两个路由使用来自消息队列( NyseNasdaq )的输入,并将它们发送到端点,direct:mergeTxns。最后一个队列合并了前两个队列中的输入,并将组合消息流发送到 activemq:USTxn 队列。

直连端点的实施方式如下:每当交换到达制作者端点(例如,("direct:mergeTxns")时,直接将交换传递到具有相同端点 ID 的所有用户端点(例如,从direct:mergeTxns"))。直接端点只能用于在同一 Java 虚拟机(JVM)实例中属于同一 CamelContext 的路由之间进行通信。

SEDA 端点

SEDA 组件提供了将路由连接在一起的替代机制。您可以以类似直接组件的方式使用它,但它有不同的底层事件和线程模型,如下所示:

  • 对 SEDA 端点的处理 不会 同步。也就是说,当您将交换发送到 SEDA producer 端点时,控件会立即返回到路由中的上一处理器。
  • SEDA 端点包含一个队列缓冲区(包括 java.util.concurrent.BlockingQueue 类型),它将所有传入的交换存储在下一个路由网段处理之前的所有传入交换器。
  • 每个 SEDA 使用者端点都会创建一个线程池(默认大小为 5),以处理来自阻塞队列的交换对象。
  • SEDA 组件支持 竞争的使用者 模式,这样可保证每个传入的交换仅被处理一次,即使有多个用户附加到特定端点。

使用 SEDA 端点的一个主要优点是路由可以更快地响应,导致内置消费者线程池。库存交易示例可以重新编写为使用 SEDA 端点而不是直接端点,如下所示:

from("activemq:Nyse").to("seda:mergeTxns");
from("activemq:Nasdaq").to("seda:mergeTxns");

from("seda:mergeTxns").to("activemq:USTxn");

此示例和直接示例之间的主要区别在于,使用 SEDA 时,第二个路由片段(从 seda:mergeTxnsactivemq:USTxn)由五个线程池处理。

注意

SEDA 不仅仅是将多个路由段过去在一起。暂存事件驱动的架构(SEDA)包含了用于构建更易管理的多线程应用程序的设计理念。Apache Camel 中的 SEDA 组件的目的是使您能够将此设计理念应用到您的应用程序。有关 SEDA 的详情,请参考 http://www.eecs.harvard.edu/~mdw/proj/seda/

虚拟机端点

VM 组件与 SEDA 端点非常相似。唯一的不同之处在于,SEDA 组件只能将来自同一 CamelContext 中的路由片段链接在一起,VM 组件可让您将不同 Apache Camel 应用程序的路由链接在一起,只要它们在同一 Java 虚拟机中运行。

库存交易示例可以重新编写为使用 VM 端点而不是 SEDA 端点,如下所示:

from("activemq:Nyse").to("vm:mergeTxns");
from("activemq:Nasdaq").to("vm:mergeTxns");

在单独的路由器应用程序(在同一 Java 虚拟机中运行),您可以定义路由的第二个片段,如下所示:

from("vm:mergeTxns").to("activemq:USTxn");

内容增强器模式

内容增强器模式定义了处理对路由的多个输入的根本不同方式。当交换进入增强器处理器时,增强器联系一个外部资源以检索信息,然后添加到原始消息中。在此模式中,外部资源有效表示消息的第二个输入。

例如,假设您正在编写处理贡献请求的应用程序。在处理信贷请求之前,您需要用数据进行添加,从而向客户分配贡献度等级到客户,其中评分数据存储在 目录中的一个文件中,src/data/ratings。您可以使用 pollEnrich() 模式和 GroupedExchangeAggregationStrategy 聚合策略将传入的信请求与 ratings 文件中的数据合并,如下所示:

from("jms:queue:creditRequests")
    .pollEnrich("file:src/data/ratings?noop=true", new GroupedExchangeAggregationStrategy())
    .bean(new MergeCreditRequestAndRatings(), "merge")
    .to("jms:queue:reformattedRequests");

其中 GroupedExchangeAggregationStrategy 类是一个标准的聚合策略,来自 org.apache.camel.processor.aggregate 软件包,将每个新交换添加到 java.util.List 实例,并将结果列表存储在 Exchange.GROUPED_EXCHANGE 交换属性中。在本例中,列表中包含两个元素:原始交换(来自 creditRequests JMS 队列);以及增强器交换(来自文件端点)。

要访问分组的交换,您可以使用类似如下的代码:

public class MergeCreditRequestAndRatings {
    public void merge(Exchange ex) {
        // Obtain the grouped exchange
        List<Exchange> list = ex.getProperty(Exchange.GROUPED_EXCHANGE, List.class);

        // Get the exchanges from the grouped exchange
        Exchange originalEx = list.get(0);
        Exchange ratingsEx  = list.get(1);

        // Merge the exchanges
        ...
    }
}

此应用的另一种方法是将合并代码直接放入自定义聚合策略类的实施中。

有关内容丰富的模式的详情,请参考 第 10.1 节 “内容增强器”