第 10 章 消息转换

摘要

消息转换模式描述了如何为各种目的修改消息内容。

10.1. 内容增强

概述

内容增强 模式描述了消息目的地需要更多数据的情况,而不是原始消息中存在的数据。在这种情况下,您可以使用消息转换器、路由逻辑中的任意处理器或内容增强方法从外部资源拉取额外的数据。

图 10.1. 内容增强模式

内容增强模式

增强内容的替代方案

Apache Camel 支持多种增强内容的方法:

  • 在路由逻辑中使用任意处理器的消息转换器
  • enrich () 方法通过将当前交换的副本发送到 制作者 端点,然后使用结果回复中的数据从资源获取额外的数据。增强器创建的交换始终是 InOut 交换。
  • pollEnrich () 方法通过轮询数据的 消费者 端点来获取其他数据。有效地,来自 main 路由和 pollEnrich () 操作中的消费者端点是组合的。也就是说,路由中初始消费者上的传入消息会触发要轮询的消费者上的 pollEnrich () 方法。
注意

enrich ()pollEnrich () 方法支持动态端点 URI。您可以通过指定一个表达式来计算 URI,该表达式可让您从当前交换中获取值。例如,您可以使用从数据交换计算的名称来轮询文件。在 Camel 2.16 中引入了此行为。这个更改会破坏 XML DSL,并可让您轻松地迁移。Java DSL 保持向后兼容。

使用消息转换器和处理器增强内容

Camel 提供了 流畅的构建器,以使用提供智能完成且安全重构安全的类型安全 IDE 创建路由和调解规则。当您测试分布式系统时,需要存根某些外部系统,以便测试系统的其他部分,直到特定的系统可用或写出。其中一种实现方式是使用某种 模板 系统通过生成具有 mostly-static 正文的动态消息来生成对请求的响应。使用模板的另一种方法是使用来自一个目的地的消息,使用 VelocityXQuery 等内容进行转换,然后将其发送到另一个目的地。以下示例显示了 InOnly (one way)信息:

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm").
  to("activemq:Another.Queue");

假设您想要使用 InOut (request-reply)消息传递来处理 ActiveMQ 上 My.Queue 队列上的请求。您需要一个模板生成的响应,该响应发送到 JMSReplyTo 目的地。以下示例演示了如何进行此操作:

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm");

以下简单示例演示了如何使用 DSL 转换消息正文:

from("direct:start").setBody(body().append(" World!")).to("mock:result");

以下示例使用显式 Java 代码添加处理器:

from("direct:start").process(new Processor() {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        in.setBody(in.getBody(String.class) + " World!");
    }
}).to("mock:result");

下一个示例使用 bean 集成来启用任何 bean 作为转换器:

from("activemq:My.Queue").
  beanRef("myBeanName", "myMethodName").
  to("activemq:Another.Queue");

以下示例显示了 Spring XML 实现:

<route>
  <from uri="activemq:Input"/>
  <bean ref="myBeanName" method="doTransform"/>
  <to uri="activemq:Output"/>
</route>/>

使用 enrich ()方法增强内容

AggregationStrategy aggregationStrategy = ...

from("direct:start")
  .enrich("direct:resource", aggregationStrategy)
  .to("direct:result");

from("direct:resource")
...

内容增强(增强 ) 从资源端点 检索其他数据,以增强传入的消息(包含在组织 交换中)。聚合策略将原始交换与资源交换相结合。AggregationStrategy.aggregate (Exchange, Exchange) 方法的第一个参数对应于原始交换,第二个参数对应于资源交换。资源端点的结果存储在资源交换的 Out 消息中。以下是实施您自己的聚合策略类的示例模板:

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getOut().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

使用此模板时,原始交换可以具有任何交换模式。增强器所创建的资源交换始终是一个 InOut 交换。

Spring XML enrich 示例

前面的示例也可以在 Spring XML 中实施:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>
 <bean id="aggregationStrategy" class="..." />

增强内容时的默认聚合策略

聚合策略是可选的。如果没有提供它,Apache Camel 默认使用从资源获取的正文。例如:

from("direct:start")
  .enrich("direct:resource")
  .to("direct:result");

在前面的路由中,发送到 direct:result 端点的消息包含 direct:resource 的输出,因为本示例不使用任何自定义聚合。

在 XML DSL 中,仅省略 policyRef 属性,如下所示:

