8.3. 接收者列表

概述

一个 接收者列表 (在 图 8.3 “接收者列表模式” 中显示的是路由器类型),将每个传入的信息发送到多个不同的目的地。此外,接收者列表通常要求在运行时计算接收者列表。

图 8.3. 接收者列表模式

接收者列表特征

带有固定目的地的接收者列表

最简单的接收者列表是预先固定并已知的目的地列表,而交换模式是 Only。在这种情况下,您可以硬编码到 to() Java DSL 命令中的目的地列表。

注意

此处提供的示例,对于具有固定目的地的接收者列表,仅适用于 InOnly Exchange 模式(类似于 管道和过滤器模式)。如果要创建用于使用 Out 消息交换模式的接收者列表,请使用 多播 模式。

Java DSL 示例

以下示例演示了如何将来自消费者端点 queue:aInOnly exchange 路由到固定目的地列表:

from("seda:a").to("seda:b", "seda:c", "seda:d");

XML 配置示例

以下示例演示了如何在 XML 中配置相同的路由:

<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <to uri="seda:b"/>
    <to uri="seda:c"/>
    <to uri="seda:d"/>
  </route>
</camelContext>

运行时计算的接收者列表

在大多数情况下,当您使用接收者列表模式时,应在运行时计算接收者列表。为此,可使用 recipientList() 处理器,它将目的地列表视为其唯一参数。由于 Apache Camel 将类型转换器应用到列表参数,因此应该使用大多数标准 Java 列表类型(如集合、列表或数组)。有关类型转换器的详情,请参考 第 34.3 节 “内置(In Type Converters)”

接收者收到 同一 交换实例的副本,而 Apache Camel 会按顺序执行它们。

Java DSL 示例

以下示例演示了如何从名为 recipientListHeader 的消息标头提取目的地列表,其中标头值是以逗号分隔的端点 URI 列表:

from("direct:a").recipientList(header("recipientListHeader").tokenize(","));

在某些情况下,如果标头值是一个列表类型,您可以将其直接用作 接收者List() 的参数。例如:

from("seda:a").recipientList(header("recipientListHeader"));

但是,这个示例完全依赖于底层组件如何解析这个特定标头。如果组件将标头解析为简单字符串,则该示例 将无法工作。标头必须解析到某种类型的 Java 列表。

XML 配置示例

以下示例演示了如何在 XML 中配置前面的路由,其中标头值是一个用逗号分开的端点 URI 列表:

<camelContext id="buildDynamicRecipientList" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <recipientList delimiter=",">
      <header>recipientListHeader</header>
    </recipientList>
  </route>
</camelContext>

并行发送到多个接收者

可作为 Camel 2.2 提供

接收者列表模式 支持 parallelProcessing,这与 splitter 模式中的相应功能类似。使用并行处理功能将交换发送到多个接收方,同时为 example.

from("direct:a").recipientList(header("myHeader")).parallelProcessing();

在 Spring XML 中,并行处理功能作为 接收者List tag>_<-abrtfor 示例上的属性实现:

<route>
  <from uri="direct:a"/>
  <recipientList parallelProcessing="true">
    <header>myHeader</header>
  </recipientList>
</route>

异常停止

可作为 Camel 2.2 提供

接收者列表 支持 stopOnException 功能,如果任何接收者失败,您可以使用它停止发送到任何进一步的接收者。

from("direct:a").recipientList(header("myHeader")).stopOnException();

以及在 Spring XML 中,其接收者列表标签上的属性。

在 Spring XML 中,异常功能的停止是作为 接收者List tag>_<-abrtfor 示例的属性实现:

<route>
  <from uri="direct:a"/>
  <recipientList stopOnException="true">
    <header>myHeader</header>
  </recipientList>
</route>
注意

您可以在同一 路由中组合并行处理 和停止OnException

忽略无效的端点

从 Camel 2.3 开始可用

接收者列表模式 支持 ignoreInvalidEndpoints 选项,它允许接收者列表跳过无效的端点(路由 slips 模式 也支持此选项)。例如:

from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();

在 Spring XML 中,您可以通过在 recipientList 标签上设置 ignoreInvalidEndpoints 属性来启用这个选项,如下所示

<route>
  <from uri="direct:a"/>
  <recipientList ignoreInvalidEndpoints="true">
    <header>myHeader</header>
  </recipientList>
</route>

请考虑 myHeader 包含两个端点的情况,direct:foo,xxx:bar。第一个端点有效且可以正常工作。第二个是无效,因此忽略。当遇到无效的端点时,Apache Camel 都会在 INFO 级别记录。

使用自定义 AggregationStrategy

可作为 Camel 2.2 提供

