第 10 章 消息转换

摘要

消息转换模式描述了如何为各种目的修改消息的内容。

10.1. 内容增强器

概述

内容增强器 模式描述了消息目的地需要超过原始消息中的数据的场景。在这种情况下,您要使用消息转换器、路由逻辑中的任意处理器,或者内容丰富的方法从外部资源拉取到额外数据。

图 10.1. 内容增强模式

内容增强器模式

丰富内容的替代方案

Apache Camel 支持多种方式丰富内容:

  • 带有路由逻辑中任意处理器的消息转换器
  • enrich() 方法通过向 制作者 端点发送当前交换的副本来获取来自资源的额外数据,然后在生成的回复中使用数据。由增强器创建的交换始终是一个 InOut 交换。
  • pollEnrich() 方法通过轮询 消费者 端点来获取额外的数据。实际上,来自主路由的消费者端点和 pollEnrich() 操作中的使用者端点是结合的。也就是说,路由初始消费者上的传入消息会触发用户轮询的 pollEnrich() 方法。
注意

enrich()pollEnrich() 方法支持动态端点 URI。您可以通过指定一个表达式来计算 URI,您可以从当前交换中获取值。例如,您可以使用从数据交换中计算的名称来轮询文件。Camel 2.16 中引入了此行为。这个更改会破坏 XML DSL,并可让您轻松迁移。Java DSL 保持向后兼容。

使用消息转换器和处理器来增强内容

Camel 提供 流畅的构建器,利用一种安全型 IDE 友好的方式创建路由和调解规则,从而提供智能完成并安全重构。当您测试分布式系统时,必须存根特定外部系统,以便可以在特定系统可用或编写特定系统之前测试系统的其他部分。执行此操作的一种方法是通过生成具有大部分静态正文的动态消息来生成对请求的响应。https://camel.apache.org/templating.html使用模板的另一种方法是使用来自一个目的地的消息,将其转换为 VelocityXQuery 等内容,然后将其发送到另一目的地。以下示例显示了一个 InOnly (单向)消息:

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm").
  to("activemq:Another.Queue");

假设您想使用 InOut (请求)消息传递来处理 ActiveMQ 上的 My.Queue 队列上的请求。您需要一个模板生成的响应,它指向 JMSReplyTo 目的地。以下示例演示了如何进行此操作:

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm");

以下简单示例演示了如何使用 DSL 转换消息正文:

from("direct:start").setBody(body().append(" World!")).to("mock:result");

以下示例使用显式 Java 代码来添加处理器:

from("direct:start").process(new Processor() {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        in.setBody(in.getBody(String.class) + " World!");
    }
}).to("mock:result");

下一个示例使用 bean 集成,使任何 bean 的使用作为转换器:

from("activemq:My.Queue").
  beanRef("myBeanName", "myMethodName").
  to("activemq:Another.Queue");

以下示例显示了一个 Spring XML 的实现:

<route>
  <from uri="activemq:Input"/>
  <bean ref="myBeanName" method="doTransform"/>
  <to uri="activemq:Output"/>
</route>/>

使用 enrich()方法丰富内容

AggregationStrategy aggregationStrategy = ...

from("direct:start")
  .enrich("direct:resource", aggregationStrategy)
  .to("direct:result");

from("direct:resource")
...

内容增强器(丰富)从 资源端点 检索额外的数据,以增强传入的消息(在机构 交换中包含)。聚合策略将原始交换与资源交换相结合。AggregationStrategy.aggregate(Exchange、Exchange) 方法的第一个参数与原始交换对应,第二个参数则对应于资源交换。资源端点的结果存储在资源交换的 Out 消息中。以下是实施您自己的聚合策略类的示例模板:

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getOut().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

使用此模板时,原始交换可以具有任何交换模式。由增强器创建的资源交换始终是一个 InOut 交换。

Spring XML 丰富示例

前面的示例也可以在 Spring XML 中实施:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>
 <bean id="aggregationStrategy" class="..." />

增强内容时的默认聚合策略

聚合策略是可选的。如果没有提供它,Apache Camel 将默认使用从资源获取的正文。例如:

from("direct:start")
  .enrich("direct:resource")
  .to("direct:result");

在前面的路由中,发送到 direct:result 端点的消息包含来自 direct:resource 的消息,因为本例不使用任何自定义聚合。

在 XML DSL 中,只省略 strategyRef 属性,如下所示:

