8.4. Splitter

概述

splitter 是将传入的消息拆分为一系列传出消息的路由器。每个传出消息均包含一条原始消息。在 Apache Camel 中,在 图 8.4 “Splitter Pattern” 中显示的 splitter 模式通过 split() Java DSL 命令实现。

图 8.4. Splitter Pattern

Splitter pattern

Apache Camel splitter 实际上支持两种模式,如下所示:

  • 简单的 splitter NETWORK-puppet 实施其本身的分割器模式。
  • Splitter/aggregator fsanitize->_<com 组合了带聚合器模式的 splitter 模式,以便在处理后对消息的片段重新组合。

在分割器将原始消息分成几部分之前,它会制作原始消息的绝对副本。在应该复制中,原始消息的标头和有效负载仅复制为参考。虽然 splitter 本身不会将生成的消息部分路由到不同的端点,但拆分消息的部分可能会处于二级路由下。

由于消息部分是可浏览的副本,它们仍然与原始消息相关联。因此,无法独立修改它们。如果要在将自定义逻辑路由到端点集之前,将自定义逻辑应用到消息部分的不同副本,您必须在 splitter 子句中使用 onPrepareRef DSL 选项,以制作原始消息的深度副本。有关使用选项的详情请参考 “选项”一节

Java DSL 示例

以下示例定义了一个来自 seda:aseda:b 的路由,它将传入的消息一行转换为单独的传出消息:

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a")
          .split(bodyAs(String.class).tokenize("\n"))
          .to("seda:b");
    }
};

splitter 可以使用任何表达式语言,因此您可以使用任何受支持的脚本语言(如 XPath、XQuery 或 SQL)分割消息(请参阅 第 II 部分 “路由表达式和指定语言”)。以下示例从传入信息中提取 元素,并将它们插入到单独的传出消息中:

from("activemq:my.queue")
  .split(xpath("//foo/bar"))
  .to("file://some/directory")

XML 配置示例

以下示例演示了如何使用 XPath 脚本语言在 XML 中配置分割路由:

<camelContext id="buildSplitter" xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="seda:a"/>
      <split>
        <xpath>//foo/bar</xpath>
        <to uri="seda:b"/>
      </split>
    </route>
</camelContext>

您可以使用 XML DSL 中的令牌化表达式来通过令牌分割样或标头,其中使用 tokenize 元素定义令牌化表达式。在以下示例中,消息正文使用 \n 分隔符字符进行令牌。要使用正则表达式模式,请在 tokenize 元素中设置 regex=true

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <split>
            <tokenize token="\n"/>
            <to uri="mock:result"/>
        </split>
    </route>
    </camelContext>

分割成一组行

要将大型文件分成 1000 行块,您可以在 Java DSL 中按照如下所示定义分割路由:

from("file:inbox")
    .split().tokenize("\n", 1000).streaming()
       .to("activemq:queue:order");

用来 令牌的 第二个参数指定应分组到一个块的行数。streaming() 子句会指示 splitter 不会一次读取整个文件(如果文件较大,则性能更高)。

同一路由可以在 XML DSL 中定义,如下所示:

<route>
  <from uri="file:inbox"/>
  <split streaming="true">
    <tokenize token="\n" group="1000"/>
    <to uri="activemq:queue:order"/>
  </split>
</route>

使用 group 选项时的输出始终为 java.lang.String 类型。

跳过第一个项目

要跳过消息中的第一个项目,您可以使用 skipFirst 选项。

在 Java DSL 中,在 tokenize 参数中使用第三个选项 true

from("direct:start")
 // split by new line and group by 3, and skip the very first element
      .split().tokenize("\n", 3, true).streaming()
         .to("mock:group");

同一路由可以在 XML DSL 中定义,如下所示:

<route>
  <from uri="file:inbox"/>
    <split streaming="true">
    <tokenize token="\n" group="1000" skipFirst="true" />
    <to uri="activemq:queue:order"/>
  </split>
</route>

Splitter reply

如果输入 splitter 的交换具有 InOut 消息更改模式(即预期为回复),则分割者将原始输入消息的副本作为 Out message 插槽中的回复消息。您可以通过实施自己的 聚合策略来覆盖此默认行为