您可以使用带有 接收者列表模式 的自定义 AggregationStrategy,这对于从列表中接收方发出回复很有用。默认情况下,Apache Camel 使用 UseLatestAggregationStrategy 聚合策略,以仅保留上次收到的回复。对于更复杂的聚合策略,您可以自行定义 AggregationStrategy 接口的实施。有关详细信息,请参阅 第 8.5 节 “聚合器”。例如,若要将自定义聚合策略 MyOwnAggregationStrategy 应用到答复消息,您可以按照如下所示定义 Java DSL 路由:

from("direct:a")
    .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy())
    .to("direct:b");

在 Spring XML 中,您可以将自定义聚合策略指定为 接收者List 标签的属性,如下所示:

<route>
  <from uri="direct:a"/>
  <recipientList strategyRef="myStrategy">
    <header>myHeader</header>
  </recipientList>
  <to uri="direct:b"/>
</route>

<bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>

使用自定义线程池

可作为 Camel 2.2 提供

这只在使用 并行处理时才需要。默认情况下,Camel 使用有 10 个线程的线程池。请注意,在我们稍后进行线程管理和配置时(在 Camel 2.2 中可能发生)时,这可能会改变。

您像使用自定义聚合策略一样配置它。

使用方法调用作为接收者列表

您可以使用 bean 集成来提供接收者,例如:

from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");

其中,MessageRouter bean 的定义如下:

public class MessageRouter {

    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}

bean 作为接收者列表

您可以通过将 @RecipientList 注解添加到返回接收者列表的方法,使 bean 的行为作为接收者列表。例如:

public class MessageRouter {

    @RecipientList
    public String routeTo() {
        String queueList = "activemq:queue:test1,activemq:queue:test2";
        return queueList;
    }
}

在这种情况下,不要在 路由中包含 recipientList DSL 命令。定义路由,如下所示:

from("activemq:queue:test").bean(MessageRouter.class, "routeTo");

使用超时

可作为 Camel 2.5 提供。

如果使用 并行处理,则可以以毫秒为单位配置总 超时值。然后,Camel 将并行处理消息,直到超时达到为止。这可让您在一条信息较慢时继续处理。

在以下示例中,recipientlist 标头的值为 direct: a、direct:b、direct:c,因此该邮件会发送到三个接收方。我们有 250毫秒的超时,这意味着在时间段内只有最后两条消息即可完成。因此,聚合会产生字符串结果 BC

from("direct:start")
    .recipientList(header("recipients"), ",")
    .aggregationStrategy(new AggregationStrategy() {
            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                if (oldExchange == null) {
                    return newExchange;
                }

                String body = oldExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                return oldExchange;
            }
        })
        .parallelProcessing().timeout(250)
    // use end to indicate end of recipientList clause
    .end()
    .to("mock:result");

from("direct:a").delay(500).to("mock:A").setBody(constant("A"));

from("direct:b").to("mock:B").setBody(constant("B"));

from("direct:c").to("mock:C").setBody(constant("C"));
注意

这种 超时 功能也由 splitterreceiver List 支持

默认情况下,如果超时发生 AggregationStrategy 不会被调用。但是,您可以实施专用版本