<route>
    <from uri="direct:start"/>
    <enrich uri="direct:resource"/>
    <to uri="direct:result"/>
</route>

丰富的() 方法支持的选项

enrich DSL 命令支持以下选项:

Name

默认值

描述

expression

None

从 Camel 2.16 开始,需要这个选项。指定一个表达式,用于将外部服务的 URI 配置为增强它。您可以使用 简单表达式语言Constant 表达式语言或任何其他语言,这些语言可以从当前交换中的值动态计算 URI。

uri

 

这些选项已被删除。指定 expression 选项。在 Camel 2.15 及更早版本中,需要 uri 选项或 ref 选项。每个选项都指定外部服务的端点 URI,以便从.

Ref

 

指的是外部服务的端点,以便从.您必须使用 uriref

strategyRef

 

指的是 AggregationStrategy,用于将来自外部服务的回复合并到单个传出消息。默认情况下,Camel 使用来自外部服务的回复作为传出消息。您可以使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式的文档。

strategyMethodName

 

当使用 POJO 作为 AggregationStrategy 时,指定这个选项来显式声明聚合方法的名称。详情请查看 Aggregate 模式。

strategyMethodAllowNull

false

默认的行为是,如果没有数据来增强,则不会使用聚合方法。如果此选项为 true,则在没有数据功能时将 null 值用作 oldExchange,并且您使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式。

aggregateOnException

false

默认的行为是,如果在尝试从资源检索数据时抛出异常,则不会使用 聚合方法。将这个选项设置为 true 可让最终用户控制聚合方法中存在异常 时要执行的操作。例如,可以阻止异常或设置自定义消息正文

shareUntOfWork

false

从 Camel 2.16 开始,默认行为是,丰富的操作不会在父交换和资源交换之间共享工作单元。这意味着资源交换有自己的独立工作单元。如需更多信息,请参阅 Splitter 模式的文档。

cacheSize

1000

从 Camel 2.16 开始,指定这个选项来为 ProducerCache 配置缓存大小,这会缓存生成者以便在功能操作中重复使用。要关闭此缓存,请将 cacheSize 选项设置为 -1

ignoreInvalidEndpoint

false

从 Camel 2.16 开始,这个选项指示是否忽略无法解析的端点 URI。默认行为是 Camel 会抛出一个标识无效端点 URI 的异常。

在使用 enrich ()方法时指定聚合策略

enrich () 方法从资源端点检索其他数据,以增强传入消息,该消息包含在原始交换中。您可以使用聚合策略来组合原始交换和资源交换。AggregationStrategy.aggregate (Exchange, Exchange) 方法的第一个参数对应于原始交换。第二个参数对应于资源交换。资源端点的结果存储在资源交换的 Out 消息中。例如:

AggregationStrategy aggregationStrategy = ...

   from("direct:start")
   .enrich("direct:resource", aggregationStrategy)
   .to("direct:result");

   from("direct:resource")
...

以下代码是用于实施聚合策略的模板。在使用此模板的实施中,原始交换可以是任何消息交换模式。增强器所创建的资源交换始终是一个 InOut 消息交换模式。

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getIn().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

以下示例显示了使用 Spring XML DSL 来实现聚合策略:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>

<bean id="aggregationStrategy" class="..." />

使用带有 enrich ()的动态 URI

从 Camel 2.16 开始,enrich ()pollEnrich () 方法支持使用根据当前交换中的信息计算的动态 URI。例如,要增强 HTTP 端点,其中带有 orderId 键的标头被用作 HTTP URL 的内容路径的一部分,您可以执行以下操作:

from("direct:start")
  .enrich().simple("http:myserver/${header.orderId}/order")
  .to("direct:result");

以下是 XML DSL 中的示例:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
   <from uri="direct:start"/>
   <enrich>
      <simple>http:myserver/${header.orderId}/order</simple>
   </enrich>
   <to uri="direct:result"/>
</route>

使用 pollEnrich ()方法增强内容

pollEnrich 命令将资源端点视为 消费者。它 轮询 端点,而不是向资源端点发送交换。默认情况下,如果资源端点中没有交换,则轮询会立即返回。例如,以下路由读取从传入 JMS 消息的标头中提取名称的文件:

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId")
   .to("bean:processOrder");

您可以限制等待文件就绪的时间。以下示例显示了最大等待 20 秒:

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
   .to("bean:processOrder");

