8.3. 接收者列表

概述

图 8.3 “接收者列表模式” 所示 的接收者列表 是发送每个传入消息到多个不同目的地的路由器类型。另外,接收者列表通常要求在运行时计算接收者列表。

图 8.3. 接收者列表模式

接收者列表模式

带有固定目的地的接收者列表

最简单的接收者列表是预先修复和已知的目的地列表,交换模式为 InOnly。在这种情况下,您可以将目的地列表绑定到 to () Java DSL 命令。

注意

此处给出的示例,对于具有固定目的地的接收者列表,仅适用于 InOnly Exchange 模式(与管道 和过滤器模式类似)。如果要为带有 Out 消息的交换模式创建接收者列表,请使用 多播 模式。

Java DSL 示例

以下示例演示了如何将 InOnly Exchange 从消费者端点 queue:a 路由到一个固定的目的地列表:

from("seda:a").to("seda:b", "seda:c", "seda:d");

XML 配置示例

以下示例演示了如何在 XML 中配置相同的路由:

<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <to uri="seda:b"/>
    <to uri="seda:c"/>
    <to uri="seda:d"/>
  </route>
</camelContext>

运行时计算的接收者列表

在大多数情况下,当使用接收者列表模式时,应在运行时计算接收者列表。为此,请使用 recipientList () 处理器,该处理器将目的地列表视为其唯一的参数。由于 Apache Camel 将类型转换器应用到 list 参数,因此应该可以使用大多数标准 Java 列表类型(如集合、列表或数组)。有关类型转换器的详情,请参考 第 34.3 节 “built-In Type Converters”

接收者接收 同一 交换实例的副本,并且 Apache Camel 按顺序执行它们。

Java DSL 示例

以下示例演示了如何从名为 recipientListHeader 的消息标头中提取目的地列表,其中标头值是以逗号分隔的端点 URI 列表:

from("direct:a").recipientList(header("recipientListHeader").tokenize(","));

在某些情况下,如果标头值是列表类型,您可以直接使用它作为 recipientList () 的参数。例如:

from("seda:a").recipientList(header("recipientListHeader"));

但是,这个示例完全依赖于底层组件如何解析这个特定标头。如果组件将标头解析为简单字符串,则本例 将无法正常工作。标头必须解析为某些类型的 Java 列表。

XML 配置示例

以下示例演示了如何在 XML 中配置前面的路由,其中标头值是以逗号分隔的端点 URI 列表:

<camelContext id="buildDynamicRecipientList" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <recipientList delimiter=",">
      <header>recipientListHeader</header>
    </recipientList>
  </route>
</camelContext>

并行发送到多个接收者

从 Camel 2.2 开始提供

接收者列表模式 支持 并行处理,它与 拆分模式 中对应的功能类似。使用并行处理功能将交换发送到多个接收方,例如:

from("direct:a").recipientList(header("myHeader")).parallelProcessing();

在 Spring XML 中,并行处理功能是作为 recipientList tagProductShortName-wagonfor 的属性实现的:

<route>
  <from uri="direct:a"/>
  <recipientList parallelProcessing="true">
    <header>myHeader</header>
  </recipientList>
</route>

异常停止

从 Camel 2.2 开始提供

接收者列表 支持 stopOnException 功能,您可以在任何接收者失败时停止发送到任何其他接收者。

from("direct:a").recipientList(header("myHeader")).stopOnException();

在 Spring XML 中,它在接收者列表标签上的属性。

在 Spring XML 中,针对 exception 功能的 stop 作为 recipientList tag iwl-PROFILEfor 的属性实现:

<route>
  <from uri="direct:a"/>
  <recipientList stopOnException="true">
    <header>myHeader</header>
  </recipientList>
</route>
注意

您可以组合同一路由中的 parallelProcessingstopOnException

忽略无效的端点

从 Camel 2.3 开始提供

接收者列表模式 支持 ignoreInvalidEndpoints 选项,该选项可让接收者列表跳过无效的端点(路由 slips 模式 也支持这个选项)。例如:

from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();

在 Spring XML 中,您可以通过在 recipientList 标签上设置 ignoreInvalidEndpoints 属性来启用这个选项,如下所示

<route>
  <from uri="direct:a"/>
  <recipientList ignoreInvalidEndpoints="true">
    <header>myHeader</header>
  </recipientList>
</route>

例如,myHeader 包含两个端点 direct:foo,xxx:bar。第一个端点有效且可以正常工作。第二个无效,因此忽略。当遇到无效的端点时,Apache Camel 日志为 INFO 级别。

使用自定义 AggregationStrategy

从 Camel 2.2 开始提供

