1.5. Processors

Overview

To enable the router to do something more interesting than simply connecting a consumer endpoint to a producer endpoint, you can add processors to your route. A processor is a command you can insert into a routing rule to perform arbitrary processing of messages that flow through the rule. Apache Camel provides a wide variety of different processors, as shown in Table 1.1, “Apache Camel Processors”.

Table 1.1. Apache Camel Processors

| Java DSL | XML DSL | Description |
| --- | --- | --- |
| aggregate() | aggregate | Aggregator EIP: Creates an aggregator, which combines multiple incoming exchanges into a single exchange. |
| aop() | aop | Uses Aspect Oriented Programming (AOP) to do work before and after a specified sub-route. |
| bean(), beanRef() | bean | Processes the current exchange by invoking a method on a Java object (or bean). See Section 2.4, “Bean Integration”. |
| choice() | choice | Content Based Router EIP: Selects a particular sub-route based on the exchange content, using when and otherwise clauses. |
| convertBodyTo() | convertBodyTo | Converts the In message body to the specified type. |
| delay() | delay | Delayer EIP: Delays the propagation of the exchange to the latter part of the route. |
| doTry() | doTry | Creates a try/catch block for handling exceptions, using doCatch, doFinally, and end clauses. |
| end() | N/A | Ends the current command block. |
| enrich(), enrichRef() | enrich | Content Enricher EIP: Combines the current exchange with data requested from a specified producer endpoint URI. |
| filter() | filter | Message Filter EIP: Uses a predicate expression to filter incoming exchanges. |
| idempotentConsumer() | idempotentConsumer | Idempotent Consumer EIP: Implements a strategy to suppress duplicate messages. |
| inheritErrorHandler() | @inheritErrorHandler | Boolean option that can be used to disable the inherited error handler on a particular route node (defined as a sub-clause in the Java DSL and as an attribute in the XML DSL). |
| inOnly() | inOnly | Either sets the current exchange's MEP to InOnly (if no arguments) or sends the exchange as an InOnly to the specified endpoint(s). |
| inOut() | inOut | Either sets the current exchange's MEP to InOut (if no arguments) or sends the exchange as an InOut to the specified endpoint(s). |
| loadBalance() | loadBalance | Load Balancer EIP: Implements load balancing over a collection of endpoints. |
| log() | log | Logs a message to the console. |
| loop() | loop | Loop EIP: Repeatedly resends each exchange to the latter part of the route. |
| markRollbackOnly() | @markRollbackOnly | (Transactions) Marks the current transaction for rollback only (no exception is raised). In the XML DSL, this option is set as a boolean attribute on the rollback element. See "Transaction Guide". |
| markRollbackOnlyLast() | @markRollbackOnlyLast | (Transactions) If one or more transactions have previously been associated with this thread and then suspended, this command marks the latest transaction for rollback only (no exception is raised). In the XML DSL, this option is set as a boolean attribute on the rollback element. See "Transaction Guide". |
| marshal() | marshal | Transforms the In message body into a low-level or binary format using the specified data format, in preparation for sending over a particular transport protocol. |
| multicast() | multicast | Multicast EIP: Multicasts the current exchange to multiple destinations, where each destination gets its own copy of the exchange. |
| onCompletion() | onCompletion | Defines a sub-route (terminated by end() in the Java DSL) that gets executed after the main route has completed. See also Section 2.11, “OnCompletion”. |
| onException() | onException | Defines a sub-route (terminated by end() in the Java DSL) that gets executed whenever the specified exception occurs. Usually defined on its own line (not in a route). |
| pipeline() | pipeline | Pipes and Filters EIP: Sends the exchange to a series of endpoints, where the output of one endpoint becomes the input of the next endpoint. See also Section 2.1, “Pipeline Processing”. |
| policy() | policy | Applies a policy to the current route (currently only used for transactional policies; see "Transaction Guide"). |
| pollEnrich(), pollEnrichRef() | pollEnrich | Content Enricher EIP: Combines the current exchange with data polled from a specified consumer endpoint URI. |
| process(), processRef() | process | Executes a custom processor on the current exchange. See the section called “Custom processor” and Part IV, “Programming EIP Components”. |
| recipientList() | recipientList | Recipient List EIP: Sends the exchange to a list of recipients that is calculated at runtime (for example, based on the contents of a header). |
| removeHeader() | removeHeader | Removes the specified header from the exchange's In message. |
| removeHeaders() | removeHeaders | Removes the headers matching the specified pattern from the exchange's In message. A pattern of the form prefix* matches every name starting with prefix; any other pattern is interpreted as a regular expression. |
| removeProperty() | removeProperty | Removes the specified exchange property from the exchange. |
| removeProperties() | removeProperties | Removes the properties matching the specified pattern from the exchange. Takes a comma-separated list of one or more strings as arguments. The first string is the pattern (see removeHeaders() above). Subsequent strings specify exceptions, that is, properties that are not removed. |
| resequence() | resequence | Resequencer EIP: Re-orders incoming exchanges on the basis of a specified comparator operation. Supports a batch mode and a stream mode. |
| rollback() | rollback | (Transactions) Marks the current transaction for rollback only (also raising an exception, by default). See "Transaction Guide". |
| routingSlip() | routingSlip | Routing Slip EIP: Routes the exchange through a pipeline that is constructed dynamically, based on the list of endpoint URIs extracted from a slip header. |
| sample() | sample | Creates a sampling throttler, allowing you to extract a sample of exchanges from the traffic on a route. |
| setBody() | setBody | Sets the message body of the exchange's In message. |
| setExchangePattern() | setExchangePattern | Sets the current exchange's MEP to the specified value. See the section called “Message exchange patterns”. |
| setHeader() | setHeader | Sets the specified header in the exchange's In message. |
| setOutHeader() | setOutHeader | Sets the specified header in the exchange's Out message. |
| setProperty() | setProperty | Sets the specified exchange property. |
| sort() | sort | Sorts the contents of the In message body (where a custom comparator can optionally be specified). |
| split() | split | Splitter EIP: Splits the current exchange into a sequence of exchanges, where each split exchange contains a fragment of the original message body. |
| stop() | stop | Stops routing the current exchange and marks it as completed. |
| threads() | threads | Creates a thread pool for concurrent processing of the latter part of the route. |
| throttle() | throttle | Throttler EIP: Limits the flow rate to the specified level (exchanges per second). |
| throwException() | throwException | Throws the specified Java exception. |
| to() | to | Sends the exchange to one or more endpoints. See Section 2.1, “Pipeline Processing”. |
| toF() | N/A | Sends the exchange to an endpoint, using string formatting. That is, the endpoint URI string can embed substitutions in the style of the C printf() function. |
| transacted() | transacted | Creates a Spring transaction scope that encloses the latter part of the route. See "Transaction Guide". |
| transform() | transform | Message Translator EIP: Copies the In message headers to the Out message headers and sets the Out message body to the specified value. |
| unmarshal() | unmarshal | Transforms the In message body from a low-level or binary format to a high-level format, using the specified data format. |
| validate() | validate | Takes a predicate expression to test whether the current message is valid. If the predicate returns false, throws a PredicateValidationException exception. |
| wireTap() | wireTap | Wire Tap EIP: Sends a copy of the current exchange to the specified wire tap URI, using the ExchangePattern.InOnly MEP. |
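
The pattern rule described for removeHeaders() and removeProperties() can be modelled in a few lines of plain Java. The following is a minimal sketch under stated assumptions, not Apache Camel's implementation: the class HeaderPatternSketch and its methods are hypothetical names, headers are modelled as a plain Map, and the exceptions are treated as exact names rather than patterns.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that models the matching rule from the table; not a Camel class. */
final class HeaderPatternSketch {

    /** A pattern ending in '*' is a prefix match; anything else is treated as a regular expression. */
    static boolean matches(String pattern, String name) {
        if (pattern.endsWith("*")) {
            String prefix = pattern.substring(0, pattern.length() - 1);
            return name.startsWith(prefix);
        }
        return name.matches(pattern); // the whole name must match the regular expression
    }

    /** Removes every entry whose name matches the pattern, except the names listed as exceptions. */
    static void removeMatching(Map<String, Object> entries, String pattern, String... exceptions) {
        entries.keySet().removeIf(name -> {
            for (String keep : exceptions) {
                if (keep.equals(name)) {
                    return false; // assumption: exceptions are exact names
                }
            }
            return matches(pattern, name);
        });
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("CamelFileName", "a.txt");
        headers.put("CamelFileLength", 42L);
        headers.put("foo", "bar");
        // Remove everything starting with "Camel", but keep CamelFileName.
        removeMatching(headers, "Camel*", "CamelFileName");
        System.out.println(headers.keySet());
    }
}
```

In this model, only CamelFileLength is removed: foo does not match the pattern, and CamelFileName is protected by the exception list.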

Some sample processors

To get some idea of how to use processors in a route, see the following examples:

Choice

The choice() processor is a conditional statement that routes incoming messages to alternative producer endpoints. Each alternative producer endpoint is preceded by a when() method, which takes a predicate argument. If the predicate is true, the following target is selected; otherwise, processing proceeds to the next when() method in the rule. If no when() predicate is true, the target that follows otherwise() is selected. For example, the following choice() processor directs incoming messages to either Target1, Target2, or Target3, depending on the values of Predicate1 and Predicate2:
from("SourceURL")
    .choice()
        .when(Predicate1).to("Target1")
        .when(Predicate2).to("Target2")
        .otherwise().to("Target3");
Or equivalently in Spring XML:
<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <choice>
      <when>
        <!-- First predicate -->
        <simple>${header.foo} == 'bar'</simple>
        <to uri="Target1"/>
      </when>
      <when>
        <!-- Second predicate -->
        <simple>${header.foo} == 'manchu'</simple>
        <to uri="Target2"/>
      </when>
      <otherwise>
        <to uri="Target3"/>
      </otherwise>
    </choice>
  </route>
</camelContext>
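
The selection rule used by choice() is first match wins, with otherwise as the fallback. The following plain Java sketch models only that rule and is not how Apache Camel implements it: ChoiceSketch is a hypothetical class, messages are modelled as a Map of headers, and targets are plain strings.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical model of first-match selection; not a Camel class. */
final class ChoiceSketch<T> {
    // The when clauses, kept in the order in which they were declared.
    private final List<Map.Entry<Predicate<T>, String>> whens = new ArrayList<>();
    private String otherwiseTarget; // null when no otherwise clause is defined

    ChoiceSketch<T> when(Predicate<T> predicate, String target) {
        whens.add(new SimpleImmutableEntry<>(predicate, target));
        return this;
    }

    ChoiceSketch<T> otherwise(String target) {
        this.otherwiseTarget = target;
        return this;
    }

    /** Returns the target of the first when clause whose predicate is true, else the otherwise target. */
    String select(T message) {
        for (Map.Entry<Predicate<T>, String> clause : whens) {
            if (clause.getKey().test(message)) {
                return clause.getValue();
            }
        }
        return otherwiseTarget;
    }

    public static void main(String[] args) {
        ChoiceSketch<Map<String, String>> choice = new ChoiceSketch<Map<String, String>>()
            .when(headers -> "bar".equals(headers.get("foo")), "Target1")
            .when(headers -> "manchu".equals(headers.get("foo")), "Target2")
            .otherwise("Target3");
        System.out.println(choice.select(Collections.singletonMap("foo", "manchu")));
    }
}
```

Because the clauses are evaluated in declaration order, a message that satisfies more than one predicate is routed only to the first matching target.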
In the Java DSL, there is a special case where you might need to use the endChoice() command. Some of the standard Apache Camel processors enable you to specify extra parameters using special sub-clauses, effectively opening an extra level of nesting which is usually terminated by the end() command. For example, you could specify a load balancer clause as loadBalance().roundRobin().to("mock:foo").to("mock:bar").end(), which load balances messages between the mock:foo and mock:bar endpoints. If the load balancer clause is embedded in a choice condition, however, it is necessary to terminate the clause using the endChoice() command, as follows:
from("direct:start")
    .choice()
        .when(bodyAs(String.class).contains("Camel"))
            .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice()
        .otherwise()
            .to("mock:result");
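
One way to see why a differently typed terminator helps is to model the fluent builder types. The sketch below is an illustration under stated assumptions: NodeSketch and ChoiceNodeSketch are hypothetical stand-ins, not Camel classes, and the list of steps exists only to record the calls. In this model, end() is declared to return the generic node type, which has no otherwise() method, whereas endChoice() returns the choice type so that the chain can continue with otherwise().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical generic DSL node; not a Camel class. */
class NodeSketch {
    final NodeSketch parent;
    final List<String> steps; // shared record of DSL calls, for illustration only

    NodeSketch(NodeSketch parent, List<String> steps) {
        this.parent = parent;
        this.steps = steps;
    }

    NodeSketch to(String uri) { steps.add("to:" + uri); return this; }

    /** Closes the current block; the declared return type is only the generic node. */
    NodeSketch end() { steps.add("end"); return parent; }

    /** Closes the current block and returns the enclosing choice with its specific type. */
    ChoiceNodeSketch endChoice() { steps.add("endChoice"); return (ChoiceNodeSketch) parent; }
}

/** Hypothetical choice node, which adds the when and otherwise clauses. */
class ChoiceNodeSketch extends NodeSketch {
    ChoiceNodeSketch(List<String> steps) { super(null, steps); }

    ChoiceNodeSketch when(String predicate) { steps.add("when:" + predicate); return this; }
    ChoiceNodeSketch otherwise() { steps.add("otherwise"); return this; }

    /** Opens a nested block whose static type is no longer the choice type. */
    NodeSketch loadBalance() { steps.add("loadBalance"); return new NodeSketch(this, steps); }

    @Override
    ChoiceNodeSketch to(String uri) { super.to(uri); return this; }

    public static void main(String[] args) {
        List<String> steps = new ArrayList<>();
        new ChoiceNodeSketch(steps)
            .when("Predicate1")
                .loadBalance().to("mock:foo").to("mock:bar").endChoice()
            .otherwise()
                .to("mock:result");
        // Replacing endChoice() with end() would not compile here,
        // because NodeSketch declares no otherwise() method.
        System.out.println(steps);
    }
}
```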

Filter

The filter() processor can be used to prevent uninteresting messages from reaching the producer endpoint. It takes a single predicate argument: if the predicate is true, the message exchange is allowed through to the producer; if the predicate is false, the message exchange is blocked. For example, the following filter blocks a message exchange, unless the incoming message contains a header, foo, with value equal to bar:
from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");
Or equivalently in Spring XML:
<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <filter>
      <simple>${header.foo} == 'bar'</simple>
      <to uri="TargetURL"/>
    </filter>
  </route>
</camelContext>

Throttler

The throttle() processor ensures that a producer endpoint does not get overloaded. The throttler works by limiting the number of messages that can pass through per second. If the incoming messages exceed the specified rate, the throttler accumulates excess messages in a buffer and transmits them more slowly to the producer endpoint. For example, to limit the rate of throughput to 100 messages per second, you can define the following rule:
from("SourceURL").throttle(100).to("TargetURL");
Or equivalently in Spring XML:
<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <throttle timePeriodMillis="1000">
      <!-- Maximum number of requests per period -->
      <constant>100</constant>
      <to uri="TargetURL"/>
    </throttle>
  </route>
</camelContext>
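
To make the buffering behaviour concrete, the following plain Java sketch computes when each message would be released under a simple fixed-window rate limit. It is a simplified model under stated assumptions and not necessarily the algorithm that Apache Camel uses: ThrottleSketch is a hypothetical class, times are passed in as millisecond values rather than read from a clock, and arrival times are assumed to be non-decreasing.

```java
/** Hypothetical fixed-window throttle model; not a Camel class. */
final class ThrottleSketch {
    private final int maxPerPeriod;   // must be at least 1
    private final long periodMillis;
    private boolean started;          // false until the first message arrives
    private long windowStart;         // start time of the window currently being filled
    private int used;                 // messages already released in that window

    ThrottleSketch(int maxPerPeriod, long periodMillis) {
        this.maxPerPeriod = maxPerPeriod;
        this.periodMillis = periodMillis;
    }

    /** Returns the time at which a message arriving at arrivalMillis is released. */
    long releaseTime(long arrivalMillis) {
        // A message can never be released before the window that is currently being filled.
        long candidate = started ? Math.max(arrivalMillis, windowStart) : arrivalMillis;
        if (!started || candidate >= windowStart + periodMillis) {
            // The previous window has expired (or this is the first message): open a new one.
            started = true;
            windowStart = candidate;
            used = 0;
        }
        if (used < maxPerPeriod) {
            used++;
            return candidate;
        }
        // The window is full: the message waits until the next window opens.
        windowStart += periodMillis;
        used = 1;
        return windowStart;
    }

    public static void main(String[] args) {
        ThrottleSketch throttle = new ThrottleSketch(2, 1000);
        // Five messages arrive at the same instant; at most two are released per second.
        for (int i = 0; i < 5; i++) {
            System.out.println(throttle.releaseTime(0));
        }
    }
}
```

With a limit of 2 messages per 1000 ms, five messages arriving at time 0 are released at 0, 0, 1000, 1000, and 2000 in this model, which is the "accumulate and transmit more slowly" behaviour described above.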

Custom processor

If none of the standard processors described here provides the functionality you need, you can define your own custom processor. To create a custom processor, define a class that implements the org.apache.camel.Processor interface and its process() method. The following custom processor, MyProcessor, removes the header named foo from incoming messages:

Example 1.3. Implementing a Custom Processor Class

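import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;

public class MyProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        // Remove the header named "foo" from the In message
        Message inMessage = exchange.getIn();
        if (inMessage != null) {
            inMessage.removeHeader("foo");
        }
    }
}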
To insert the custom processor into a router rule, invoke the process() method, which provides a generic mechanism for inserting processors into rules. For example, the following rule invokes the processor defined in Example 1.3, “Implementing a Custom Processor Class”:
org.apache.camel.Processor myProc = new MyProcessor();

from("SourceURL").process(myProc).to("TargetURL");