1.5. 处理器

概述

要让路由器执行比简单地将消费者端点连接到制作者端点更有趣的事情,您可以将 处理器 添加到路由中。处理器是一个命令,您可以插入到路由规则中,以执行任何通过规则流的消息处理。Apache Camel 提供各种处理器,如 表 1.1 “Apache Camel Processors” 所示。

表 1.1. Apache Camel Processors

Java DSLXML DSL描述

aggregate()

aggregate

第 8.5 节 “聚合器”:创建一个聚合器,将多个传入的交换组合到一个交换中。

aop()

AOP

使用 Aspect Oriented 编程(AOP)在指定的子路由之前和之后工作。

bean(), beanRef()

bean

通过调用 Java 对象(或 bean)的方法来处理当前的交换。请参阅 第 2.4 节 “bean Integration”

choice()

choice

第 8.1 节 “基于内容的路由器”:使用 when 和 other 子句根据交换内容选择特定的子路由。

convertBodyTo()

convertBodyTo

In 消息正文转换为指定的类型。

delay()

delay

第 8.9 节 “Delayer”:将交换的传播延迟到路由后面的部分。

doTry()

doTry

创建用于处理异常的 try/catch 块,使用 doCatch最后执行、和 end 子句。

end()

N/A

结束当前命令块。

enrich(),enrichRef()

增强

第 10.1 节 “内容增强器”:将当前交换与指定 制作者 端点 URI 请求的数据相结合。

filter()

filter

第 8.2 节 “Message Filter”:使用 predicate 表达式来过滤传入的交换。

idempotentConsumer()

idempotentConsumer

第 11.8 节 “幂等的消费者”:实施策略以抑制重复的消息。

inheritErrorHandler()

@inheritErrorHandler

布尔选项可用于禁用特定路由节点上继承的错误处理程序(定义为 Java DSL 中的子词,以及 XML DSL 中的属性)。

inOnly()

仅限

将当前的交换的 MEP 设置为 InOnly (如果没有参数),或者将交换作为 InOnly 发送到指定的端点。

inOut()

inOut

将当前的交换的 MEP 设置为 InOut (如果没有参数),或者将交换作为 InOut 发送到指定的端点。

loadBalance()

loadBalance

第 8.10 节 “Load Balancer”:在一组端点上实施负载平衡。

log()

log

将消息记录到控制台。

loop()

loop

第 8.16 节 “loop”:重复将每个交换的后端到路由中的后方。

markRollbackOnly()

@markRollbackOnly

(事务) 只标记当前回滚的事务(不会引发异常)。在 XML DSL 中,此选项设置为 回滚 元素上的布尔值属性。请参阅 Apache Karaf 事务指南

markRollbackOnlyLast()

@markRollbackOnlyLast

(事务) 如果之前已与这个线程关联一个或多个事务,然后暂停,这个命令只标记回滚的最新事务(不会引发异常)。在 XML DSL 中,此选项设置为 回滚 元素上的布尔值属性。请参阅 Apache Karaf 事务指南

marshal()

marshal

使用指定数据格式转换为低级或二进制格式,以准备通过特定传输协议发送。

multicast()

multicast

第 8.13 节 “多播”:将当前交换多播到多个目的地,其中每个目标获得自己的交换副本。

onCompletion()

onCompletion

定义在主路由完成后执行的子路由(以 end() 结尾)。另请参阅 第 2.14 节 “OnCompletion”

onException()

onException

定义在 Java DSL 中以 end() 结尾的子路由(在发生指定异常时执行的子路由)。通常在其自己的行中定义(不在路由中)。

pipeline()

pipeline

第 5.4 节 “pipes 和 Filters”:将交换发送到一系列端点,其中一个端点的输出成为下一个端点的输入。另请参阅 第 2.1 节 “Pipeline 处理”

policy()

policy

将策略应用到当前路由(目前仅用于事务性策略>_<),请参阅 Apache Karaf 事务指南

pollEnrich(),pollEnrichRef()

pollEnrich

第 10.1 节 “内容增强器”:将当前交换与从指定 消费者 端点 URI 轮询的数据合并。

process(),processRef

process

在当前交换上执行自定义处理器。请参阅 “自定义处理器”一节第 III 部分 “高级 Camel 编程”

recipientList()

recipientList

第 8.3 节 “接收者列表”:将交换发送到在运行时计算的接收者列表(例如,基于标头的内容)。

removeHeader()

removeHeader

从交换的 In 消息中删除指定的标头。

removeHeaders()

removeHeaders

从交换的 In 消息中删除与指定模式匹配的标头。这个模式可以具有形式的 prefix\* 替代方案,在这种情况下,它匹配以前缀为为 prefix the- theotherwise 的每个名称,它被解释为正则表达式。

removeProperty()

删除Property

从交换中删除指定的交换属性。

removeProperties()

removeProperties

从交换中删除与指定模式匹配的属性。将以逗号分隔的 1 个或多个字符串列表作为参数。第一个字符串是 模式(请参阅上面的 removeHeaders() )。后续字符串指定例外 - 这些属性保留。

resequence()

重新排序

第 8.6 节 “Resequencer”: 按指定比较主操作对传入的交换进行重新排序。支持 批处理 模式和 流模式

rollback()

rollback

(事务) 只标记当前仅回滚的事务(默认情况下也增加一个异常)。请参阅 Apache Karaf 事务指南

routingSlip()

routingSlip

第 8.7 节 “路由 Slip”:根据从 slip 标头提取的端点 URI 列表,通过动态构建的 Pipeline 路由交换。

sample()

示例

创建一个抽样节流,允许您从路由上的流量中提取交换示例。

setBody()

setBody

设置交换的 消息正文