<route>
    <from uri="direct:start"/>
    <enrich uri="direct:resource"/>
    <to uri="direct:result"/>
</route>

enrich()方法支持的选项

enrich DSL 命令支持以下选项:

名称

默认值

描述

expression

从 Camel 2.16 开始,需要这个选项。指定一个表达式,用于配置外部服务的 URI,以便从中丰富。您可以使用 Simple 表达式语言、Constant 表达式语言或任何可动态地从当前交换中的值动态计算 URI 的语言。

uri

 

这些选项已被删除。改为指定 expression 选项。在 Camel 2.15 及更早版本中,需要规格 uri 选项或 ref 选项。每个选项都指定要从中丰富的外部服务的端点 URI。

Ref

 

代表外部服务的端点,以丰富.您必须使用 uriref

strategyRef

 

指的是 AggregationStrategy,用于将外部服务的回复合并到单一传出消息中。默认情况下,Camel 使用来自外部服务的回复作为传出消息。您可以使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式的文档。

strategyMethodName

 

使用 OVAs 作为 AggregationStrategy 时,指定这个选项来显式声明聚合方法的名称。详情请查看 Aggregate 模式。

strategyMethodAllowNull

false

默认行为是,如果没有要增强数据,则聚合方法不使用。如果这个选项为 true,则当没有数据丰富且您用作 AggregationStrategy 时,会使用 null 值作为 旧的Exchange。如需更多信息,请参阅 Aggregate 模式。

aggregateOnException

false

默认行为是,如果试图从资源检索数据丰富的数据时,不使用 聚合方法。如果将此选项设置为 true,最终用户可以控制聚合方法中是否有异常时要做什么。例如,可以阻止异常或设置自定义消息正文

shareUntOfWork

false

从 Camel 2.16 开始,默认行为是,丰富的操作不共享父交换与资源交换之间的工作单元。这意味着资源交换都有自己的工作单元。如需更多信息,请参阅 Splitter 模式的文档。

cacheSize

1000

从 Camel 2.16 开始,指定这个选项来为 ProducerCache 配置缓存大小,这将缓存制作者以便在增强操作中重复使用。要关闭这个缓存,将 cacheSize 选项设置为 -1

ignoreInvalidEndpoint

false

从 Camel 2.16 开始,这个选项指示是否忽略无法解析的端点 URI。默认行为是 Camel 引发标识无效端点 URI 的异常。

使用 enrich()方法指定聚合策略

enrich() 方法从资源端点检索额外的数据来增强传入的消息,该消息包含在原始交换中。您可以使用聚合策略来合并原始交换和资源交换。AggregationStrategy.aggregate(Exchange、Exchange) 方法的第一个参数与原始交换对应。第二个参数对应于资源交换。资源端点的结果存储在资源交换的 Out 消息中。例如:

AggregationStrategy aggregationStrategy = ...

   from("direct:start")
   .enrich("direct:resource", aggregationStrategy)
   .to("direct:result");

   from("direct:resource")
...

以下代码是实施聚合策略的模板。在使用此模板的实现中,原始交换可以是任何消息交换模式。由 enricher 创建的资源交换始终是一个 InOut 消息交换模式。

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getIn().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

以下示例显示了使用 Spring XML DSL 来实现聚合策略:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>

<bean id="aggregationStrategy" class="..." />

使用带有 enrich()的动态 URI

从 Camel 2.16 开始,enrich()pollEnrich() 方法支持使用基于当前交换信息计算的动态 URI。例如,要从 HTTP 端点中丰富,带有 orderId 键的标头被用作 HTTP URL 的内容路径的一部分,您可以执行以下操作:

from("direct:start")
  .enrich().simple("http:myserver/${header.orderId}/order")
  .to("direct:result");

以下是 XML DSL 中相同的示例:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
   <from uri="direct:start"/>
   <enrich>
      <simple>http:myserver/${header.orderId}/order</simple>
   </enrich>
   <to uri="direct:result"/>
</route>

使用 pollEnrich()方法丰富内容

pollEnrich 命令会将资源端点视为 消费者。它将 轮询 端点,而不是向资源端点发送交换。默认情况下,如果资源端点没有可用的交换,则轮询会立即返回。例如,以下路由读取从传入 JMS 消息标头中提取其名称的文件:

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId")
   .to("bean:processOrder");

您可以限制等待文件就绪的时间。以下示例显示了最大等待 20 秒:

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
   .to("bean:processOrder");