您还可以为 pollEnrich () 指定聚合策略,例如:

   .pollEnrich("file://order/data/additional?fileName=orderId", 20000, aggregationStrategy)

pollEnrich () 方法支持配置了 consumer.bridgeErrorHandler=true 的消费者。这可让轮询传播到路由错误处理程序的任何异常,例如重试轮询。

注意

consumer.bridgeErrorHandler=true 的支持是 Camel 2.18 中的新功能。Camel 2.17 不支持此行为。

如果在收到交换前轮询超时,传递给聚合策略的 aggregate () 方法的资源交换可能为 null

轮询由 pollEnrich ()使用的方法。

pollEnrich () 方法通过调用以下轮询方法之一来轮询消费者端点:

  • receiveNoWait ()(这是默认值。)
  • receive()
  • receive (long timeout)

pollEnrich () 命令的 timeout 参数(以毫秒为单位指定)决定要调用的方法,如下所示:

  • 当超时为 0 或未指定时,pollEnrich () 调用 receiveNoWait
  • 当超时为负时,pollEnrich () 调用 会收到
  • 否则,pollEnrich () 调用 receive (timeout)

如果没有数据,则聚合策略中的 newExchange 为 null。

使用 pollEnrich ()方法的示例

以下示例显示,通过从 inbox/data.txt 文件中载入内容来增强消息:

 from("direct:start")
   .pollEnrich("file:inbox?fileName=data.txt")
   .to("direct:result");

以下是 XML DSL 中的示例:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

如果指定的文件不存在,则消息为空。您可以指定等待的超时时间(可能为ever),直到文件存在或等待到特定时间长度。在以下示例中,命令会等待不超过 5 秒:

<route>
   <from uri="direct:start"/>
   <pollEnrich timeout="5000">
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

使用带有 pollEnrich ()的动态 URI.

从 Camel 2.16 开始,enrich ()pollEnrich () 方法支持使用根据当前交换中的信息计算的动态 URI。例如,若要从使用标头指示 SEDA 队列名称的端点轮询功能,您可以执行以下操作:

from("direct:start")
  .pollEnrich().simple("seda:${header.name}")
  .to("direct:result");

以下是 XML DSL 中的示例:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <simple>seda${header.name}</simple>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

pollEnrich ()方法支持的选项

pollEnrich DSL 命令支持以下选项:

Name

默认值

描述

expression

None

从 Camel 2.16 开始,需要这个选项。指定一个表达式,用于将外部服务的 URI 配置为增强它。您可以使用 简单表达式语言Constant 表达式语言或任何其他语言,这些语言可以从当前交换中的值动态计算 URI。

uri

 

这些选项已被删除。指定 expression 选项。在 Camel 2.15 及更早版本中,需要 uri 选项或 ref 选项。每个选项都指定外部服务的端点 URI,以便从.

Ref

 

指的是外部服务的端点,以便从.您必须使用 uriref

strategyRef

 

指的是 AggregationStrategy,用于将来自外部服务的回复合并到单个传出消息。默认情况下,Camel 使用来自外部服务的回复作为传出消息。您可以使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式的文档。

strategyMethodName

 

当使用 POJO 作为 AggregationStrategy 时,指定这个选项来显式声明聚合方法的名称。详情请查看 Aggregate 模式。

strategyMethodAllowNull

false

默认的行为是,如果没有数据来增强,则不会使用聚合方法。如果此选项为 true,则在没有数据功能时将 null 值用作 oldExchange,并且您使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式。

timeout

-1

从外部服务轮询时等待响应的最大时间长度(以毫秒为单位)。默认的行为是 pollEnrich () 方法调用 receive () 方法。因为 receive () 可以阻止直到有消息可用,所以建议始终指定超时。

aggregateOnException

false

默认的行为是,如果在尝试从资源检索数据时抛出异常,则不会使用 聚合方法。将这个选项设置为 true 可让最终用户控制聚合方法中存在异常 时要执行的操作。例如,可以阻止异常或设置自定义消息正文

cacheSize

1000

指定这个选项来为 ConsumerCache 配置缓存大小,这会缓存消费者在 pollEnrich () 操作中重复使用。要关闭此缓存,请将 cacheSize 选项设置为 -1

ignoreInvalidEndpoint

false

指明是否忽略无法解析的端点 URI。默认行为是 Camel 会抛出一个标识无效端点 URI 的异常。