并行执行

如果要并行执行生成的消息片段,您可以启用 parallel processing 选项,它会实例化线程池来处理消息片段。例如:

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar");
from("activemq:my.queue").split(xPathBuilder).parallelProcessing().to("activemq:my.parts");

您可以自定义并行分割器中使用的底层 ThreadPoolExecutor。例如,您可以在 Java DSL 中指定自定义 executor,如下所示:

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar");
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
from("activemq:my.queue")
  .split(xPathBuilder)
  .parallelProcessing()
  .executorService(threadPoolExecutor)
  .to("activemq:my.parts");

您可以在 XML DSL 中指定自定义执行器,如下所示:

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:parallel-custom-pool"/>
    <split executorServiceRef="threadPoolExecutor">
      <xpath>/invoice/lineItems</xpath>
      <to uri="mock:result"/>
    </split>
  </route>
</camelContext>

<bean id="threadPoolExecutor" class="java.util.concurrent.ThreadPoolExecutor">
  <constructor-arg index="0" value="8"/>
  <constructor-arg index="1" value="16"/>
  <constructor-arg index="2" value="0"/>
  <constructor-arg index="3" value="MILLISECONDS"/>
  <constructor-arg index="4"><bean class="java.util.concurrent.LinkedBlockingQueue"/></constructor-arg>
</bean>

使用 bean 执行分割

因为 splitter 可以使用任何 表达式进行拆分,您可以通过调用 method() 表达式来使用 bean 执行拆分。bean 应返回可迭代的值,如 java.util.Collectionjava.util.Iterator 或一个数组。

以下路由定义了一个 method() 表达式,它调用 mySplitterBean 实例上的方法:

from("direct:body")
        // here we use a POJO bean mySplitterBean to do the split of the payload
        .split()
        .method("mySplitterBean", "splitBody")
        .to("mock:result");
from("direct:message")
        // here we use a POJO bean mySplitterBean to do the split of the message
        // with a certain header value
        .split()
        .method("mySplitterBean", "splitMessage")
        .to("mock:result");

其中 mySplitterBeanMySplitterBean 类的实例,其定义如下:

public class MySplitterBean {

    /**
     * The split body method returns something that is iteratable such as a java.util.List.
     *
     * @param body the payload of the incoming message
     * @return a list containing each part split
     */
    public List<String> splitBody(String body) {
        // since this is based on an unit test you can of couse
        // use different logic for splitting as {router} have out
        // of the box support for splitting a String based on comma
        // but this is for show and tell, since this is java code
        // you have the full power how you like to split your messages
        List<String> answer = new ArrayList<String>();
        String[] parts = body.split(",");
        for (String part : parts) {
            answer.add(part);
        }
        return answer;
    }

    /**
     * The split message method returns something that is iteratable such as a java.util.List.
     *
     * @param header the header of the incoming message with the name user
     * @param body the payload of the incoming message
     * @return a list containing each part split
     */
    public List<Message> splitMessage(@Header(value = "user") String header, @Body String body) {
        // we can leverage the Parameter Binding Annotations
        // http://camel.apache.org/parameter-binding-annotations.html
        // to access the message header and body at same time,
        // then create the message that we want, splitter will
        // take care rest of them.
        // *NOTE* this feature requires {router} version >= 1.6.1
        List<Message> answer = new ArrayList<Message>();
        String[] parts = header.split(",");
        for (String part : parts) {
            DefaultMessage message = new DefaultMessage();
            message.setHeader("user", part);
            message.setBody(body);
            answer.add(message);
        }
        return answer;
    }
}

您可以使用带有 Splitter EIP 的 BeanIOSplitter 对象来分割大型有效负载,以避免将整个内容读取到内存中。以下示例演示了如何使用从 classpath 加载的映射文件设置 BeanIOSplitter 对象:

注意

BeanIOSplitter 类是 Camel 2.18 中的新类。Camel 2.17 中不提供它。