// Java
public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {

    /**
     * A timeout occurred
     *
     * @param oldExchange  the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
     * @param index        the index
     * @param total        the total
     * @param timeout      the timeout value in millis
     */
    void timeout(Exchange oldExchange, int index, int total, long timeout);

这允许您在 AggregationStrategy 中处理超时(如果您确实需要)。

timeout 是总计

超时为总计,这意味着在 X 时间后,Camel 将聚合在时间段内完成的消息。剩余部分将被取消。Camel 还将只调用 TimeoutAwareAggregationStrategy 中的 timeout 方法,用于第一个导致超时的索引。

将自定义处理应用到传出消息

recipientList 会向接收者端点发送一条信息前,它会创建一个消息副本,这是原始消息的绝对副本。在应该复制中,原始消息的标头和有效负载仅通过引用来复制。每个新副本不包含这些元素自己的实例。因此,消息的绝对副本会链接,当将其路由到不同的端点时,您无法应用自定义处理。

如果要在副本发送到端点前对每个消息副本执行一些自定义处理,您可以在 recipientList 子句中调用 onPrepare DSL 命令。onPrepare 命令只在消息被放送到其端点 插入自定义处理器。例如,在以下路由中,为每个接收者端点 在消息副本上调用 CustomProc 处理器:

from("direct:start")
  .recipientList().onPrepare(new CustomProc());

onPrepare DSL 命令的常见用例是对消息的部分或所有元素进行深入副本。这允许独立于其他消息副本修改每个消息副本。例如,以下 CustomProc 处理器类执行消息正文的深层副本,其中消息正文假定为 type、Bdy Type,而深度副本则由方法、Bdy Type.deepCopy() 执行。

// Java
import org.apache.camel.*;
...
public class CustomProc implements Processor {

    public void process(Exchange exchange) throws Exception {
        BodyType body = exchange.getIn().getBody(BodyType.class);

        // Make a _deep_ copy of of the body object
        BodyType clone =  BodyType.deepCopy();
        exchange.getIn().setBody(clone);

        // Headers and attachments have already been
        // shallow-copied. If you need deep copies,
        // add some more code here.
    }
}

选项

recipientList DSL 命令支持以下选项:

名称

默认值

描述

delimiter

,

表达式返回的多个端点时使用分隔符。

strategyRef

 

指的是 AggregationStrategy,用于将接收方的回复组合成来自 第 8.3 节 “接收者列表” 的单个传出消息。默认情况下,Camel 将使用最后的回复作为传出消息。

strategyMethodName

 

这个选项可用于明确指定要使用的方法名称,当 OVAs 用作 AggregationStrategy 时。

strategyMethodAllowNull

false

当将 POJOs 用作 AggregationStrategy 时,可以使用这个选项。如果为 false 则不会使用聚合方法,如果没有数据丰富。如果为 true,则使用空值作为 oldExchange,如果没有要增强数据,则使用null 值。

parallelProcessing

false

Camel 2.2: 如果启用然后同时向接收者发送信息。请注意,调用者线程仍会等待所有消息都完全处理,然后再继续。它只发送和处理来自同时发生的收件人的回复。

parallelAggregate

false

如果启用,则 AggregationStrategy 上的聚合方法可以同时调用。请注意,这需要实施 AggregationStrategy 为 thread-safe。默认情况下,此选项为 false,这表示 Camel 会自动同步对聚合方法的调用。然而,在一些用例中,您可以通过将 AggregationStrategy 作为 thread-safe,并将此选项设置为 true 来提高性能。

executorServiceRef

 

Camel 2.2: 请参阅自定义线程池,以用于并行处理。请注意,如果您设定了这个选项,则并行处理会被自动表示,您也不必启用该选项。

stopOnException

false

Camel 2.2: 出现异常时是否立即停止持续处理。如果禁用,则 Camel 会将信息发送到所有收件人,无论它们是否失败。您可在完全控制如何处理它的 AggregationStrategy 类中处理异常。

ignoreInvalidEndpoints

false

Camel 2.3: 如果无法解析端点 uri,它会被忽略。否则,Camel 会抛出一个异常,说明 endpoint uri 无效。

streaming

false

Camel 2.5: 如果启用,Camel 将处理顺序的回复,例如他们返回的顺序。如果禁用,Camel 将按照与指定的表达式相同的顺序进行回复。

timeout

 

Camel 2.5: 设置 millis 中指定的总超时。如果 第 8.3 节 “接收者列表” 没有可以在指定时间段内发送和处理所有回复,则超时触发器和 第 8.3 节 “接收者列表” 会中断并继续。请注意,如果您提供 AggregationStrategy,则会在中断前调用 超时 方法。

onPrepareRef

 

Camel 2.8: 请参阅自定义处理器准备每个接收方的 Exchange 副本。这可让您进行任何自定义逻辑,如 deep-cloning(如果需要)信息有效负载。

shareUnitOfWork

false

Camel 2.8: 是否应共享工作单元。详情请查看 第 8.4 节 “Splitter” 上的相同选项。

cacheSize

0

Camel 2.13.1/2.12.4: 允许配置 ProducerCache 的缓存大小,该缓存制作者在路由 slip 中重复使用。默认情况下,将使用默认的缓存大小,即 0。将值设为 -1 允许将缓存全部关闭。

在 Recipient 列表中使用 Exchange Pattern

默认情况下,Recipient List 使用当前的交换模式。但是,在有些情况下,您可以使用不同的交换模式向接收者发送信息。

例如,您可能有一个作为 InOnly 路由启动的路由。现在,如果您要通过接收者列表使用 InOut Exchange 模式,则需要直接在接收者端点中配置交换模式。

以下示例演示了新文件将作为 InOnly 开头,然后路由到接收者列表的路由。如果要将 InOut 与 ActiveMQ(JMS)端点搭配使用,则需要使用 exchangePattern 等于 InOut 选项指定该端点。但是,响应形式随后将继续路由 JMS 请求或回复,因此响应将作为文件存储在 outbox 目录中。

from("file:inbox")
  // the exchange pattern is InOnly initially when using a file route
  .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut")
  .to("file:outbox");
注意

InOut Exchange 模式必须在超时时间内获得响应。但是,如果响应未接收,则会失败。