您还可以为 pollEnrich() 指定聚合策略,例如:

   .pollEnrich("file://order/data/additional?fileName=orderId", 20000, aggregationStrategy)

pollEnrich() 方法支持使用 consumer.bridgeErrorHandler=true 配置的用户。这样,可以从轮询传播到路由错误处理程序的任何异常(例如,重试轮询)。

注意

consumer.bridgeErrorHandler=true 的支持是 Camel 2.18 中的新功能。Camel 2.17 不支持此行为。

传递给聚合策略的 aggregate() 方法的资源交换可能是 null (如果轮询超时)在收到交换前超时。

pollEnrich()使用的轮询方法

pollEnrich() 方法通过调用以下轮询方法之一来轮询消费者端点:

  • receiveNoWait ()(这是默认值。)
  • receive()
  • 接收(长超时)

pollEnrich() 命令的 timeout 参数(以毫秒为单位)决定要调用的方法,如下所示:

  • 如果超时为 0 或未指定,则 pollEnrich() 调用 receiveNoWait
  • 当超时为负数时,pollEnrich() 调用会接收。
  • 否则,pollEnrich() 调用 receive(timeout)

如果没有数据,则聚合策略中的 newExchange 为 null。

使用 pollEnrich()方法的示例

以下示例显示,通过从 inbox/data.txt 文件中载入内容来增强消息:

 from("direct:start")
   .pollEnrich("file:inbox?fileName=data.txt")
   .to("direct:result");

以下是 XML DSL 中相同的示例:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

如果指定的文件不存在,则消息为空。您可以指定要等待的超时时间(永久),直到文件存在或等待到特定的时长。在以下示例中,命令会等待不超过 5 秒:

<route>
   <from uri="direct:start"/>
   <pollEnrich timeout="5000">
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

使用带有 pollEnrich()的动态 URI

从 Camel 2.16 开始,enrich()pollEnrich() 方法支持使用基于当前交换信息计算的动态 URI。例如,要从使用标头来指示 SEDA 队列名称的端点中轮询 enrich,您可以执行以下操作:

from("direct:start")
  .pollEnrich().simple("seda:${header.name}")
  .to("direct:result");

以下是 XML DSL 中相同的示例:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <simple>seda${header.name}</simple>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

pollEnrich()方法支持的选项

pollEnrich DSL 命令支持以下选项:

名称

默认值

描述

expression

从 Camel 2.16 开始,需要这个选项。指定一个表达式,用于配置外部服务的 URI,以便从中丰富。您可以使用 Simple 表达式语言、Constant 表达式语言或任何可动态地从当前交换中的值动态计算 URI 的语言。

uri

 

这些选项已被删除。改为指定 expression 选项。在 Camel 2.15 及更早版本中,需要规格 uri 选项或 ref 选项。每个选项都指定要从中丰富的外部服务的端点 URI。

Ref

 

代表外部服务的端点,以丰富.您必须使用 uriref

strategyRef

 

指的是 AggregationStrategy,用于将外部服务的回复合并到单一传出消息中。默认情况下,Camel 使用来自外部服务的回复作为传出消息。您可以使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式的文档。

strategyMethodName

 

使用 OVAs 作为 AggregationStrategy 时,指定这个选项来显式声明聚合方法的名称。详情请查看 Aggregate 模式。

strategyMethodAllowNull

false

默认行为是,如果没有要增强数据,则聚合方法不使用。如果这个选项为 true,则当没有数据丰富且您用作 AggregationStrategy 时,会使用 null 值作为 旧的Exchange。如需更多信息,请参阅 Aggregate 模式。

timeout

-1

从外部服务轮询时要等待响应的最长时间(以毫秒为单位)。默认行为是 pollEnrich() 方法调用 receive() 方法。因为 receive() 可能会阻断,直到有可用的消息,所以建议始终指定超时。

aggregateOnException

false

默认行为是,如果试图从资源检索数据丰富的数据时,不使用 聚合方法。如果将此选项设置为 true,最终用户可以控制聚合方法中是否有异常时要做什么。例如,可以阻止异常或设置自定义消息正文

cacheSize

1000

指定这个选项来为 ConsumerCache 配置缓存大小,该缓存消费者在 pollEnrich() 操作中重复使用。要关闭这个缓存,将 cacheSize 选项设置为 -1

ignoreInvalidEndpoint

false

指明是否忽略了无法解析的端点 URI。默认行为是 Camel 引发标识无效端点 URI 的异常。