您可以使用带有 接收者列表模式 的自定义 AggregationStrategy,这对聚合列表中的接收者回复很有用。默认情况下,Apache Camel 使用 UseLatestAggregationStrategy 聚合策略,该策略只保留最后收到的回复。对于更复杂的聚合策略,您可以自行定义 AggregationStrategy 接口 iwl-categoriessee 第 8.5 节 “聚合器” 的实现。例如,要将自定义聚合策略 MyOwnAggregationStrategy 应用到回复信息,您可以定义 Java DSL 路由,如下所示:

from("direct:a")
    .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy())
    .to("direct:b");

在 Spring XML 中,您可以将自定义聚合策略指定为 recipientList 标签上的属性,如下所示:

<route>
  <from uri="direct:a"/>
  <recipientList strategyRef="myStrategy">
    <header>myHeader</header>
  </recipientList>
  <to uri="direct:b"/>
</route>

<bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>

使用自定义线程池

从 Camel 2.2 开始提供

这只在使用 parallelProcessing 时才需要。默认情况下,Camel 使用有 10 个线程的线程池。请注意,当我们覆盖了线程池管理和配置之后(在 Camel 2.2 中)时,这可能会有变化。

您像使用自定义聚合策略一样配置此操作。

使用方法调用作为接收者列表

您可以使用 bean 集成来提供接收者,例如:

from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");

其中 MessageRouter bean 定义如下:

public class MessageRouter {

    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}

Bean 作为接收者列表

您可以通过将 @RecipientList 注释添加到返回接收者列表的方法,使 bean 的行为为接收者列表。例如:

public class MessageRouter {

    @RecipientList
    public String routeTo() {
        String queueList = "activemq:queue:test1,activemq:queue:test2";
        return queueList;
    }
}

在这种情况下,不要在 路由中包含 recipientList DSL 命令。按如下方式定义路由:

from("activemq:queue:test").bean(MessageRouter.class, "routeTo");

使用超时

从 Camel 2.5 开始提供

如果使用 并行处理, 可以以毫秒为单位配置总 超时值。然后 Camel 将并行处理消息,直到超时为止。这允许您在一个消息较慢时继续处理。

在以下示例中,recipientlist 标头具有值 direct:a,direct:b,direct:c,以便消息发送到三个接收者。我们有一个 250 毫秒的超时时间,这意味着只有最后两个消息才能在时间范围内完成。因此,聚合会产生字符串结果 BC

from("direct:start")
    .recipientList(header("recipients"), ",")
    .aggregationStrategy(new AggregationStrategy() {
            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                if (oldExchange == null) {
                    return newExchange;
                }

                String body = oldExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                return oldExchange;
            }
        })
        .parallelProcessing().timeout(250)
    // use end to indicate end of recipientList clause
    .end()
    .to("mock:result");

from("direct:a").delay(500).to("mock:A").setBody(constant("A"));

from("direct:b").to("mock:B").setBody(constant("B"));

from("direct:c").to("mock:C").setBody(constant("C"));
注意

splittermulticastrecipientList 也支持这个 超时 功能。

默认情况下,如果超时没有调用 AggregationStrategy。但是,您可以实施专用的版本

