2.2. 多个输入

概述

标准路由只从单一端点获取输入,使用 Java DSL 中的 (EndpointURL) 语法。但是,如果您需要为路由定义多个输入,情况会怎样?Apache Camel 提供多种替代方法,可用于为路由指定多个输入。使用的方法取决于是否希望相互独立处理交换,还是希望以某种方式合并来自不同输入的交换(在这种情况下,您应该使用 “内容丰富的模式”一节)。

多个独立输入

指定多个输入的最简单方式是使用 from () DSL 命令的多参数形式,例如:

from("URI1", "URI2", "URI3").to("DestinationUri");

或者您可以使用以下等同的语法:

from("URI1").from("URI2").from("URI3").to("DestinationUri");

在这两个示例中,从每个输入端点( URI1、URI 2URI3) 的交换都独立于彼此的处理,并在单独的线程中相互处理。实际上,您可以认为前面的路由等同于以下三个独立的路由:

from("URI1").to("DestinationUri");
from("URI2").to("DestinationUri");
from("URI3").to("DestinationUri");

分段路由

例如,您可能想要从两个不同的消息传递系统中合并传入的信息,并使用同一路由处理它们。在大多数情况下,您可以通过将路由分成片段来处理多个输入,如 图 2.5 “使用分段路由处理多个输入” 所示。

图 2.5. 使用分段路由处理多个输入

使用分段路由处理多个输入

路由的初始片段从一些外部队列的 inputs 获取,例如 activemq:Nyseactivemq:Nasdaq nmstate-TIMEOUT,并将传入的交换发送到内部端点 InternalUrl。第二个路由片段合并传入的交换,从内部端点提取它们并将其发送到目的地队列 activemq:USTxnInternalUrl 是仅用于在路由器应用程序中使用的 端点的 URL。以下类型的端点适用于内部使用:

这些端点的主要目的是使您能够将路由的不同片段放在一起。它们都提供了将多个输入合并到单个路由的有效方法。

直接端点

直接组件提供将路由链接在一起的最简单的机制。直接组件的事件模型是 同步的,以便后续路由的片段在与第一段相同的线程中运行。直接 URL 的一般格式为 direct:EndpointID,其中端点 ID EndpointID 只是用于标识端点实例的唯一数字字符串。

例如,如果要从两个消息队列获取输入: activemq:Nyseactivemq:Nasdaq,并将它们合并到单一消息队列中 activemq:USTxn,您可以通过定义以下组路由来执行此操作:

from("activemq:Nyse").to("direct:mergeTxns");
from("activemq:Nasdaq").to("direct:mergeTxns");

from("direct:mergeTxns").to("activemq:USTxn");

其中,前两个路由会获取消息队列、NyseNasdaq 的输入,并将其发送到端点 直接:mergeTxns。最后一个队列合并了之前两个队列的输入,并将组合的消息流发送到 activemq:USTxn 队列。

直接端点的实现如下:当交换到达生产端点时(例如,(( "direct:mergeTxns")),直接端点将交换直接传递给具有相同端点 ID 的所有消费者端点(例如,来自("direct:mergeTxns"))。直接端点只能用于在同一 Java 虚拟机(JVM)实例中属于同一 CamelContext 的路由之间进行通信。

SEDA 端点

SEDA 组件为将路由连接提供了替代机制。您可以以类似直接组件的方式使用它,但它有不同的底层事件和线程模型,如下所示:

  • 对 SEDA 端点的处理 不是 同步的。也就是说,当您将交换发送到 SEDA producer 端点时,控制会立即返回到该路由中的上述处理器。
  • SEDA 端点包含一个队列缓冲区( java.util.concurrent.BlockingQueue 类型),它会在下一个路由网段处理前存储所有传入的交换。
  • 每个 SEDA 消费者端点都会创建一个线程池(默认大小为 5),以处理块队列交换对象。
  • SEDA 组件支持 竞争消费者 模式,保证每个传入的交换只能处理一次,即使有多个用户附加到特定的端点。

使用 SEDA 端点的一个主要优点是,路由可以更迅速地响应,并划分到内置的使用者线程池。库存事务示例可以重新编写为使用 SEDA 端点而不是直接端点,如下所示:

from("activemq:Nyse").to("seda:mergeTxns");
from("activemq:Nasdaq").to("seda:mergeTxns");

from("seda:mergeTxns").to("activemq:USTxn");

这个示例和直接示例之间的主要区别在于,使用 SEDA 时,第二个路由片段(从 seda:mergeTxnsactivemq:USTxn)由五个线程池处理。

注意

SEDA 比只是过去路由片段要多。已暂存事件驱动的架构(SEDA)包括构建更可管理的多线程应用程序的设计理念。Apache Camel 中的 SEDA 组件的目的是使您能够将此设计理念应用到您的应用。有关 SEDA 的详情,请查看 http://www.eecs.harvard.edu/~mdw/proj/seda/

虚拟机端点

虚拟机组件与 SEDA 端点非常相似。唯一区别是,而 SEDA 组件限制为将同一 CamelContext 中的路由片段连接在一起,而虚拟机组件可让您将不同 Apache Camel 应用程序的路由连接在一起,只要它们在同一 Java 虚拟机中运行。

库存事务示例可以重新编写为使用 VM 端点而不是 SEDA 端点,如下所示:

from("activemq:Nyse").to("vm:mergeTxns");
from("activemq:Nasdaq").to("vm:mergeTxns");

在单独的路由器应用程序中(在同一 Java 虚拟机中运行),您可以按照如下所示定义路由的第二个片段:

from("vm:mergeTxns").to("activemq:USTxn");

内容丰富的模式

内容丰富的模式定义了处理多个路由的根本不同方法。当交换进入增强器处理器时,增强器联系外部资源来检索信息,然后添加到原始消息中。在这个模式中,外部资源有效地代表对消息的第二个输入。

例如,假设您正在编写处理信用请求的应用。在处理信用卡请求之前,您需要根据向客户分配分数评级的数据进行添加,其中 ratings 数据存储在 目录下的文件中,src/data/ratings 存储在 目录中。您可以使用 pollEnrich () 模式和 GroupedExchangeAggregationStrategy 聚合策略,将传入的分数请求与 ratings 文件中的数据组合,如下所示:

from("jms:queue:creditRequests")
    .pollEnrich("file:src/data/ratings?noop=true", new GroupedExchangeAggregationStrategy())
    .bean(new MergeCreditRequestAndRatings(), "merge")
    .to("jms:queue:reformattedRequests");

其中 GroupedExchangeAggregationStrategy 类是来自 org.apache.camel.processor.aggregate 软件包的标准聚合策略,将每个新交换添加到 java.util.List 实例中,并将生成的列表存储在 Exchange.GROUPED_EXCHANGE Exchange 属性中。这个列表包含两个元素:原始交换(来自 creditRequests JMS 队列);以及增强器交换(来自文件端点)。

要访问所分组的交换,您可以使用类似如下的代码:

public class MergeCreditRequestAndRatings {
    public void merge(Exchange ex) {
        // Obtain the grouped exchange
        List<Exchange> list = ex.getProperty(Exchange.GROUPED_EXCHANGE, List.class);

        // Get the exchanges from the grouped exchange
        Exchange originalEx = list.get(0);
        Exchange ratingsEx  = list.get(1);

        // Merge the exchanges
        ...
    }
}

此应用的另一种方法是将合并代码直接放入自定义聚合策略类的实施中。

有关内容丰富的模式的详情,请参考 第 10.1 节 “内容增强”