BeanIOSplitter splitter = new BeanIOSplitter();
   splitter.setMapping("org/apache/camel/dataformat/beanio/mappings.xml");
   splitter.setStreamName("employeeFile");

    // Following is a route that uses the beanio data format to format CSV data
    // in Java objects:
    from("direct:unmarshal")
        // Here the message body is split to obtain a message for each row:
         .split(splitter).streaming()
         .to("log:line")
         .to("mock:beanio-unmarshal");

以下示例添加了一个错误处理器:

BeanIOSplitter splitter = new BeanIOSplitter();
   splitter.setMapping("org/apache/camel/dataformat/beanio/mappings.xml");
   splitter.setStreamName("employeeFile");
   splitter.setBeanReaderErrorHandlerType(MyErrorHandler.class);
   from("direct:unmarshal")
      .split(splitter).streaming()
      .to("log:line")
      .to("mock:beanio-unmarshal");

Exchange 属性

在每个分割交换上设置以下属性:

headertypedescription

CamelSplitIndex

int

Apache Camel 2.0:为每个被分割交换的增加的分割计数器。计数器从 0 开始。

CamelSplitSize

int

Apache Camel 2.0:被分割的 Exchanges 总数。此标头不适用于基于流的拆分。

CamelSplitComplete

布尔值

Apache Camel 2.4:是否是最后的 Exchange。

Splitter/aggregator 模式

在处理各个组件完成后,消息组件的常见模式会聚合到单个交换中。为了支持这个模式,split() DSL 命令可让您提供 AggregationStrategy 对象作为第二个参数。

Java DSL 示例

以下示例演示了如何在处理完所有消息片段后使用自定义聚合策略重新组合分割消息:

from("direct:start")
    .split(body().tokenize("@"), new MyOrderStrategy())
        // each split message is then send to this bean where we can process it
        .to("bean:MyOrderService?method=handleOrder")
        // this is important to end the splitter route as we do not want to do more routing
        // on each split message
    .end()
    // after we have split and handled each message we want to send a single combined
    // response back to the original caller, so we let this bean build it for us
    // this bean will receive the result of the aggregate strategy: MyOrderStrategy
    .to("bean:MyOrderService?method=buildCombinedResponse")

AggregationStrategy 实施

前面路由中使用的自定义聚合策略 MyOrderStrategy 被实施,如下所示:

/**
 * This is our own order aggregation strategy where we can control
 * how each split message should be combined. As we do not want to
 * lose any message, we copy from the new to the old to preserve the
 * order lines as long we process them
 */
public static class MyOrderStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // put order together in old exchange by adding the order from new exchange

        if (oldExchange == null) {
            // the first time we aggregate we only have the new exchange,
            // so we just return it
            return newExchange;
        }

        String orders = oldExchange.getIn().getBody(String.class);
        String newLine = newExchange.getIn().getBody(String.class);

        LOG.debug("Aggregate old orders: " + orders);
        LOG.debug("Aggregate new order: " + newLine);

        // put orders together separating by semi colon
        orders = orders + ";" + newLine;
        // put combined order back on old to preserve it
        oldExchange.getIn().setBody(orders);

        // return old as this is the one that has all the orders gathered until now
        return oldExchange;
    }
}

基于流的处理

启用并行处理后,理论上可以在之前的某一部分之前为聚合做好准备。换句话说,消息片段可能会在聚合器中去除顺序。默认情况下,不会出现此情况,因为分离的实现会在将消息片段传递到聚合器之前将其重新整理到其原始顺序中。

如果您希望在消息发布就绪后立即聚合消息片段(可能使用顺序),您可以启用 streaming 选项,如下所示:

from("direct:streaming")
  .split(body().tokenize(","), new MyOrderStrategy())
    .parallelProcessing()
    .streaming()
    .to("activemq:my.parts")
  .end()
  .to("activemq:all.parts");

您还可以提供自定义迭代程序以用于流,如下所示:

// Java
import static org.apache.camel.builder.ExpressionBuilder.beanExpression;
...
from("direct:streaming")
     .split(beanExpression(new MyCustomIteratorFactory(),  "iterator"))
     .streaming().to("activemq:my.parts")
streaming 和 XPath

您不能与 XPath 一起使用 streaming 模式。XPath 需要内存中的完整 DOM XML 文档。

