Chapter 9. Message Transformation

Abstract

The message transformation patterns describe how to modify the contents of messages for various purposes.

9.1. Content Enricher

Overview

The content enricher pattern describes a scenario where the message destination requires more data than is present in the original message. In this case, you would use a content enricher to pull in the extra data from an external resource.

Figure 9.1. Content Enricher Pattern

Content enricher pattern

Models of content enrichment

Apache Camel supports two kinds of content enricher, as follows:
  • enrich(): obtains additional data from the resource by sending a copy of the current exchange to a producer endpoint and then using the data from the resulting reply (the exchange created by the enricher is always an InOut exchange).
  • pollEnrich(): obtains the additional data by polling a consumer endpoint. Effectively, the consumer endpoint from the main route and the consumer endpoint in pollEnrich() are coupled, such that exchanges incoming on the main route trigger a poll of the pollEnrich() endpoint.
Note
The enrich and pollEnrich content enrichers support dynamic endpoint URIs. You can compute a URI using an expression, which lets you use values from the current exchange. For example, you can poll a file whose name is computed from data in the exchange. This change breaks the XML DSL, but existing XML routes are easy to migrate. The Java DSL stays backwards compatible.

Content enrichment using enrich()

AggregationStrategy aggregationStrategy = ...

from("direct:start")
  .enrich("direct:resource", aggregationStrategy)
  .to("direct:result");

from("direct:resource")
...
The content enricher (enrich) retrieves additional data from a resource endpoint in order to enrich an incoming message (contained in the original exchange). An aggregation strategy combines the original exchange and the resource exchange. The first parameter of the AggregationStrategy.aggregate(Exchange, Exchange) method corresponds to the original exchange, and the second parameter corresponds to the resource exchange. The results from the resource endpoint are stored in the resource exchange's Out message. Here is a sample template for implementing your own aggregation strategy class:
public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getOut().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }
    
}
Using this template, the original exchange can have any exchange pattern. The resource exchange created by the enricher is always an InOut exchange.

Spring XML Enrich Example

The preceding example can also be implemented in Spring XML:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>
 <bean id="aggregationStrategy" class="..." />

Default aggregation strategy

The aggregation strategy is optional. If you do not provide it, Apache Camel will use the body obtained from the resource by default. For example:
from("direct:start")
  .enrich("direct:resource")
  .to("direct:result");
In the preceding route, the message sent to the direct:result endpoint contains the output from the direct:resource endpoint, because this example does not use a custom aggregation strategy.
In XML DSL, just omit the strategyRef attribute, as follows:

<route>
  <from uri="direct:start"/>
  <enrich uri="direct:resource"/>
  <to uri="direct:result"/>
</route>
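The difference between the default and a custom aggregation strategy can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: strings stand in for Camel exchanges, and EnrichStrategySketch and its enrich method are hypothetical names, not part of the Camel API.

```java
import java.util.function.BinaryOperator;

final class EnrichStrategySketch {

    /**
     * Models the choice described above using plain strings instead of Camel exchanges.
     * A null strategy means "no aggregation strategy configured".
     */
    static String enrich(String originalBody, String resourceReply, BinaryOperator<String> strategy) {
        if (strategy == null) {
            // Default behaviour: the reply from the resource becomes the outgoing body.
            return resourceReply;
        }
        // Custom behaviour: the strategy receives (original, resource), in that order.
        return strategy.apply(originalBody, resourceReply);
    }
}
```

Calling enrich("body", "reply", null) yields the resource reply alone, whereas passing a strategy such as (a, b) -> a + "+" + b lets the original body survive in the result.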

Enrich Options

The enrich DSL command supports the following options:
| Name | Default Value | Description |
| uri | | The endpoint URI for the external service to enrich from. You must use either uri or ref. |
| ref | | Refers to the endpoint for the external service to enrich from. You must use either uri or ref. |
| strategyRef | | Refers to an AggregationStrategy used to merge the reply from the external service into a single outgoing message. By default, Camel uses the reply from the external service as the outgoing message. |
| aggregateOnException | false | If false, the aggregate method is not called when an exception is thrown while retrieving data from the resource. Set it to true to handle such exceptions in the aggregate method. For example, you can suppress the exception or set a custom message. |
| shareUnitOfWork | false | Camel 2.16: Whether the parent exchange and the resource exchange share a unit of work. By default, enrich does not share the unit of work, and the resource exchange has its own individual unit of work. |

Content enrichment using pollEnrich()

The pollEnrich command treats the resource endpoint as a consumer. Instead of sending an exchange to the resource endpoint, it polls the endpoint. By default, the poll returns immediately if there is no exchange available from the resource endpoint. For example, the following route enriches each incoming JMS message with the contents of a file whose name is given by the fileName option in the endpoint URI:
from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId")
   .to("bean:processOrder");
To wait at most 20 seconds for the file to be ready, specify a timeout as follows:
from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
   .to("bean:processOrder");
You can also specify an aggregation strategy for pollEnrich, as follows:
   .pollEnrich("file://order/data/additional?fileName=orderId", 20000, aggregationStrategy)
Note
The resource exchange passed to the aggregation strategy's aggregate() method might be null if the poll times out before an exchange is received.
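A merge step that tolerates a timed-out poll might look like the following plain-Java sketch. Strings stand in for exchanges here, and NullSafeMergeSketch, its merge method, and the newline-append rule are all hypothetical choices made for illustration. In a real AggregationStrategy, the equivalent guard would be a null check on the resource exchange at the top of aggregate(), before any of its messages are read.

```java
final class NullSafeMergeSketch {

    /**
     * Combines the original body with the polled data.
     * polledBody is null when the poll timed out before anything was received;
     * in that case the original body is returned unchanged.
     */
    static String merge(String originalBody, String polledBody) {
        if (polledBody == null) {
            // Nothing was polled: keep the original message as it is.
            return originalBody;
        }
        // Hypothetical merge rule: append the polled data on a new line.
        return originalBody + "\n" + polledBody;
    }
}
```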
Data from current Exchange not used
When the endpoint URI is a fixed string, pollEnrich does not access any data from the current exchange, so the poll cannot use any headers that you have set on the exchange. For example, you cannot set a file name in the Exchange.FILE_NAME header and use pollEnrich to consume only that file. Instead, you must set the file name in the endpoint URI.

Polling methods used by pollEnrich()

In general, the pollEnrich() enricher polls the consumer endpoint using one of the following polling methods:
  • receiveNoWait() (used by default)
  • receive()
  • receive(long timeout)
The pollEnrich() command's timeout argument (specified in milliseconds) determines which method gets called, as follows:
  • If the timeout is 0 or not specified, receiveNoWait() is called.
  • If the timeout is negative, receive() is called.
  • Otherwise, receive(timeout) is called.
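The selection rule above can be sketched in plain Java, with a java.util.concurrent.BlockingQueue standing in for the consumer endpoint. This is an analogy rather than Camel code: PollTimeoutSketch and its poll method are hypothetical names, and the queue methods are only rough counterparts of receiveNoWait(), receive(), and receive(timeout).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class PollTimeoutSketch {

    /**
     * Picks a polling style from the timeout value, following the rule above.
     * Returns null if nothing is available in time or the thread is interrupted.
     */
    static <T> T poll(BlockingQueue<T> source, long timeoutMillis) {
        try {
            if (timeoutMillis == 0) {
                return source.poll();   // like receiveNoWait(): return immediately
            }
            if (timeoutMillis < 0) {
                return source.take();   // like receive(): block until data arrives
            }
            // like receive(timeout): wait for at most the given time
            return source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }
}
```

With an empty queue, a timeout of 0 returns null at once, a positive timeout returns null after waiting, and a negative timeout blocks until another thread adds an element.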

pollEnrich example

In this example we enrich the message by loading the content from the file named inbox/data.txt.
 from("direct:start")
   .pollEnrich("file:inbox?fileName=data.txt")
   .to("direct:result");

The equivalent route in XML DSL is:
   <route>
     <from uri="direct:start"/>
     <pollEnrich uri="file:inbox?fileName=data.txt"/>
     <to uri="direct:result"/>
   </route>
If the file does not exist, the message is empty. You can specify a timeout to wait until the file exists, either indefinitely (a negative timeout) or for a limited period. For example, to wait up to 5 seconds:
   <route>
     <from uri="direct:start"/>
     <pollEnrich uri="file:inbox?fileName=data.txt" timeout="5000"/>
     <to uri="direct:result"/>
   </route>

PollEnrich Options

The pollEnrich DSL command supports the following options:
| Name | Default Value | Description |
| uri | | The endpoint URI for the external service to enrich from. You must use either uri or ref. |
| ref | | Refers to the endpoint for the external service to enrich from. You must use either uri or ref. |
| strategyRef | | Refers to an AggregationStrategy used to merge the reply from the external service into a single outgoing message. By default, Camel uses the reply from the external service as the outgoing message. |
| timeout | 0 | Timeout in milliseconds to use when polling the external service. See "Polling methods used by pollEnrich()" for important details about the timeout. |