第 10 章 消息转换

摘要

消息转换模式描述了如何根据各种用途修改消息内容。

10.1. 内容增强

概述

内容丰富的 模式描述了消息目的地在原始消息中比存在更多的数据的情况。在这种情况下,您将使用消息转换器、路由逻辑中的任意处理器、路由逻辑中的任意处理器或内容丰富的方法从外部资源拉取额外数据。

图 10.1. 内容增强模式

内容丰富的模式

丰富内容的替代方案

Apache Camel 支持多种丰富内容的方法:

  • 在路由逻辑中使用任意处理器的消息转换器
  • enrich () 方法通过将当前交换的副本发送到 制作者 端点,然后使用生成的回复中的数据,从资源获取其他数据。增强器创建的交换始终是 InOut 交换。
  • pollEnrich () 方法通过轮询 消费者 端点数据来获取额外的数据。有效地从主路由和 pollEnrich () 操作中的消费者端点耦合。也就是说,路由中初始使用者上的传入消息会触发轮询使用者的 pollEnrich () 方法。
注意

enrich ()pollEnrich () 方法支持动态端点 URI。您可以通过指定表达式来计算 URI,以便从当前交换获得值。例如,您可以使用数据交换计算的名称轮询文件。这个行为是在 Camel 2.16 中引入的。这个更改会破坏 XML DSL,并可让您轻松迁移。Java DSL 保持向后兼容。

使用消息转换器和处理器来丰富内容

Camel 提供 流畅的构建器,用于通过提供智能完成且安全重构的 type-safe IDE 友好的方式创建路由和调解规则。当您测试分布式系统时,必须非常常见的要求才能根特定外部系统,以便您可以在特定系统可用或编写特定系统前测试系统的其他部分。其中一种实现方式是通过生成 具有最多 静态正文的动态消息来生成对请求的响应。使用模板的另一种方法是使用一个目的地的消息,使用诸如 VelocityXQuery 之类的内容转换,然后将其发送到另一目的地。以下示例显示 这个信息

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm").
  to("activemq:Another.Queue");

假设您要使用 InOut (请求)消息传递来处理 ActiveMQ 上 My.Queue 队列上的请求。您希望模板生成的响应进入 JMSReplyTo 目标。以下示例演示了如何进行此操作:

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm");

以下简单示例演示了如何使用 DSL 来转换消息正文:

from("direct:start").setBody(body().append(" World!")).to("mock:result");

以下示例使用显式 Java 代码来添加处理器:

from("direct:start").process(new Processor() {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        in.setBody(in.getBody(String.class) + " World!");
    }
}).to("mock:result");

下一个示例使用 bean 集成来启用使用任意 bean 作为转换器:

from("activemq:My.Queue").
  beanRef("myBeanName", "myMethodName").
  to("activemq:Another.Queue");

以下示例显示了 Spring XML 实现:

<route>
  <from uri="activemq:Input"/>
  <bean ref="myBeanName" method="doTransform"/>
  <to uri="activemq:Output"/>
</route>/>

使用 enrich ()方法增强内容

AggregationStrategy aggregationStrategy = ...

from("direct:start")
  .enrich("direct:resource", aggregationStrategy)
  .to("direct:result");

from("direct:resource")
...

内容丰富(增强 )从 资源端点 检索额外的数据,以便增强传入消息(包含在组织 交换中)。聚合策略结合了原始交换 和资源交换AggregationStrategy.aggregate (Exchange, Exchange) 方法的第一个参数对应于原始交换,第二个参数对应于资源交换。资源端点的结果存储在资源交换的 Out 消息中。以下是实施您自己的聚合策略类的示例模板:

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getOut().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

使用此模板时,原始交换可以有任何交换模式。增强程序创建的资源交换始终是一个 InOut 交换。

Spring XML enrich 示例

前面的示例也可以在 Spring XML 中实施:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>
 <bean id="aggregationStrategy" class="..." />

丰富内容时的默认聚合策略

聚合策略是可选的。如果没有提供,Apache Camel 将使用默认情况下从资源获得的正文。例如:

from("direct:start")
  .enrich("direct:resource")
  .to("direct:result");

在前面的路由中,发送到 direct:result 端点的消息包含 direct:resource 的输出,因为本示例不使用任何自定义聚合。

在 XML DSL 中,只需省略 strategyRef 属性,如下所示:

<route>
    <from uri="direct:start"/>
    <enrich uri="direct:resource"/>
    <to uri="direct:result"/>