使用 XML 的流处理

如果传入的消息是一个非常大的 XML 文件,您可以在流传输模式中使用 tokenizeXML 子命令处理消息。

例如,如果一个包含一系列 顺序 元素的大型 XML 文件,您可以使用类似如下的路由将该文件分成 顺序 元素:

from("file:inbox")
  .split().tokenizeXML("order").streaming()
  .to("activemq:queue:order");

您可以在 XML 中进行同样的操作,方法是定义如下路由:

<route>
  <from uri="file:inbox"/>
  <split streaming="true">
    <tokenize token="order" xml="true"/>
    <to uri="activemq:queue:order"/>
  </split>
</route>

通常情况下,您需要访问在令牌元素的某一部分(原来)元素中定义的命名空间。您可以通过指定您要从哪个元素继承命名空间定义,将命名空间定义从其中一个级元素复制到 token 元素。

在 Java DSL 中,您要将 ancestor 元素指定为 tokenizeXML 的第二个参数。例如,从包含的 order 元素中继承 命名空间定义

from("file:inbox")
  .split().tokenizeXML("order", "orders").streaming()
  .to("activemq:queue:order");

在 XML DSL 中,您可以使用 inheritNamespaceTagName 属性指定 ancestor 元素。例如:

<route>
  <from uri="file:inbox"/>
  <split streaming="true">
    <tokenize token="order"
              xml="true"
              inheritNamespaceTagName="orders"/>
    <to uri="activemq:queue:order"/>
  </split>
</route>

选项

split DSL 命令支持以下选项:

名称

默认值

描述

strategyRef

 

指的是 AggregationStrategy,用于将子消息中的回复组合成来自 第 8.4 节 “Splitter” 的单个传出消息。有关默认使用的内容 ,请参阅标题为下面的 splitter 返回 的内容。

strategyMethodName

 

这个选项可用于明确指定要使用的方法名称,当 OVAs 用作 AggregationStrategy 时。

strategyMethodAllowNull

false

当将 POJOs 用作 AggregationStrategy 时,可以使用这个选项。如果为 false 则不会使用聚合方法,如果没有数据丰富。如果为 true,则使用空值作为 oldExchange,如果没有要增强数据,则使用null 值。

parallelProcessing

false

如果启用,则同时处理子消息。请注意,调用者线程仍会等待所有子消息已完全处理,然后再继续处理。

parallelAggregate

false

如果启用,则 AggregationStrategy 上的聚合方法可以同时调用。请注意,这需要实施 AggregationStrategy 为 thread-safe。默认情况下,此选项为 false,这表示 Camel 会自动同步对聚合方法的调用。然而,在一些用例中,您可以通过将 AggregationStrategy 作为 thread-safe,并将此选项设置为 true 来提高性能。

executorServiceRef

 

指的是用于并行处理的自定义线程池。请注意,如果您设定了这个选项,则并行处理会被自动表示,您也不必启用该选项。

stopOnException

false

Camel 2.2: 出现异常时是否立即停止持续处理。如果禁用,则 Camel 会继续分割并处理子消息,无论它们之一是否失败。您可在完全控制如何处理它的 AggregationStrategy 类中处理异常。

streaming

false

如果启用,Camel 将以流方式分割,这意味着它将以块的形式分割输入消息。这可减少内存开销。例如,如果您分离了建议启用流的大量消息。如果启用了流处理,则子消息回复将按其返回的顺序聚合。如果禁用,Camel 将按照子消息的回复处理与被分割的顺序相同。

timeout

 

Camel 2.5: 设置 millis 中指定的总超时。如果 第 8.3 节 “接收者列表” 无法分割并处理给定时间段内的所有回复,则超时触发器和 第 8.4 节 “Splitter” 会中断并继续。请注意,如果您提供 AggregationStrategy,则会在中断前调用 超时 方法。

onPrepareRef

 

Camel 2.8: 请参阅自定义处理器,在处理前准备 Exchange 的子消息。这可让您进行任何自定义逻辑,如 deep-cloning(如果需要)信息有效负载。

shareUnitOfWork

false

Camel 2.8: 是否应共享工作单元。详情请查看以下信息。