1.5. 处理器

概述

要让路由器做比将消费者端点连接到制作者端点相比,您可以向路由添加 处理器 更为有趣。处理器是一个命令,您可以插入到路由规则中,以执行通过该规则执行流的任意处理。Apache Camel 提供各种不同处理器,如 表 1.1 “Apache Camel Processors” 所示。

表 1.1. Apache Camel Processors

Java DSLXML DSL描述

aggregate()

aggregate

第 8.5 节 “聚合器”: 创建一个聚合器,它将多个传入交换组合成一个交换。

aop()

AOP

使用 Aspect Oriented Programming (AOP)在指定子路由之前和之后工作。

bean (), beanRef ()

bean

通过调用 Java 对象(或 bean)的方法来处理当前的交换。请参阅 第 2.4 节 “Bean 集成”

choice()

choice

第 8.1 节 “基于内容的路由器”:使用 when 和 otherwise 子句,根据交换内容 选择特定的 子路由。

convertBodyTo()

convertBodyTo

In 消息正文转换为指定的类型。

delay()

delay

第 8.9 节 “Delayer”: 将交换的传播延迟到路由的后部分。

doTry()

doTry

创建用于处理异常的尝试/分散块,使用 doCatchdo lastly、和 end 子句。

end()

N/A

结束当前命令块。

enrich(),enrichRef()

enrich

第 10.1 节 “内容增强”:将当前的交换与指定 制作者 端点 URI 请求的数据合并。

filter()

filter

第 8.2 节 “消息过滤器”: 使用 predicate 表达式来过滤传入的交换。

idempotentConsumer()

idempotentConsumer

第 11.8 节 “幂等的消费者”:实施策略以抑制重复消息。

inheritErrorHandler()

@inheritErrorHandler

布尔值选项,可用于在特定路由节点上禁用继承的错误处理程序(定义为 Java DSL 中的子使用和 XML DSL 中的属性)。

inOnly()

inOnly

将当前交换的 MEP 设置为 仅限 (如果没有参数),或者将交换作为 仅限 给指定的端点。

inOut()

inOut

将当前交换的 MEP 设置为 InOut (如果没有参数),或者将交换作为 InOut 发送到指定的端点。

loadBalance()

loadBalance

第 8.10 节 “Load Balancer”:对端点集合实施负载平衡。

log()

log

将消息记录到控制台。

loop()

loop

第 8.16 节 “循环”:重复将每个交换重新放置到路由的后部分。

markRollbackOnly()

@markRollbackOnly

(事务) 标记仅回滚的当前事务(不会引发异常)。在 XML DSL 中,此选项被设置为 回滚 元素上的布尔值属性。请参阅 Apache Karaf 事务指南

markRollbackOnlyLast()

@markRollbackOnlyLast

(事务) 如果之前已与该线程关联了一个或多个事务,然后暂停,这个命令会标记仅回滚的最新事务(不会引发异常)。在 XML DSL 中,此选项被设置为 回滚 元素上的布尔值属性。请参阅 Apache Karaf 事务指南

marshal()

marshal

使用指定的数据格式转换为低级或二进制格式,以准备通过特定的传输协议发送。

multicast()

multicast

第 8.13 节 “多播”:将当前交换到多个目的地,每个目的地获得自己的交换副本。

onCompletion()

onCompletion

定义一个子路由(以 Java DSL 结尾 )在主路由完成后执行该子路由。另请参阅 第 2.14 节 “OnCompletion”

onException()

onException

定义 Java DSL 中的 end () 终止的子路由(通过 Java DSL 结尾),每当指定的异常发生时执行。通常在其自己的行中定义(不在路由中)。

pipeline()

pipeline

第 5.4 节 “管道和过滤器”: 将交换发送到一系列端点,其中一个端点的输出将成为下一个端点的输入。另请参阅 第 2.1 节 “Pipeline 处理”

policy()

policy

将策略应用到当前路由(目前仅用于事务策略事项处理器)中,请参阅 Apache Karaf 事务指南

pollEnrich(),pollEnrichRef()

pollEnrich

第 10.1 节 “内容增强”: 将当前交换与来自指定的 消费者 端点 URI 的数据进行合并。

process(),processRef

process

在当前交换上执行自定义处理器。请参阅 “自定义处理器”一节第 III 部分 “高级 Camel 编程”

recipientList()

recipientList

第 8.3 节 “接收者列表”: 将交换发送到运行时计算的接收者列表(例如,基于标头的内容)。

removeHeader()

removeHeader

从交换的 In 消息中删除指定的标头。

removeHeaders()

removeHeaders

从交换的 In 消息中删除与指定模式匹配的标头。这个模式可以采用 前缀\*

removeProperty()

removeProperty

从交换删除指定的交换属性。

removeProperties()

removeProperties

从交换中删除与指定模式匹配的属性。以逗号分隔的 1 或多个字符串列表作为参数。第一个字符串是模式(请参阅上面的 删除Headers () )。后续字符串指定例外 - 这些属性保留。

resequence()

resequence

第 8.6 节 “Resequencer”: 根据指定比较器操作对传入交换进行重新排序。支持 批处理 模式和 流模式

rollback()

rollback

(事务) 标记仅进行回滚的当前事务(此外,默认情况下也会增加异常)。请参阅 Apache Karaf 事务指南

routingSlip()

routingSlip

第 8.7 节 “路由 Slip”:根据从 slip 标头中提取的端点 URI 列表,通过构建的管道路由交换。

sample()

sample

创建一个抽样限制,允许您从路由上的流量中提取交换示例。

setBody()

setBody

设置交换的 In 消息的消息正文。

setExchangePattern()

setExchangePattern