setExchangePattern()

setExchangePattern

将当前的交换的 MEP 设置为指定的值。请参阅 “消息交换模式”一节

setHeader()

setHeader

在交换的 In 消息中设置指定的标头。

setOutHeader()

setOutHeader

在 Exchange's Out 消息中设置指定的标头。

setProperty()

setProperty()

设置指定的交换属性。

sort()

排序

In 消息正文的内容进行排序(其中可以指定自定义比较器)。

split()

split

第 8.4 节 “Splitter”:将当前交换分成一系列交换,其中每个分割交换包含原始消息正文的片段。

stop()

stop

可停止路由当前交换,并将其标记为 completed。

threads()

threads

创建线程池,以并发处理路由中的后一部分。

throttle()

throttle

第 8.8 节 “Throttler”:将流率限制为指定级别(每秒更改)。

throwException()

throwException

引发指定的 Java 异常。

to()

将交换发送到一个或多个端点。请参阅 第 2.1 节 “Pipeline 处理”

toF()

N/A

使用字符串格式将交换发送到端点。也就是说,端点 URI 字符串可以在 C printf() 函数的样式中嵌入替换。

transacted()

Transacted

创建包含路由后者一部分的 Spring 事务范围。请参阅 Apache Karaf 事务指南

transform()

转换

第 5.6 节 “message Translator”:将 In 消息标头复制到 Out 消息标头,并将 Out 消息正文设置为指定的值。

unmarshal()

unmarshal

使用指定数据格式将 In 消息正文从低级或二进制格式转换为高级别格式。

validate()

validate

使用 predicate 表达式来测试当前消息是否有效。如果该 predicate 返回 false,则抛出一个 PredicateValidationException 异常。

wireTap()

wireTap

第 12.3 节 “wire Tap”:使用 ExchangePattern.InOnly MEP 将当前交换的副本发送到指定的线 tap URI。

一些示例处理器

要了解如何在路由中使用处理器,请查看以下示例:

Choice

choice() 处理器是一个条件语句,用于将传入的消息路由到替代制作者端点。每个替代制作者端点的前面为一个 when() 方法,该方法使用一个 predicate 参数。如果 predicate 为 true,则会选择以下目标,否则处理继续进行规则中的下一个 when() 方法。例如,以下 choice() 处理器将传入消息定向到 Target1、Target 2Target3,具体取决于 Predicate1Predicate2 的值:

from("SourceURL")
    .choice()
        .when(Predicate1).to("Target1")
        .when(Predicate2).to("Target2")
        .otherwise().to("Target3");

也可以在 Spring XML 中:

<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <choice>
      <when>
        <!-- First predicate -->
        <simple>header.foo = 'bar'</simple>
        <to uri="Target1"/>
      </when>
      <when>
        <!-- Second predicate -->
        <simple>header.foo = 'manchu'</simple>
        <to uri="Target2"/>
      </when>
      <otherwise>
        <to uri="Target3"/>
      </otherwise>
    </choice>
  </route>
</camelContext>

在 Java DSL 中,有一个特殊情形,您可能需要使用 endChoice() 命令。某些标准 Apache Camel 处理器允许您使用特殊子使用指定额外参数,从而有效地打开额外的嵌套级别,这通常由 end() 命令终止。例如,您可以将负载均衡器子指定为 loadBalance().round SDin().to("mock:foo").to(" mock:bar ").end(),它加载 balances 信息在 模拟:foo 和模拟:bar 端点之间进行负载平衡。但是,如果负载均衡器条款嵌入到所选条件中,需要使用 endChoice() 命令终止子句,如下所示:

from("direct:start")
    .choice()
        .when(bodyAs(String.class).contains("Camel"))
            .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice()
        .otherwise()
            .to("mock:result");

Filter

filter() 处理器可用于防止不间断消息到达制作者端点。它取一个 predicate 参数:如果 predicate 为 true,则允许消息交换给制作者;如果 predicate 为 false,则会阻止消息交换。例如,以下过滤器将阻止消息交换,除非传入消息包含标题为 foo,值设为 bar

from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");

也可以在 Spring XML 中:

<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <filter>
      <simple>header.foo = 'bar'</simple>
      <to uri="TargetURL"/>
    </filter>
  </route>
</camelContext>

Throttler

throttle() 处理器确保制作者端点不会超载。throttler 通过限制每秒可传递的消息数量来实现。如果传入的消息超过指定率,则节流器会在缓冲区中累积过量消息,并将它们更慢地传输到制作者端点。例如,要将吞吐量率限制为每秒 100 个消息,您可以定义以下规则:

from("SourceURL").throttle(100).to("TargetURL");

也可以在 Spring XML 中:

<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000">
      <to uri="TargetURL"/>
    </throttle>
  </route>
</camelContext>

自定义处理器

如果此处描述的标准处理器都不提供您需要的功能,您可以始终定义自己的自定义处理器。要创建自定义处理器,请定义一个实施 org.apache.camel.Processor 接口的类,并覆盖 process() 方法。以下自定义处理器 MyProcessor 会从传入信息中删除名为 foo 的标头:

例 1.3. 实施自定义处理器类

public class MyProcessor implements org.apache.camel.Processor {
public void process(org.apache.camel.Exchange exchange) {
  inMessage = exchange.getIn();
  if (inMessage != null) {
      inMessage.removeHeader("foo");
  }
}
};

要将自定义处理器插入到路由器规则中,调用 process() 方法,它为将处理器插入到规则中提供了通用机制。例如,以下规则调用 例 1.3 “实施自定义处理器类” 中定义的处理器:

org.apache.camel.Processor myProc = new MyProcessor();

from("SourceURL").process(myProc).to("TargetURL");