Chapter 9. Message Transformation

Abstract

The message transformation patterns describe how to modify the contents of messages for various purposes.

9.1. Content Enricher

Overview

The content enricher pattern describes a scenario where the message destination requires more data than is present in the original message. In this case, you would use a message translator, an arbitrary processor in the routing logic, or a content enricher method to pull in the extra data from an external resource.

Figure 9.1. Content Enricher Pattern

Content enricher pattern

Alternatives for enriching content

Apache Camel supports several ways to enrich content:

  • Message translator with arbitrary processor in the routing logic
  • The enrich() method obtains additional data from the resource by sending a copy of the current exchange to a producer endpoint and then using the data in the resulting reply. The exchange created by the enricher is always an InOut exchange.
  • The pollEnrich() method obtains additional data by polling a consumer endpoint for data. Effectively, the consumer endpoint from the main route and the consumer endpoint in the pollEnrich() operation are coupled. That is, an incoming message on the initial consumer in the route triggers pollEnrich() to poll its consumer endpoint.

Note

The enrich() and pollEnrich() methods support dynamic endpoint URIs. You can compute a URI by specifying an expression that obtains values from the current exchange. For example, you can poll a file whose name is computed from data in the exchange. This behavior was introduced in Camel 2.16. The change breaks backward compatibility in the XML DSL, although migration is straightforward. The Java DSL stays backward compatible.

Using message translators and processors to enrich content

Camel provides fluent builders for creating routing and mediation rules in a type-safe, IDE-friendly way that supports smart completion and safe refactoring. When you test distributed systems, you commonly need to stub out certain external systems so that you can test other parts of the system before a specific system is available or written. One way to do this is to use a template system to generate responses to requests, producing a dynamic message that has a mostly static body.

Another way to use templates is to consume a message from one destination, transform it with a component such as Velocity or XQuery, and then send it to another destination. The following example shows this for an InOnly (one-way) message:

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm").
  to("activemq:Another.Queue");

Suppose you want to use InOut (request-reply) messaging to process requests on the My.Queue queue on ActiveMQ. You want a template-generated response that goes to a JMSReplyTo destination. The following example shows how to do this:

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm");

The following simple example shows how to use DSL to transform the message body:

from("direct:start").setBody(body().append(" World!")).to("mock:result");

The following example uses explicit Java code to add a processor:

from("direct:start").process(new Processor() {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        in.setBody(in.getBody(String.class) + " World!");
    }
}).to("mock:result");

The next example uses bean integration to enable the use of any bean to act as the transformer:

from("activemq:My.Queue").
  beanRef("myBeanName", "myMethodName").
  to("activemq:Another.Queue");

The following example shows a Spring XML implementation:

<route>
  <from uri="activemq:Input"/>
  <bean ref="myBeanName" method="doTransform"/>
  <to uri="activemq:Output"/>
</route>

Using the enrich() method to enrich content

AggregationStrategy aggregationStrategy = ...

from("direct:start")
  .enrich("direct:resource", aggregationStrategy)
  .to("direct:result");

from("direct:resource")
...

The content enricher (enrich) retrieves additional data from a resource endpoint in order to enrich an incoming message (contained in the original exchange). An aggregation strategy combines the original exchange and the resource exchange. The first parameter of the AggregationStrategy.aggregate(Exchange, Exchange) method corresponds to the original exchange, and the second parameter corresponds to the resource exchange. The results from the resource endpoint are stored in the resource exchange’s Out message. Here is a sample template for implementing your own aggregation strategy class:

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getOut().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

Using this template, the original exchange can have any exchange pattern. The resource exchange created by the enricher is always an InOut exchange.

Spring XML enrich example

The preceding example can also be implemented in Spring XML:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>
 <bean id="aggregationStrategy" class="..." />

Default aggregation strategy when enriching content

The aggregation strategy is optional. If you do not provide it, Apache Camel will use the body obtained from the resource by default. For example:

from("direct:start")
  .enrich("direct:resource")
  .to("direct:result");

In the preceding route, the message sent to the direct:result endpoint contains the output from the direct:resource endpoint, because this example does not use any custom aggregation.

In XML DSL, just omit the strategyRef attribute, as follows:

<route>
    <from uri="direct:start"/>
    <enrich>
        <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
</route>

Options supported by the enrich() method

The enrich DSL command supports the following options:

Name | Default Value | Description
expression | None | Starting with Camel 2.16, this option is required. Specify an expression for configuring the URI of the external service to enrich from. You can use the Simple expression language, the Constant expression language, or any other language that can dynamically compute the URI from values in the current exchange.
uri | | The uri and ref options have been removed. Specify the expression option instead. In Camel 2.15 and earlier, you were required to specify either the uri option or the ref option. Each option specified the endpoint URI for the external service to enrich from.
ref | | Removed (see uri). Referred to the endpoint for the external service to enrich from. In Camel 2.15 and earlier, you had to use either uri or ref.
strategyRef | | Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message. By default, Camel uses the reply from the external service as the outgoing message. You can use a POJO as the AggregationStrategy. For additional information, see the documentation for the Aggregate pattern.
strategyMethodName | | When using POJOs as the AggregationStrategy, specify this option to explicitly declare the name of the aggregation method. For details, see the Aggregate pattern.
strategyMethodAllowNull | false | The default behavior is that the aggregate method is not used if there is no data to enrich. If this option is true then null values are used as the oldExchange when there is no data to enrich and you are using POJOs as the AggregationStrategy. For more information, see the Aggregate pattern.
aggregateOnException | false | The default behavior is that the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. For example, it is possible to suppress the exception or set a custom message body.
shareUnitOfWork | false | Starting with Camel 2.16, the default behavior is that the enrich operation does not share the unit of work between the parent exchange and the resource exchange. This means that the resource exchange has its own individual unit of work. For more information, see the documentation for the Splitter pattern.
cacheSize | 1000 | Starting with Camel 2.16, specify this option to configure the cache size for the ProducerCache, which caches producers for reuse in the enrich operation. To turn off this cache, set the cacheSize option to -1.
ignoreInvalidEndpoint | false | Starting with Camel 2.16, this option indicates whether or not to ignore an endpoint URI that cannot be resolved. The default behavior is that Camel throws an exception that identifies the invalid endpoint URI.
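
The strategyRef and strategyMethodName options allow a plain Java object to act as the aggregation strategy. The following is a minimal sketch of such a POJO, not a definitive implementation. The class name OrderMerger and the method name merge are hypothetical, and the sketch assumes that Camel passes the original message body as the first argument and the resource reply body as the second.

```java
/**
 * Hypothetical POJO used as an aggregation strategy. It has no Camel dependencies.
 * When registering it as a bean, declare it public in its own source file.
 */
final class OrderMerger {

    /**
     * Combines the original body with the reply from the resource endpoint.
     * Assumption: the first argument is the original body, the second is the resource body.
     */
    public String merge(String originalBody, String resourceBody) {
        if (resourceBody == null) {
            // Nothing to enrich with, so keep the original body unchanged.
            return originalBody;
        }
        // Join the two bodies with a comma as a stand-in for real merge logic.
        return originalBody + "," + resourceBody;
    }
}
```

If this class is registered as a bean with the ID orderMerger, the enrich element would reference it with strategyRef="orderMerger" and strategyMethodName="merge".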

Specifying an aggregation strategy when using the enrich() method

The enrich() method retrieves additional data from a resource endpoint to enrich an incoming message, which is contained in the original exchange. You can use an aggregation strategy to combine the original exchange and the resource exchange. The first parameter of the AggregationStrategy.aggregate(Exchange, Exchange) method corresponds to the original exchange. The second parameter corresponds to the resource exchange. The results from the resource endpoint are stored in the resource exchange’s Out message. For example:

AggregationStrategy aggregationStrategy = ...

   from("direct:start")
   .enrich("direct:resource", aggregationStrategy)
   .to("direct:result");

   from("direct:resource")
...

The following code is a template for implementing an aggregation strategy. In an implementation that uses this template, the original exchange can be any message exchange pattern. The resource exchange created by the enricher is always an InOut message exchange pattern.

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getIn().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

The following example shows the use of the Spring XML DSL to implement an aggregation strategy:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>

<bean id="aggregationStrategy" class="..." />

Using dynamic URIs with enrich()

Starting with Camel 2.16, the enrich() and pollEnrich() methods support the use of dynamic URIs that are computed based on information from the current exchange. For example, to enrich from an HTTP endpoint where the header with the orderId key is used as part of the content path of the HTTP URL, you can do something like this:

from("direct:start")
  .enrich().simple("http:myserver/${header.orderId}/order")
  .to("direct:result");

Following is the same example in XML DSL:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
   <from uri="direct:start"/>
   <enrich>
      <simple>http:myserver/${header.orderId}/order</simple>
   </enrich>
   <to uri="direct:result"/>
</route>
</camelContext>
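
To make the effect of the dynamic URI concrete, the following stand-alone sketch shows how a header reference in the URI template resolves to a final endpoint URI. It is an illustration only: the class DynamicUriSketch and its resolve method are hypothetical, they handle only ${header.name} placeholders, and they are not how Camel's Simple language is implemented.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration of header substitution in a URI template; not Camel code. */
final class DynamicUriSketch {

    // Matches placeholders of the form ${header.someName}.
    private static final Pattern HEADER_REF = Pattern.compile("\\$\\{header\\.(\\w+)\\}");

    /** Replaces each ${header.x} placeholder with the value of header x. */
    static String resolve(String template, Map<String, String> headers) {
        Matcher matcher = HEADER_REF.matcher(template);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = headers.get(matcher.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Missing header: " + matcher.group(1));
            }
            // quoteReplacement stops '$' and '\' in the value from being treated as group references.
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }
}
```

With an orderId header of 123, the template http:myserver/${header.orderId}/order resolves to http:myserver/123/order.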

Using the pollEnrich() method to enrich content

The pollEnrich command treats the resource endpoint as a consumer. Instead of sending an exchange to the resource endpoint, it polls the endpoint. By default, the poll returns immediately if no exchange is available from the resource endpoint. For example, the following route reads a file whose name is extracted from the header of an incoming JMS message:

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId")
   .to("bean:processOrder");

You can limit the time to wait for the file to be ready. The following example shows a maximum wait of 20 seconds:

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
   .to("bean:processOrder");

You can also specify an aggregation strategy for pollEnrich(), for example:

   .pollEnrich("file://order/data/additional?fileName=orderId", 20000, aggregationStrategy)

The pollEnrich() method supports consumers that are configured with consumer.bridgeErrorHandler=true. This lets any exceptions from the poll propagate to the route error handler, which could, for example, retry the poll.

Note

Support for consumer.bridgeErrorHandler=true is new in Camel 2.18. This behavior is not supported in Camel 2.17.

The resource exchange passed to the aggregation strategy’s aggregate() method might be null if the poll times out before an exchange is received.

Polling methods used by pollEnrich()

The pollEnrich() method polls the consumer endpoint by calling one of the following polling methods:

  • receiveNoWait() (This is the default.)
  • receive()
  • receive(long timeout)

The pollEnrich() command’s timeout argument (specified in milliseconds) determines which method to call, as follows:

  • When the timeout is 0 or not specified, pollEnrich() calls receiveNoWait.
  • When the timeout is negative, pollEnrich() calls receive.
  • Otherwise, pollEnrich() calls receive(timeout).
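
The selection rules above can be summarized in a few lines of code. The following sketch only models the rules as stated in this section. The class PollMethodSelector and its enum are hypothetical names and are not taken from the Camel source.

```java
/** Illustrative model of the timeout rules listed above; this is not Camel source code. */
final class PollMethodSelector {

    /** The three polling methods that pollEnrich() can call. */
    enum PollMethod { RECEIVE_NO_WAIT, RECEIVE, RECEIVE_WITH_TIMEOUT }

    /** Maps a pollEnrich() timeout, in milliseconds, to the polling method it selects. */
    static PollMethod select(long timeoutMillis) {
        if (timeoutMillis == 0) {
            // Zero: return immediately, whether or not a message is available.
            return PollMethod.RECEIVE_NO_WAIT;
        }
        if (timeoutMillis < 0) {
            // Negative: block until a message arrives.
            return PollMethod.RECEIVE;
        }
        // Positive: wait up to the given number of milliseconds.
        return PollMethod.RECEIVE_WITH_TIMEOUT;
    }
}
```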

If there is no data then the newExchange in the aggregation strategy is null.

Examples of using the pollEnrich() method

The following example shows enrichment of the message by loading the content from the inbox/data.txt file:

 from("direct:start")
   .pollEnrich("file:inbox?fileName=data.txt")
   .to("direct:result");

Following is the same example in XML DSL:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <constant>file:inbox?fileName=data.txt</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

If the specified file does not exist then the message is empty. You can specify a timeout to wait (potentially forever) until a file exists or to wait up to a particular length of time. In the following example, the command waits no more than 5 seconds:

<route>
   <from uri="direct:start"/>
   <pollEnrich timeout="5000">
      <constant>file:inbox?fileName=data.txt</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

Using dynamic URIs with pollEnrich()

Starting with Camel 2.16, the enrich() and pollEnrich() methods support the use of dynamic URIs that are computed based on information from the current exchange. For example, to poll enrich from an endpoint that uses a header to indicate a SEDA queue name, you can do something like this:

from("direct:start")
  .pollEnrich().simple("seda:${header.name}")
  .to("direct:result");

Following is the same example in XML DSL:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <simple>seda:${header.name}</simple>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

Options supported by the pollEnrich() method

The pollEnrich DSL command supports the following options:

Name | Default Value | Description
expression | None | Starting with Camel 2.16, this option is required. Specify an expression for configuring the URI of the external service to enrich from. You can use the Simple expression language, the Constant expression language, or any other language that can dynamically compute the URI from values in the current exchange.
uri | | The uri and ref options have been removed. Specify the expression option instead. In Camel 2.15 and earlier, you were required to specify either the uri option or the ref option. Each option specified the endpoint URI for the external service to enrich from.
ref | | Removed (see uri). Referred to the endpoint for the external service to enrich from. In Camel 2.15 and earlier, you had to use either uri or ref.
strategyRef | | Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message. By default, Camel uses the reply from the external service as the outgoing message. You can use a POJO as the AggregationStrategy. For additional information, see the documentation for the Aggregate pattern.
strategyMethodName | | When using POJOs as the AggregationStrategy, specify this option to explicitly declare the name of the aggregation method. For details, see the Aggregate pattern.
strategyMethodAllowNull | false | The default behavior is that the aggregate method is not used if there is no data to enrich. If this option is true then null values are used as the oldExchange when there is no data to enrich and you are using POJOs as the AggregationStrategy. For more information, see the Aggregate pattern.
timeout | -1 | The maximum length of time, in milliseconds, to wait for a response when polling from the external service. The default behavior is that the pollEnrich() method calls the receive() method. Because receive() can block until there is a message available, the recommendation is to always specify a timeout.
aggregateOnException | false | The default behavior is that the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. For example, it is possible to suppress the exception or set a custom message body.
cacheSize | 1000 | Specify this option to configure the cache size for the ConsumerCache, which caches consumers for reuse in the pollEnrich() operation. To turn off this cache, set the cacheSize option to -1.
ignoreInvalidEndpoint | false | Indicates whether or not to ignore an endpoint URI that cannot be resolved. The default behavior is that Camel throws an exception that identifies the invalid endpoint URI.

9.2. Content Filter

Overview

The content filter pattern describes a scenario where you need to filter out extraneous content from a message before delivering it to its intended recipient. For example, you might employ a content filter to strip out confidential information from a message.

Figure 9.2. Content Filter Pattern

Content filter pattern

A common way to filter messages is to use an expression in the DSL, written in one of the supported scripting languages (for example, XSLT, XQuery or JoSQL).

Implementing a content filter

A content filter is essentially an application of a message processing technique for a particular purpose. To implement a content filter, you can employ any of the following message processing techniques:

  • Message translator
  • Processor
  • Bean integration

XML configuration example

The following example shows how to configure a content filter route in XML, using an XSLT stylesheet to strip unwanted content from the message:

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="activemq:My.Queue"/>
    <to uri="xslt:classpath:com/acme/content_filter.xsl"/>
    <to uri="activemq:Another.Queue"/>
  </route>
</camelContext>

Using an XPath filter

You can also use XPath to filter out part of the message you are interested in:

<route>
  <from uri="activemq:Input"/>
  <setBody><xpath resultType="org.w3c.dom.Document">//foo:bar</xpath></setBody>
  <to uri="activemq:Output"/>
</route>

9.3. Normalizer

Overview

The normalizer pattern is used to process messages that are semantically equivalent, but arrive in different formats. The normalizer transforms the incoming messages into a common format.

In Apache Camel, you can implement the normalizer pattern by combining a content-based router (see Section 8.1, “Content-Based Router”), which detects the incoming message’s format, with a collection of message translators (see Section 5.6, “Message Translator”), which transform the different incoming formats into a common format.

Figure 9.3. Normalizer Pattern

Normalizer pattern

Java DSL example

This example shows a Message Normalizer that converts two types of XML messages into a common format. Messages in this common format are then filtered.

Using the Fluent Builders

// we need to normalize two types of incoming messages
from("direct:start")
    .choice()
        .when().xpath("/employee").to("bean:normalizer?method=employeeToPerson")
        .when().xpath("/customer").to("bean:normalizer?method=customerToPerson")
    .end()
    .to("mock:result");

In this case, we use a Java bean as the normalizer. The class looks like this:

// Java
public class MyNormalizer {
    public void employeeToPerson(Exchange exchange, @XPath("/employee/name/text()") String name) {
        exchange.getOut().setBody(createPerson(name));
    }

    public void customerToPerson(Exchange exchange, @XPath("/customer/@name") String name) {
        exchange.getOut().setBody(createPerson(name));
    }

    private String createPerson(String name) {
        return "<person name=\"" + name + "\"/>";
    }
}
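
To see what the route and bean above do to each input format, the following stand-alone sketch applies the same XPath expressions using only the JDK's javax.xml.xpath API. The class NormalizerSketch is hypothetical and does not use Camel. It assumes that input matching neither branch passes through unchanged, as it would in a choice() without an otherwise() clause.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

/** Hypothetical stand-alone model of the normalizer route; it does not use Camel. */
final class NormalizerSketch {

    /** Converts an employee or customer document into the common person format. */
    static String normalize(String xml) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            if (exists(xpath, "/employee", xml)) {
                return createPerson(xpath.evaluate("/employee/name/text()", source(xml)));
            }
            if (exists(xpath, "/customer", xml)) {
                return createPerson(xpath.evaluate("/customer/@name", source(xml)));
            }
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate XPath on: " + xml, e);
        }
        // Assumption: input that matches neither branch is passed through unchanged.
        return xml;
    }

    private static boolean exists(XPath xpath, String expression, String xml)
            throws XPathExpressionException {
        // A node-set result converts to true when at least one node matches.
        return (Boolean) xpath.evaluate(expression, source(xml), XPathConstants.BOOLEAN);
    }

    private static InputSource source(String xml) {
        // Each evaluation consumes its reader, so create a fresh source every time.
        return new InputSource(new StringReader(xml));
    }

    private static String createPerson(String name) {
        return "<person name=\"" + name + "\"/>";
    }
}
```

Both an employee document with a nested name element and a customer document with a name attribute produce the same person element, which is the point of the pattern.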

XML configuration example

The following example shows the same route in the XML DSL:

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <choice>
      <when>
        <xpath>/employee</xpath>
        <to uri="bean:normalizer?method=employeeToPerson"/>
      </when>
      <when>
        <xpath>/customer</xpath>
        <to uri="bean:normalizer?method=customerToPerson"/>
      </when>
    </choice>
    <to uri="mock:result"/>
  </route>
</camelContext>

<bean id="normalizer" class="org.apache.camel.processor.MyNormalizer"/>

9.4. Claim Check

Claim Check

The claim check pattern, shown in Figure 9.4, “Claim Check Pattern”, allows you to replace message content with a claim check (a unique key), which can be used to retrieve the message content at a later time. The message content is stored temporarily in a persistent store like a database or file system. This pattern is very useful when message content is very large (thus it would be expensive to send around) and not all components require all information.

It can also be useful in situations where you cannot entrust the information to an outside party; in this case, you can use the claim check to hide the sensitive portions of data.

Figure 9.4. Claim Check Pattern

store in library

Java DSL example

The following example shows how to replace a message body with a claim check and restore the body at a later step.

from("direct:start").to("bean:checkLuggage", "mock:testCheckpoint", "bean:dataEnricher", "mock:result");

The next step in the pipeline is the mock:testCheckpoint endpoint, which checks that the message body has been removed, the claim check added, and so on.

XML DSL example

The preceding example can also be written in XML, as follows:

 <route>
     <from uri="direct:start"/>
     <pipeline>
         <to uri="bean:checkLuggage"/>
         <to uri="mock:testCheckpoint"/>
         <to uri="bean:dataEnricher"/>
         <to uri="mock:result"/>
     </pipeline>
 </route>

checkLuggage bean

The message is first sent to the checkLuggage bean, which is implemented as follows:

public static final class CheckLuggageBean {
    public void checkLuggage(Exchange exchange, @Body String body, @XPath("/order/@custId") String custId) {
        // store the message body into the data store, using the custId as the claim check
        dataStore.put(custId, body);
        // add the claim check as a header
        exchange.getIn().setHeader("claimCheck", custId);
        // remove the body from the message
        exchange.getIn().setBody(null);
    }
}

This bean stores the message body into the data store, using the custId as the claim check. In this example, we are using a HashMap to store the message body; in a real application you would use a database or the file system. The claim check is added as a message header for later use and, finally, we remove the body from the message and pass it down the pipeline.

testCheckpoint endpoint

The example route is just a pipeline (see Section 5.4, “Pipes and Filters”). In a real application, you would substitute some other steps for the mock:testCheckpoint endpoint.

dataEnricher bean

To add the message body back into the message, we use the dataEnricher bean, which is implemented as follows:

public static final class DataEnricherBean {
    public void addDataBackIn(Exchange exchange, @Header("claimCheck") String claimCheck) {
        // query the data store using the claim check as the key and add the data
        // back into the message body
        exchange.getIn().setBody(dataStore.get(claimCheck));
        // remove the message data from the data store
        dataStore.remove(claimCheck);
        // remove the claim check header
        exchange.getIn().removeHeader("claimCheck");
    }
}

This bean queries the data store, using the claim check as the key, and then adds the recovered data back into the message body. The bean then deletes the message data from the data store and removes the claimCheck header from the message.
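
Both beans refer to a dataStore that is not shown. The following is a minimal sketch of one possible in-memory store, assuming string keys and string bodies. The class ClaimCheckStore and its method names are hypothetical, and a real application would use a database or the file system, as noted earlier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory data store for the claim check beans; not part of Camel. */
final class ClaimCheckStore {

    // ConcurrentHashMap allows routes on different threads to share the store safely.
    // It rejects null keys and null values, so bodies must be non-null.
    private final Map<String, String> store = new ConcurrentHashMap<>();

    /** Stores the message body under the given claim check key. */
    void checkIn(String claimCheck, String body) {
        store.put(claimCheck, body);
    }

    /** Returns and removes the body for the key, or returns null if the key is unknown. */
    String claim(String claimCheck) {
        return store.remove(claimCheck);
    }
}
```

Combining lookup and removal in a single claim operation means a claim check can be redeemed only once, which matches the get-then-remove sequence in the dataEnricher bean.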

9.5. Sort

Sort

The sort pattern is used to sort the contents of a message body, assuming that the message body contains a list of items that can be sorted.

By default, the contents of the message are sorted using a default comparator that handles numeric values or strings. You can provide your own comparator and you can specify an expression that returns the list to be sorted (the expression must be convertible to java.util.List).

Java DSL example

The following example generates the list of items to sort by tokenizing on the line break character:

from("file://inbox").sort(body().tokenize("\n")).to("bean:MyServiceBean.processLine");

You can pass in your own comparator as the second argument to sort():

from("file://inbox").sort(body().tokenize("\n"), new MyReverseComparator()).to("bean:MyServiceBean.processLine");
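
The MyReverseComparator class is not shown in this section. One possible implementation, assuming that the tokenized items are strings and that "reverse" means descending natural order, might look like the following sketch.

```java
import java.util.Comparator;

/** One possible MyReverseComparator: orders strings from highest to lowest. */
final class MyReverseComparator implements Comparator<String> {

    @Override
    public int compare(String left, String right) {
        // Swapping the operands of the natural comparison reverses the order.
        return right.compareTo(left);
    }
}
```

For string items, the JDK's Comparator.reverseOrder() gives the same ordering without a custom class.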

XML configuration example

You can configure the same routes in Spring XML.

The following example generates the list of items to sort by tokenizing on the line break character:

<route>
  <from uri="file://inbox"/>
  <sort>
    <simple>body</simple>
  </sort>
  <bean ref="myServiceBean" method="processLine"/>
</route>

And to use a custom comparator, you can reference it as a Spring bean:

<route>
  <from uri="file://inbox"/>
  <sort comparatorRef="myReverseComparator">
    <simple>body</simple>
  </sort>
  <bean ref="MyServiceBean" method="processLine"/>
</route>

<bean id="myReverseComparator" class="com.mycompany.MyReverseComparator"/>

Besides <simple>, you can supply an expression using any language you like, so long as it returns a list.

Options

The sort DSL command supports the following options:

Name | Default Value | Description
comparatorRef | | Refers to a custom java.util.Comparator to use for sorting the message body. By default, Camel uses a comparator that sorts in A..Z order.

9.6. Validate

Overview

The validate pattern provides a convenient syntax to check whether the content of a message is valid. The validate DSL command takes a predicate expression as its sole argument: if the predicate evaluates to true, the route continues processing normally; if the predicate evaluates to false, a PredicateValidationException is thrown.

Java DSL example

The following route validates the body of the current message using a regular expression:

from("jms:queue:incoming")
  .validate(body(String.class).regex("^\\w{10}\\,\\d{2}\\,\\w{24}$"))
  .to("bean:MyServiceBean.processLine");
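
The regular expression in this route accepts a body made of 10 word characters, 2 digits, and 24 word characters, separated by commas. The following stand-alone sketch exercises the same pattern with java.util.regex so that you can check sample bodies outside a route. The class BodyFormatCheck is a hypothetical helper and is not part of Camel.

```java
import java.util.regex.Pattern;

/** Hypothetical helper that applies the route's regular expression to a sample body. */
final class BodyFormatCheck {

    // Same pattern as in the route: 10 word characters, 2 digits, 24 word characters.
    private static final Pattern LINE = Pattern.compile("^\\w{10}\\,\\d{2}\\,\\w{24}$");

    /** Returns true if the whole body matches the expected format. */
    static boolean isValid(String body) {
        return body != null && LINE.matcher(body).matches();
    }
}
```

A body such as ABCDEFGHIJ,42,ABCDEFGHIJKLMNOPQRSTUVWX passes this check, whereas a body with a shorter first field does not.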

You can also validate a message header — for example:

from("jms:queue:incoming")
  .validate(header("bar").isGreaterThan(100))
  .to("bean:MyServiceBean.processLine");

And you can use validate with the simple expression language:

from("jms:queue:incoming")
  .validate(simple("${in.header.bar} == 100"))
  .to("bean:MyServiceBean.processLine");

XML DSL example

To use validate in the XML DSL, the recommended approach is to use the simple expression language:

<route>
  <from uri="jms:queue:incoming"/>
  <validate>
    <simple>${body} regex ^\\w{10}\\,\\d{2}\\,\\w{24}$</simple>
  </validate>
  <bean ref="myServiceBean" method="processLine"/>
</route>

<bean id="myServiceBean" class="com.mycompany.MyServiceBean"/>

You can also validate a message header — for example:

<route>
  <from uri="jms:queue:incoming"/>
  <validate>
    <simple>${in.header.bar} == 100</simple>
  </validate>
  <bean ref="myServiceBean" method="processLine"/>
</route>

<bean id="myServiceBean" class="com.mycompany.MyServiceBean"/>