Chapter 35. Implementing a Processor

Abstract

Apache Camel allows you to implement a custom processor. You can then insert the custom processor into a route to perform operations on exchange objects as they pass through the route.

35.1. Processing Model

Pipelining model

The pipelining model describes the way in which processors are arranged in Section 5.4, “Pipes and Filters”. Pipelining is the most common way to process a sequence of endpoints (a producer endpoint is just a special type of processor). When the processors are arranged in this way, the exchange’s In and Out messages are processed as shown in Figure 35.1, “Pipelining Model”.

Figure 35.1. Pipelining Model

Pipelining model

The processors in the pipeline look like services, where the In message is analogous to a request, and the Out message is analogous to a reply. In fact, in a realistic pipeline, the nodes in the pipeline are often implemented by Web service endpoints, such as the CXF component.

For example, Example 35.1, “Java DSL Pipeline” shows a Java DSL pipeline constructed from a sequence of two processors, ProcessorA, ProcessorB, and a producer endpoint, TargetURI.

Example 35.1. Java DSL Pipeline

from(SourceURI).pipeline(ProcessorA, ProcessorB, TargetURI);

35.2. Implementing a Simple Processor

Overview

This section describes how to implement a simple processor that executes message processing logic before delegating the exchange to the next processor in the route.

Processor interface

Simple processors are created by implementing the org.apache.camel.Processor interface. As shown in Example 35.2, “Processor Interface”, the interface defines a single method, process(), which processes an exchange object.

Example 35.2. Processor Interface

package org.apache.camel;

public interface Processor {
    void process(Exchange exchange) throws Exception;
}

Implementing the Processor interface

To create a simple processor you must implement the Processor interface and provide the logic for the process() method. Example 35.3, “Simple Processor Implementation” shows the outline of a simple processor implementation.

Example 35.3. Simple Processor Implementation

import org.apache.camel.Processor;

public class MyProcessor implements Processor {
    public MyProcessor() { }

    public void process(Exchange exchange) throws Exception
    {
        // Insert code that gets executed *before* delegating
        // to the next processor in the chain.
        ...
    }
}

All of the code in the process() method gets executed before the exchange object is delegated to the next processor in the chain.

For examples of how to access the message body and header values inside a simple processor, see Section 35.3, “Accessing Message Content”.

Inserting the simple processor into a route

Use the process() DSL command to insert a simple processor into a route. Create an instance of your custom processor and then pass this instance as an argument to the process() method, as follows:

org.apache.camel.Processor myProc = new MyProcessor();

from("SourceURL").process(myProc).to("TargetURL");

35.3. Accessing Message Content

Accessing message headers

Message headers typically contain the most useful message content from the perspective of a router, because headers are often intended to be processed in a router service. To access header data, you must first get the message from the exchange object (for example, using Exchange.getIn()), and then use the Message interface to retrieve the individual headers (for example, using Message.getHeader()).

Example 35.4, “Accessing an Authorization Header” shows an example of a custom processor that accesses the value of a header named Authorization. This example uses the ExchangeHelper.getMandatoryHeader() method, which eliminates the need to test for a null header value.

Example 35.4. Accessing an Authorization Header

import org.apache.camel.*;
import org.apache.camel.util.ExchangeHelper;

public class MyProcessor implements Processor {
  public void process(Exchange exchange) {
    String auth = ExchangeHelper.getMandatoryHeader(
                      exchange,
                      "Authorization",
                      String.class
                  );
    // process the authorization string...
    // ...
  }
}

For full details of the Message interface, see Section 34.2, “Messages”.

Accessing the message body

You can also access the message body. For example, to append a string to the end of the In message, you can use the processor shown in Example 35.5, “Accessing the Message Body”.

Example 35.5. Accessing the Message Body

import org.apache.camel.*;
import org.apache.camel.util.ExchangeHelper;

public class MyProcessor implements Processor {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        in.setBody(in.getBody(String.class) + " World!");
    }
}

Accessing message attachments

You can access a message’s attachments using either the Message.getAttachment() method or the Message.getAttachments() method. See Example 34.2, “Message Interface” for more details.

35.4. The ExchangeHelper Class

Overview

The org.apache.camel.util.ExchangeHelper class is a Apache Camel utility class that provides methods that are useful when implementing a processor.

Resolve an endpoint

The static resolveEndpoint() method is one of the most useful methods in the ExchangeHelper class. You use it inside a processor to create new Endpoint instances on the fly.

Example 35.6. The resolveEndpoint() Method

public final class ExchangeHelper {
    ...
    @SuppressWarnings({"unchecked" })
    public static Endpoint
    resolveEndpoint(Exchange exchange, Object value)
        throws NoSuchEndpointException { ... }
    ...
}

The first argument to resolveEndpoint() is an exchange instance, and the second argument is usually an endpoint URI string. Example 35.7, “Creating a File Endpoint” shows how to create a new file endpoint from an exchange instance exchange

Example 35.7. Creating a File Endpoint

Endpoint file_endp = ExchangeHelper.resolveEndpoint(exchange, "file://tmp/messages/in.xml");

Wrapping the exchange accessors

The ExchangeHelper class provides several static methods of the form getMandatoryBeanProperty(), which wrap the corresponding getBeanProperty() methods on the Exchange class. The difference between them is that the original getBeanProperty() accessors return null, if the corresponding property is unavailable, and the getMandatoryBeanProperty() wrapper methods throw a Java exception. The following wrapper methods are implemented in the ExchangeHelper class:

public final class ExchangeHelper {
    ...
    public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type)
        throws NoSuchPropertyException { ... }

    public static <T> T getMandatoryHeader(Exchange exchange, String propertyName, Class<T> type)
        throws NoSuchHeaderException { ... }

    public static Object getMandatoryInBody(Exchange exchange)
        throws InvalidPayloadException { ... }

    public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type)
        throws InvalidPayloadException { ... }

    public static Object getMandatoryOutBody(Exchange exchange)
        throws InvalidPayloadException { ... }

    public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type)
        throws InvalidPayloadException { ... }
    ...
}

Testing the exchange pattern

Several different exchange patterns are compatible with holding an In message. Several different exchange patterns are also compatible with holding an Out message. To provide a quick way of checking whether or not an exchange object is capable of holding an In message or an Out message, the ExchangeHelper class provides the following methods:

public final class ExchangeHelper {
    ...
    public static boolean isInCapable(Exchange exchange) { ... }

    public static boolean isOutCapable(Exchange exchange) { ... }
    ...
}

Get the In message’s MIME content type

If you want to find out the MIME content type of the exchange’s In message, you can access it by calling the ExchangeHelper.getContentType(exchange) method. To implement this, the ExchangeHelper object looks up the value of the In message’s Content-Type header — this method relies on the underlying component to populate the header value).