</route>

enrich ()方法支持的选项

enrich DSL 命令支持以下选项:

Name

默认值

描述

expression

None

从 Camel 2.16 开始,需要此选项。指定一个表达式,用于从将外部服务的 URI 配置为丰富。您可以使用 Simple 表达式语言、Constant 表达式语言或任何其他语言,从当前交换中的值动态计算 URI。

uri

 

这些选项已被删除。指定 表达式 选项代替。在 Camel 2.15 及更早的版本中,需要 uri 选项或 ref 选项规格。每个选项都指定外部服务的端点 URI,以丰富。

ref

 

是指外部服务的端点,以便从中丰富。您必须使用 uriref

strategyRef

 

指的是用于将外部服务的回复合并到单个传出消息中。https://www.javadoc.io/doc/org.apache.camel/camel-core/2.23.2/org/apache/camel/processor/aggregate/AggregationStrategy.html默认情况下,Camel 使用外部服务的回复作为传出消息。您可以使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式的文档。

strategyMethodName

 

当使用 POJO 作为 AggregationStrategy 时,指定这个选项来显式声明聚合方法的名称。详情请查看 Aggregate 模式。

strategyMethodAllowNull

false

默认的行为是,如果没有数据丰富,则不会使用聚合方法。如果这个选项为 true,则当没有丰富数据时,使用 null 值作为 oldExchange,且您使用 POJOs 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式。

aggregateOnException

false

默认行为是,如果在尝试从资源检索数据时抛出异常,则不会使用聚合方法。将此选项设置为 true 可允许最终用户控制聚合方法中是否有异常。例如,可以抑制异常或设置自定义消息正文

shareUntOfWork

false

从 Camel 2.16 开始,默认行为是增强操作在父交换和资源交换之间不共享工作单元。这意味着资源交换具有自己的独立工作单元。如需更多信息,请参阅 Splitter 模式的文档。

cacheSize

1000

从 Camel 2.16 开始,指定这个选项来配置 ProducerCache 的缓存大小,这将缓存制作者以便在 enrich 操作中重复使用。要关闭此缓存,将 cacheSize 选项设置为 -1

ignoreInvalidEndpoint

false

从 Camel 2.16 开始,此选项指示是否忽略无法解析的端点 URI。默认行为是 Camel 抛出标识无效端点 URI 的异常。

使用 enrich ()方法指定聚合策略时

enrich () 方法从资源端点检索额外的数据,以丰富传入消息,该消息包含在原始交换中。您可以使用聚合策略来组合原始交换和资源交换。AggregationStrategy.aggregate (Exchange, Exchange) 方法的第一个参数对应于原始交换。第二个参数对应于资源交换。资源端点的结果存储在资源交换的 Out 消息中。例如:

AggregationStrategy aggregationStrategy = ...

   from("direct:start")
   .enrich("direct:resource", aggregationStrategy)
   .to("direct:result");

   from("direct:resource")
...

以下代码是实施聚合策略的模板。在使用此模板的实施中,原始交换可以是任何消息交换模式。增强器创建的资源交换始终是 InOut 消息交换模式。

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getIn().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

以下示例显示了使用 Spring XML DSL 来实施聚合策略:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>

<bean id="aggregationStrategy" class="..." />

使用带 enrich ()的动态 URI.

从 Camel 2.16 开始,enrich ()pollEnrich () 方法支持根据当前交换的信息计算使用动态 URI。例如,若要从 HTTP 端点提供 orderId 键的标头作为 HTTP URL 的内容路径的一部分,您可以执行如下操作:

from("direct:start")
  .enrich().simple("http:myserver/${header.orderId}/order")
  .to("direct:result");

以下是 XML DSL 中的相同示例:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
   <from uri="direct:start"/>
   <enrich>
      <simple>http:myserver/${header.orderId}/order</simple>
   </enrich>
   <to uri="direct:result"/>
</route>

使用 pollEnrich ()方法增强内容

pollEnrich 命令将资源端点视为 使用者。它将交换发送到资源端点,而不是 轮询 端点。默认情况下,如果资源端点没有交换,则轮询会立即返回。例如,以下路由会读取从传入 JMS 消息的标头中提取名称的文件:

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId")
   .to("bean:processOrder");

您可以限制等待文件就绪的时间。以下示例显示了最多等待 20 秒:

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
   .to("bean:processOrder");

您还可以为 pollEnrich () 指定聚合策略,例如:

   .pollEnrich("file://order/data/additional?fileName=orderId", 20000, aggregationStrategy)