// Java
public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {

    /**
     * A timeout occurred
     *
     * @param oldExchange  the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
     * @param index        the index
     * @param total        the total
     * @param timeout      the timeout value in millis
     */
    void timeout(Exchange oldExchange, int index, int total, long timeout);

如果确实需要,这允许您处理 AggregationStrategy 中的超时时间。

timeout 是总计

超时是 total,这意味着在 X 时间之后,Camel 将聚合在时间段内完成的消息。剩余部分将被取消。对于导致超时的第一个索引,Camel 还会在 TimeoutAwareAggregationStrategy 中调用 timeout 方法。

将自定义处理应用到传出消息

recipientList 发送消息到其中一个接收者端点之前,它会创建一个消息副本,这是原始消息的副本。在 shallow copy 中,原始消息的标头和有效负载仅通过参考来复制。每个新副本不包含这些元素本身的实例。因此,消息片段链接,您无法在将自定义处理路由到不同的端点时应用自定义处理。

如果要在将副本发送到其端点前对每个消息副本执行一些自定义处理,您可以在 recipientList 子句中调用 onPrepare DSL 命令。onPrepare 命令会在消息被授权 插入一个自定义处理器,只需在消息 被分配给 其端点前插入一个自定义处理器。例如,在以下路由中,针对每个接收者端点 的消息副本调用 CustomProc 处理器:

from("direct:start")
  .recipientList().onPrepare(new CustomProc());

onPrepare DSL 命令的常见用例是执行消息的一些或所有元素的深度副本。这允许每个消息副本独立于其他副本进行修改。例如,以下 CustomProc 处理器类执行消息正文的深度副本,其中消息正文假定为 type, BodyType,而 deep copy 由方法 BodyType.deepCopy () 执行。

// Java
import org.apache.camel.*;
...
public class CustomProc implements Processor {

    public void process(Exchange exchange) throws Exception {
        BodyType body = exchange.getIn().getBody(BodyType.class);

        // Make a _deep_ copy of of the body object
        BodyType clone =  BodyType.deepCopy();
        exchange.getIn().setBody(clone);

        // Headers and attachments have already been
        // shallow-copied. If you need deep copies,
        // add some more code here.
    }
}

选项

recipientList DSL 命令支持以下选项:

Name

默认值

描述

delimiter

,

如果 Expression 返回多个端点,则使用分隔符。

strategyRef

 

指的是 AggregationStrategy 用于将来自接收者的回复汇编为来自 第 8.3 节 “接收者列表” 的单个传出消息。默认情况下,Camel 将使用最后一个回复作为传出消息。

strategyMethodName

 

这个选项可用于在将 POJO 用作 AggregationStrategy 时明确指定要使用的方法名称。

strategyMethodAllowNull

false

当使用 POJO 作为 AggregationStrategy 时,可以使用这个选项。如果为 false 则不会使用聚合方法,如果没有数据来增强。如果为 true,则当没有数据可增强时,使用null 值用于 oldExchange

parallelProcessing

false

Camel 2.2: 如果启用,则会同时向接收方发送信息。请注意,调用者线程仍然会等到所有消息都已完全处理,然后再继续。它只会从同时发生的接收者发送和接收回复。

parallelAggregate

false

如果启用,则 AggregationStrategy 上的聚合方法可以同时调用。请注意,这需要实施 AggregationStrategy 作为 thread-safe。默认情况下,此选项为 false,这意味着 Camel 会自动同步对聚合方法的调用。但是,在某些用例中,您可以通过将 AggregationStrategy 实现为 thread-safe 来提高性能,并将此选项设置为 true

executorServiceRef

 

Camel 2.2: 请参阅用于并行处理的自定义线程池。请注意,如果您设定了这个选项,则并行处理会被自动指示,您不必启用该选项。

stopOnException

false

Camel 2.2: 发生异常时是否立即停止处理。如果禁用,则 Camel 会将该消息发送到所有收件人,无论其中之一是否失败。您可以在 AggregationStrategy 类中处理异常,您可以完全控制如何处理这种情况。

ignoreInvalidEndpoints

false

Camel 2.3: 如果无法解析端点 uri,则应忽略它。否则 Camel 将抛出一个异常,表示 endpoint uri 无效。

false

Camel 2.5: 如果启用,则 Camel 将处理超出顺序的回复,例如返回的顺序。如果禁用,Camel 将以与指定的表达式相同的顺序处理回复。

timeout

 

Camel 2.5: 设置在 millis 中指定的总超时。如果 第 8.3 节 “接收者列表” 无法发送并处理给定时间段内的所有回复,则超时触发器和 第 8.3 节 “接收者列表” 中断并继续。请注意,如果您提供 TimeoutAwareAggregationStrategy,则在中断前会调用 timeout 方法。

onPrepareRef

 

Camel 2.8: 请参阅自定义处理器,以准备每个接收者的交换副本。这可让您执行任何自定义逻辑,如 deep-cloning the message payload (如果需要的话)。

shareUnitOfWork

false

Camel 2.8: 是否应共享工作单元。如需了解更多详细信息,请参阅 第 8.4 节 “Splitter” 中的同一选项。

cacheSize

0

Camel 2.13.1/2.12.4: 允许为 ProducerCache 配置缓存大小,这会缓存制作者以便在路由 slip 中重复使用。默认情况下,将使用默认缓存大小为 0。将值设为 -1 可允许关闭缓存。

在 Recipient 列表中使用 Exchange Pattern

默认情况下,Recipient List 使用当前的交换模式。但是,在有些情况下,您可以使用不同的交换模式向接收方发送消息。

例如,您可能有一个启动为 InOnly 路由的路由。现在,如果要将 InOut Exchange 模式与接收者列表搭配使用,则需要直接在接收者端点中配置交换模式。

以下示例演示了新文件开始为 InOnly 的路由,然后路由到接收者列表。如果要将 InOut 与 ActiveMQ (JMS)端点搭配使用,您需要使用与 InOut 选项等同的 exchangePattern 指定此端点。但是,响应形成 JMS 请求或回复将持续路由,因此响应作为 outbox 目录中的文件存储在 中。

from("file:inbox")
  // the exchange pattern is InOnly initially when using a file route
  .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut")
  .to("file:outbox");
注意

InOut 交换模式必须在超时期间获得响应。但是,如果没有检索响应,它会失败。