将当前交换的 MEP 设置为指定的值。请参阅 “消息交换模式”一节

setHeader()

setHeader

在交换的 In 消息中设置指定的标头。

setOutHeader()

setOutHeader

设置交换的 Out 消息中指定的标头。

setProperty()

setProperty()

设置指定的交换属性。

sort()

排序

In message body 的内容进行排序(其中可以选择性地指定自定义比较器)。

split()

split

第 8.4 节 “Splitter”:将当前交换分成一系列交换,每个交换都包含原始消息正文的片段。

stop()

stop

停止路由当前交换并将其标记为已完成。

threads()

threads

创建一个线程池,用于对路由的后一部分进行并发处理。

throttle()

throttle

第 8.8 节 “Throttler”: 将流率限制为指定级别(每秒交换数)。

throwException()

throwException

抛出指定的 Java 异常.

to()

将交换发送到一个或多个端点。请参阅 第 2.1 节 “Pipeline 处理”

toF()

N/A

使用字符串格式将交换发送到端点。也就是说,端点 URI 字符串可以嵌入 C printf () 函数方式中的替换。

transacted()

transacted

创建一个 Spring 事务范围,其中包含该路由的后一部分。请参阅 Apache Karaf 事务指南

transform()

翻译

第 5.6 节 “消息转换器”: 将 In message 标头复制到 Out message 标头,并将 Out 消息正文设置为指定的值。

unmarshal()

unmarshal

使用指定的数据格式将 In 消息正文从低级或二进制格式转换为高级别的格式。

validate()

validate

取一个 predicate 表达式来测试当前消息是否有效。如果 predicate 返回 false,则引发 PredicateValidationException 异常。

wireTap()

wireTap

第 12.3 节 “wire Tap”: 使用 ExchangePattern.InOnly MEP,将当前交换的副本发送到指定的有线 tap URI。

有些示例处理器

要了解如何在路由中使用处理器,请查看以下示例:

选择

choice () 处理器是一个条件语句,用于将传入消息路由到备选制作者端点。每个替代制作者端点的前面是一个 when () 方法,该方法采用 predicate 参数。如果 predicate 为 true,则会选择以下目标,否则处理规则中的下一个 when () 方法。例如,以下 choice () 处理器将传入的信息定向到 Target1、Target 2Target3,具体取决于 Predicate1Predicate2 的值:

from("SourceURL")
    .choice()
        .when(Predicate1).to("Target1")
        .when(Predicate2).to("Target2")
        .otherwise().to("Target3");

在 Spring XML 中,或等效于 Spring XML 中:

<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <choice>
      <when>
        <!-- First predicate -->
        <simple>header.foo = 'bar'</simple>
        <to uri="Target1"/>
      </when>
      <when>
        <!-- Second predicate -->
        <simple>header.foo = 'manchu'</simple>
        <to uri="Target2"/>
      </when>
      <otherwise>
        <to uri="Target3"/>
      </otherwise>
    </choice>
  </route>
</camelContext>

在 Java DSL 中,有一个特殊的情形,您可能需要使用 endChoice () 命令。某些标准 Apache Camel 处理器允许您使用特殊子使用指定额外参数,从而有效地打开额外的嵌套级别,该嵌套通常由 end () 命令终止。例如,您可以将负载均衡器子指定为 loadBalance ().roundRobin ().to ("mock:foo").to ("mock:bar").end (),用于平衡 mock:foomock:bar 间的消息。但是,如果负载均衡器子嵌入在选择条件中,则需要使用 endChoice () 命令终止 子句,如下所示:

from("direct:start")
    .choice()
        .when(bodyAs(String.class).contains("Camel"))
            .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice()
        .otherwise()
            .to("mock:result");

Filter

可以使用 filter () 处理器来防止不间断消息到达制作者端点。它采用单个 predicate 参数:如果 predicate 为 true,则允许消息交换到制作者;如果 predicate 为 false,则消息交换会被阻断。例如,以下过滤器会阻断一个消息交换,除非传入的消息包含一个标头 foo,其值等于 bar

from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");

在 Spring XML 中,或等效于 Spring XML 中:

<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <filter>
      <simple>header.foo = 'bar'</simple>
      <to uri="TargetURL"/>
    </filter>
  </route>
</camelContext>

Throttler

throttle () 处理器确保制作者端点不会过载。throttler 的工作原理是限制每秒可传递的消息数量。如果传入的消息超过指定率,则节流者会累积缓冲区中过量消息,并将它们传输速度会更慢于制作者端点。例如,要将吞吐量率限制为每秒 100 个信息,您可以定义以下规则:

from("SourceURL").throttle(100).to("TargetURL");

在 Spring XML 中,或等效于 Spring XML 中:

<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000">
      <to uri="TargetURL"/>
    </throttle>
  </route>
</camelContext>

自定义处理器

如果此处描述的标准处理器不提供您需要的功能,您总是可以自行定义自定义处理器。要创建自定义处理器,可定义一个实施 org.apache.camel.Processor 接口的类,并覆盖 process () 方法。以下自定义处理器 MyProcessor 会从传入信息中删除名为 foo 的标头:

例 1.3. 实施自定义处理器类

public class MyProcessor implements org.apache.camel.Processor {
public void process(org.apache.camel.Exchange exchange) {
  inMessage = exchange.getIn();
  if (inMessage != null) {
      inMessage.removeHeader("foo");
  }
}
};

要将自定义处理器插入到路由器规则中,调用 process () 方法,该方法为将处理器插入到规则的通用机制。例如,以下规则调用在 例 1.3 “实施自定义处理器类” 中定义的处理器:

org.apache.camel.Processor myProc = new MyProcessor();

from("SourceURL").process(myProc).to("TargetURL");