pollEnrich () 方法支持配置了 consumer.bridgeErrorHandler=true 的使用者。这可让轮询中的任何异常传播到路由错误处理程序,例如,可以重试轮询。

注意

consumer.bridgeErrorHandler=true 的支持在 Camel 2.18 中是新的。Camel 2.17 不支持此行为。

如果在接收交换前轮询超时,则传递到聚合策略的 aggregate () 方法的资源交换可能为空。

pollEnrich ()使用的轮询方法。

pollEnrich () 方法通过调用以下轮询方法之一来轮询消费者端点:

  • receiveNoWait ()(这是默认值。)
  • receive()
  • receive(long timeout)

pollEnrich () 命令的 timeout 参数(以毫秒为单位指定)决定要调用的方法,如下所示:

  • 当超时为 0 或未指定时,pollEnrich () 调用 receiveNoWait
  • 当超时为负时,pollEnrich () 调用 会收到
  • 否则,pollEnrich () 调用 receive (timeout)

如果没有数据,则聚合策略中的 newExchange 为 null。

使用 pollEnrich ()方法的示例

以下示例显示了通过从 inbox/data.txt 文件中加载内容来丰富消息:

 from("direct:start")
   .pollEnrich("file:inbox?fileName=data.txt")
   .to("direct:result");

以下是 XML DSL 中的相同示例:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

如果指定的文件不存在,则消息为空。您可以指定一个超时等待(可能永久),直到文件存在或最多等待特定时长。在以下示例中,命令会等待不超过 5 秒:

<route>
   <from uri="direct:start"/>
   <pollEnrich timeout="5000">
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

使用带有 pollEnrich ()的动态 URI

从 Camel 2.16 开始,enrich ()pollEnrich () 方法支持根据当前交换的信息计算使用动态 URI。例如,要从使用标头来代表 SEDA 队列名称的端点中轮询增强,您可以这样做:

from("direct:start")
  .pollEnrich().simple("seda:${header.name}")
  .to("direct:result");

以下是 XML DSL 中的相同示例:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <simple>seda${header.name}</simple>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

pollEnrich ()方法支持的选项

pollEnrich DSL 命令支持以下选项:

Name

默认值

描述

expression

None

从 Camel 2.16 开始,需要此选项。指定一个表达式,用于从将外部服务的 URI 配置为丰富。您可以使用 Simple 表达式语言、Constant 表达式语言或任何其他语言,从当前交换中的值动态计算 URI。

uri

 

这些选项已被删除。指定 表达式 选项代替。在 Camel 2.15 及更早的版本中,需要 uri 选项或 ref 选项规格。每个选项都指定外部服务的端点 URI,以丰富。

ref

 

是指外部服务的端点,以便从中丰富。您必须使用 uriref

strategyRef

 

指的是用于将外部服务的回复合并到单个传出消息中。https://www.javadoc.io/doc/org.apache.camel/camel-core/2.23.2/org/apache/camel/processor/aggregate/AggregationStrategy.html默认情况下,Camel 使用外部服务的回复作为传出消息。您可以使用 POJO 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式的文档。

strategyMethodName

 

当使用 POJO 作为 AggregationStrategy 时,指定这个选项来显式声明聚合方法的名称。详情请查看 Aggregate 模式。

strategyMethodAllowNull

false

默认的行为是,如果没有数据丰富,则不会使用聚合方法。如果这个选项为 true,则当没有丰富数据时,使用 null 值作为 oldExchange,且您使用 POJOs 作为 AggregationStrategy。如需更多信息,请参阅 Aggregate 模式。

timeout

-1

从外部服务轮询时要等待响应的最长时间,以毫秒为单位。默认行为是 pollEnrich () 方法调用 receive () 方法。因为 receive () 可以阻止直到有可用消息,所以建议总是指定超时。

aggregateOnException

false

默认行为是,如果在尝试从资源检索数据时抛出异常,则不会使用聚合方法。将此选项设置为 true 可允许最终用户控制聚合方法中是否有异常。例如,可以抑制异常或设置自定义消息正文

cacheSize

1000

指定这个选项,为 ConsumerCache 配置缓存大小,缓存使用者以便在 pollEnrich () 操作中重复使用。要关闭此缓存,将 cacheSize 选项设置为 -1

ignoreInvalidEndpoint

false

指明是否忽略无法解析的端点 URI。默认行为是 Camel 抛出标识无效端点 URI 的异常。