Fuse ESB

Programing EIP Components

Version 4.4.1

Sept. 2011
Trademark Disclaimer
Third Party Acknowledgements

Updated: 06 Jun 2013

Table of Contents

1. Understanding Message Formats
Exchanges
Messages
Built-In Type Converters
Built-In UUID Generators
2. Implementing a Processor
Processing Model
Implementing a Simple Processor
Accessing Message Content
The ExchangeHelper Class
3. Type Converters
Type Converter Architecture
Implementing Type Converter Using Annotations
Implementing a Type Converter Directly
4. Producer and Consumer Templates
Using the Producer Template
Introduction to the Producer Template
Synchronous Send
Synchronous Request with InOut Pattern
Asynchronous Send
Asynchronous Request with InOut Pattern
Asynchronous Send with Callback
Using the Consumer Template
5. Implementing a Component
Component Architecture
Factory Patterns for a Component
Using a Component in a Route
Consumer Patterns and Threading
Asynchronous Processing
How to Implement a Component
Auto-Discovery and Configuration
Setting Up Auto-Discovery
Configuring a Component
6. Component Interface
The Component Interface
Implementing the Component Interface
7. Endpoint Interface
The Endpoint Interface
Implementing the Endpoint Interface
8. Consumer Interface
The Consumer Interface
Implementing the Consumer Interface
9. Producer Interface
The Producer Interface
Implementing the Producer Interface
10. Exchange Interface
The Exchange Interface
11. Message Interface
The Message Interface
Implementing the Message Interface
Index

List of Figures

1.1. Exchange Object Passing through a Route
2.1. Pipelining Model
3.1. Type Conversion Process
5.1. Component Factory Patterns
5.2. Consumer and Producer Instances in a Route
5.3. Event-Driven Consumer
5.4. Scheduled Poll Consumer
5.5. Polling Consumer
5.6. Synchronous Producer
5.7. Asynchronous Producer
6.1. Component Inheritance Hierarchy
7.1. Endpoint Inheritance Hierarchy
8.1. Consumer Inheritance Hierarchy
9.1. Producer Inheritance Hierarchy
10.1. Exchange Inheritance Hierarchy
11.1. Message Inheritance Hierarchy

List of Tables

8.1. Scheduled Poll Parameters

List of Examples

1.1. Exchange Methods
1.2. Message Interface
1.3. Unmarshalling a Java Object
1.4. Converting a Value to a String
1.5. UuidGenerator Interface
2.1. Java DSL Pipeline
2.2. Processor Interface
2.3. Simple Processor Implementation
2.4. Accessing an Authorization Header
2.5. Accessing the Message Body
2.6. The resolveEndpoint() Method
2.7. Creating a File Endpoint
3.1. TypeConverter Interface
3.2. Getting a Master Type Converter
3.3. Example of an Annotated Converter Class
5.1. Configuring a Component in Spring
5.2. JMS Component Spring Configuration
6.1. Component Interface
6.2. Implementation of createEndpoint()
6.3. FileComponent Implementation
7.1. Endpoint Interface
7.2. Implementing DefaultEndpoint
7.3. ScheduledPollEndpoint Implementation
7.4. DefaultPollingEndpoint Implementation
7.5. BrowsableEndpoint Interface
7.6. SedaEndpoint Implementation
8.1. FileEndpoint createConsumer() Implementation
8.2. ShutdownAware Interface
8.3. JMXConsumer Implementation
8.4. ScheduledPollConsumer Implementation
8.5. PollingConsumerSupport Implementation
8.6. Custom Threading Implementation
9.1. Producer Interface
9.2. AsyncProcessor Interface
9.3. AsyncCallback Interface
9.4. DefaultProducer Implementation
9.5. CollectionProducer Implementation
10.1. Exchange Interface
11.1. Message Interface
11.2. Custom Message Implementation

Message objects represent messages using the following abstract model:

The message body and the message headers can be of arbitrary type (they are declared as type Object) and the message attachments are declared to be of type javax.activation.DataHandler , which can contain arbitrary MIME types. If you need to obtain a concrete representation of the message contents, you can convert the body and headers to another type using the type converter mechanism and, possibly, using the marshalling and unmarshalling mechanism.

One important feature of Fuse Mediation Router messages is that they support lazy creation of message bodies and headers. In some cases, this means that a message can pass through a route without needing to be parsed at all.

Fuse Mediation Router supports lazy creation of bodies, headers, and attachments. This means that the objects that represent a message body, a message header, or a message attachment are not created until they are needed.

For example, consider the following route that accesses the foo message header from the In message:

from("SourceURL")
    .filter(header("foo")
    .isEqualTo("bar"))
    .to("TargetURL");

In this route, if we assume that the component referenced by SourceURL supports lazy creation, the In message headers are not actually parsed until the header("foo") call is executed. At that point, the underlying message implementation parses the headers and populates the header map. The message body is not parsed until you reach the end of the route, at the to("TargetURL") call. At that point, the body is converted into the format required for writing it to the target endpoint, TargetURL.

By waiting until the last possible moment before populating the bodies, headers, and attachments, you can ensure that unnecessary type conversions are avoided. In some cases, you can completely avoid parsing. For example, if a route contains no explicit references to message headers, a message could traverse the route without ever parsing the headers.

Whether or not lazy creation is implemented in practice depends on the underlying component implementation. In general, lazy creation is valuable for those cases where creating a message body, a message header, or a message attachment is expensive. For details about implementing a message type that supports lazy creation, see Implementing the Message Interface.

It does not matter what the initial format of the message is, because you can easily convert a message from one format to another using the built-in type converters (see Built-In Type Converters). There are various methods in the Fuse Mediation Router API that expose type conversion functionality. For example, the convertBodyTo(Class type) method can be inserted into a route to convert the body of an In message, as follows:

from("SourceURL").convertBodyTo(String.class).to("TargetURL");

Where the body of the In message is converted to a java.lang.String. The following example shows how to append a string to the end of the In message body:

from("SourceURL").setBody(bodyAs(String.class).append("My Special Signature")).to("TargetURL");

Where the message body is converted to a string format before appending a string to the end. It is not necessary to convert the message body explicitly in this example. You can also use:

from("SourceURL").setBody(body().append("My Special Signature")).to("TargetURL");

Where the append() method automatically converts the message body to a string before appending its argument.

You can also perform conversions to the javax.xml.transform.sax.SAXSource type, which supports the SAX event-driven XML parser (see the SAX Web site for details). You can convert to SAXSource from the following types:

  • String

  • InputStream

  • Source

  • StreamSource

  • DOMSource



[1] If there is no active method the returned value will be null.

The pipelining model describes the way in which processors are arranged in Pipes and Filters in Implementing Enterprise Integration Patterns. Pipelining is the most common way to process a sequence of endpoints (a producer endpoint is just a special type of processor). When the processors are arranged in this way, the exchange's In and Out messages are processed as shown in Figure 2.1.


The processors in the pipeline look like services, where the In message is analogous to a request, and the Out message is analogous to a reply. In fact, in a realistic pipeline, the nodes in the pipeline are often implemented by Web service endpoints, such as the CXF component.

For example, Example 2.1 shows a Java DSL pipeline constructed from a sequence of two processors, ProcessorA, ProcessorB, and a producer endpoint, TargetURI.


The org.apache.camel.util.ExchangeHelper class is a Fuse Mediation Router utility class that provides methods that are useful when implementing a processor.

Figure 3.1 gives an overview of the type conversion process, showing the steps involved in converting a given data value, value, to a specified type, toType.


The type conversion mechanism proceeds as follows:

  1. The CamelContext object holds a reference to the master TypeConverter instance. The first step in the conversion process is to retrieve the master type converter by calling CamelContext.getTypeConverter().

  2. Type conversion is initiated by calling the convertTo() method on the master type converter. This method instructs the type converter to convert the data object, value, from its original type to the type specified by the toType argument.

  3. Because the master type converter is a front end for many different slave type converters, it looks up the appropriate slave type converter by checking a registry of type mappings The registry of type converters is keyed by a type mapping pair (toType, fromType). If a suitable type converter is found in the registry, the master type converter calls the slave's convertTo() method and returns the result.

  4. If a suitable type converter cannot be found in the registry, the master type converter loads a new type converter, using the type converter loader.

  5. The type converter loader searches the available JAR libraries on the classpath to find a suitable type converter. Currently, the loader strategy that is used is implemented by the annotation type converter loader, which attempts to load a class annotated by the org.apache.camel.Converter annotation. See Create a TypeConverter file.

  6. If the type converter loader is successful, a new slave type converter is loaded and entered into the type converter registry. This type converter is then used to convert the value argument to the toType type.

  7. If the data is successfully converted, the converted data value is returned. If the conversion does not succeed, null is returned.

You can implement a custom type converter class using the @Converter annotation. You must annotate the class itself and each of the static methods intended to perform type conversion. Each converter method takes an argument that defines the from type, optionally takes a second Exchange argument, and has a non-void return value that defines the to type. The type converter loader uses Java reflection to find the annotated methods and integrate them into the type converter mechanism. Example 3.3 shows an example of an annotated converter class that defines a converter method for converting from java.io.File to java.io.InputStream and another converter method (with an Exchange argument) for converting from byte[] to String.


The toInputStream() method is responsible for performing the conversion from the File type to the InputStream type and the toString() method is responsible for performing the conversion from the byte[] type to the String type.

Note

The method name is unimportant, and can be anything you choose. What is important are the argument type, the return type, and the presence of the @Converter annotation.

In addition to defining regular converter methods using the @Converter annotation, you can optionally define a fallback converter method using the @FallbackConverter annotation. The fallback converter method will only be tried, if the master type converter fails to find a regular converter method in the type registry.

The essential difference between a regular converter method and a fallback converter method is that whereas a regular converter is defined to perform conversion between a specific pair of types (for example, from byte[] to String), a fallback converter can potentially perform conversion between any pair of types. It is up to the code in the body of the fallback converter method to figure out which conversions it is able to perform. At run time, if a conversion cannot be performed by a regular converter, the master type converter iterates through every available fallback converter until it finds one that can perform the conversion.

The method signature of a fallback converter can have either of the following forms:

// 1. Non-generic form of signature
@FallbackConverter
public static Object MethodName(
    Class type,
    Exchange exchange,
    Object value,
    TypeConverterRegistry registry
)

// 2. Templating form of signature
@FallbackConverter
public static <T> T MethodName(
    Class<T> type,
    Exchange exchange,
    Object value,
    TypeConverterRegistry registry
)

Where MethodName is an arbitrary method name for the fallback converter.

For example, the following code extract (taken from the implementation of the File component) shows a fallback converter that can convert the body of a GenericFile object, exploiting the type converters already available in the type converter registry:

package org.apache.camel.component.file;

import org.apache.camel.Converter;
import org.apache.camel.FallbackConverter;
import org.apache.camel.Exchange;
import org.apache.camel.TypeConverter;
import org.apache.camel.spi.TypeConverterRegistry;

@Converter
public final class GenericFileConverter {

    private GenericFileConverter() {
        // Helper Class
    }

    @FallbackConverter
    public static <T> T convertTo(Class<T> type, Exchange exchange, Object value, TypeConverterRegistry registry) {
        // use a fallback type converter so we can convert the embedded body if the value is GenericFile
        if (GenericFile.class.isAssignableFrom(value.getClass())) {
            GenericFile file = (GenericFile) value;
            Class from = file.getBody().getClass();
            TypeConverter tc = registry.lookup(type, from);
            if (tc != null) {
                Object body = file.getBody();
                return tc.convertTo(type, exchange, body);
            }
        }
        
        return null;
    }
    ...
}

Generally, the recommended way to implement a type converter is to use an annotated class, as described in the previous section, Implementing Type Converter Using Annotations. But if you want to have complete control over the registration of your type converter, you can implement a custom slave type converter and add it directly to the type converter registry, as described here.

The methods for invoking endpoints asynchronously have names of the form asyncSendSuffix() and asyncRequestSuffix(). For example, the methods for invoking an endpoint using either the default message exchange pattern (MEP) or an explicitly specified MEP are named asyncSend() and asyncSendBody() (where these methods respectively send an Exchange object or a message body). If you want to force the MEP to be InOut (request/reply semantics), you can call the asyncRequestBody(), asyncRequestBodyAndHeader(), and asyncRequestBodyAndHeaders() methods instead.

The following example shows how to send an exchange asynchronously to the direct:start endpoint. The asyncSend() method returns a java.util.concurrent.Future object, which is used to retrieve the invocation result at a later time.

import java.util.concurrent.Future;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
...
Exchange exchange = new DefaultExchange(context);
exchange.getIn().setBody("Hello");

Future<Exchange> future = template.asyncSend("direct:start", exchange);

// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the resulting exchange from the Future
Exchange result = future.get();

The producer template also provides methods to send a message body asynchronously (for example, using asyncSendBody() or asyncRequestBody()). In this case, you can use one of the following helper methods to extract the returned message body from the Future object:

<T> T extractFutureBody(Future future, Class<T> type);
<T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;

The first version of the extractFutureBody() method blocks until the invocation completes and the reply message is available. The second version of the extractFutureBody() method allows you to specify a timeout. Both methods have a type argument, type, which casts the returned message body to the specified type using a built-in type converter.

The following example shows how to use the asyncRequestBody() method to send a message body to the direct:start endpoint. The blocking extractFutureBody() method is then used to retrieve the reply message body from the Future object.

Future<Object> future = template.asyncRequestBody("direct:start", "Hello");

// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the reply message body as a String type
String result = template.extractFutureBody(future, String.class);

In the preceding asynchronous examples, the request message is dispatched in a sub-thread, while the reply is retrieved and processed by the main thread. The producer template also gives you the option, however, of processing replies in the sub-thread, using one of the asyncCallback(), asyncCallbackSendBody(), or asyncCallbackRequestBody() methods. In this case, you supply a callback object (of org.apache.camel.impl.SynchronizationAdapter type), which automatically gets invoked in the sub-thread as soon as a reply message arrives.

The Synchronization callback interface is defined as follows:

package org.apache.camel.spi;

import org.apache.camel.Exchange;

public interface Synchronization {
    void onComplete(Exchange exchange);
    void onFailure(Exchange exchange);
}

Where the onComplete() method is called on receipt of a normal reply and the onFailure() method is called on receipt of a fault message reply. Only one of these methods gets called back, so you must override both of them to ensure that all types of reply are processed.

The following example shows how to send an exchange to the direct:start endpoint, where the reply message is processed in the sub-thread by the SynchronizationAdapter callback object.

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.SynchronizationAdapter;
...
Exchange exchange = context.getEndpoint("direct:start").createExchange();
exchange.getIn().setBody("Hello");

Future<Exchange> future = template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() {
    @Override
    public void onComplete(Exchange exchange) {
        assertEquals("Hello World", exchange.getIn().getBody());
    }
});

Where the SynchronizationAdapter class is a default implementation of the Synchronization interface, which you can override to provide your own definitions of the onComplete() and onFailure() callback methods.

You still have the option of accessing the reply from the main thread, because the asyncCallback() method also returns a Future object—for example:

// Retrieve the reply from the main thread, specifying a timeout
Exchange reply = future.get(10, TimeUnit.SECONDS);

A simple variation of the general send() method is to use a processor to populate a default exchange, instead of supplying the exchange object explicitly (see Synchronous invocation with a processor for details).

The send() methods for sending an exchange populated by a processor let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint object. In addition, you can optionally specify the exchange's MEP by supplying the pattern argument, instead of accepting the default.

Exchange send(Processor processor);
Exchange send(String endpointUri, Processor processor);
Exchange send(Endpoint endpoint, Processor processor);
Exchange send(
    String endpointUri,
    ExchangePattern pattern,
    Processor processor
);
Exchange send(
    Endpoint endpoint,
    ExchangePattern pattern,
    Processor processor
);

For testing purposes, it is often interesting to try out the effect of a single header setting and the sendBodyAndHeader() methods are useful for this kind of header testing. You supply the message body and header setting as arguments to sendBodyAndHeader() and let the producer template take care of inserting the body and header setting into a default exchange object.

The sendBodyAndHeader() methods let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint object. In addition, you can optionally specify the exchange's MEP by supplying the pattern argument, instead of accepting the default. The methods without a pattern argument return void (even though the invocation might give rise to a reply in some cases); and the methods with a pattern argument return either the body of the Out message (if there is one) or the body of the In message (otherwise).

void sendBodyAndHeader(
    Object body,
    String header,
    Object headerValue
);
void sendBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
void sendBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
Object sendBodyAndHeader(
    String endpointUri,
    ExchangePattern pattern,
    Object body,
    String header,
    Object headerValue
);
Object sendBodyAndHeader(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    String header,
    Object headerValue
);

The sendBodyAndHeaders() methods are similar to the sendBodyAndHeader() methods, except that instead of supplying just a single header setting, these methods allow you to specify a complete hash map of header settings.

void sendBodyAndHeaders(
    Object body,
    Map<String, Object> headers
);
void sendBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
void sendBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
Object sendBodyAndHeaders(
    String endpointUri,
    ExchangePattern pattern,
    Object body,
    Map<String, Object> headers
);
Object sendBodyAndHeaders(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    Map<String, Object> headers
);

You can try out the effect of setting a single header value using the requestBodyAndHeader() methods. You supply the message body and header setting as arguments to requestBodyAndHeader() and let the producer template take care of inserting the body and exchange property into a default exchange object.

The requestBodyAndHeader() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object. The return value is the body of the reply message (Out message body), which can either be returned as plain Object or converted to a specific type, T, using the built-in type converters (see Built-In Type Converters).

Object requestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
<T> T requestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
Object requestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
<T> T requestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);

The requestBodyAndHeaders() methods are similar to the requestBodyAndHeader() methods, except that instead of supplying just a single header setting, these methods allow you to specify a complete hash map of header settings.

Object requestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);
Object requestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);

The basic asyncSend() method takes an Exchange argument and invokes an endpoint asynchronously, using the message exchange pattern (MEP) of the specified exchange. The return value is a java.util.concurrent.Future object, which is a ticket you can use to collect the reply message at a later time—for details of how to obtain the return value from the Future object, see Asynchronous invocation.

The following asyncSend() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object.

Future<Exchange> asyncSend(String endpointUri, Exchange exchange);
Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);

You can try out the effect of setting a single header value using the asyncRequestBodyAndHeader() methods. You supply the message body and header setting as arguments to asyncRequestBodyAndHeader() and let the producer template take care of inserting the body and exchange property into a default exchange object.

The asyncRequestBodyAndHeader() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object. The return value that is retrievable from the Future object is the body of the reply message (Out message body), which can be returned either as a plain Object or converted to a specific type, T, using a built-in type converter (see Asynchronous invocation).

Future<Object> asyncRequestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
Future<Object> asyncRequestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);

The asyncRequestBodyAndHeaders() methods are similar to the asyncRequestBodyAndHeader() methods, except that instead of supplying just a single header setting, these methods allow you to specify a complete hash map of header settings.

Future<Object> asyncRequestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);
Future<Object> asyncRequestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);

The basic asyncCallback() method takes an Exchange argument and invokes an endpoint asynchronously, using the message exchange pattern (MEP) of the specified exchange. This method is similar to the asyncSend() method for exchanges, except that it takes an additional org.apache.camel.spi.Synchronization argument, which is a callback interface with two methods: onComplete() and onFailure(). For details of how to use the Synchronization callback, see Asynchronous invocation with a callback.

The following asyncCallback() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object.

Future<Exchange> asyncCallback(
    String endpointUri,
    Exchange exchange,
    Synchronization onCompletion
);
Future<Exchange> asyncCallback(
    Endpoint endpoint,
    Exchange exchange,
    Synchronization onCompletion
);

In the event-driven pattern, the processing of an incoming request is initiated when another part of the application (typically a third-party library) calls a method implemented by the consumer. A good example of an event-driven consumer is the Fuse Mediation Router JMX component, where events are initiated by the JMX library. The JMX library calls the handleNotification() method to initiate request processing—see Example 8.3 for details.

Figure 5.3 shows an outline of the event-driven consumer pattern. In this example, it is assumed that processing is triggered by a call to the notify() method.


The event-driven consumer processes incoming requests as follows:

  1. The consumer must implement a method to receive the incoming event (in Figure 5.3 this is represented by the notify() method). The thread that calls notify() is normally a separate part of the application, so the consumer's threading policy is externally driven.

    For example, in the case of the JMX consumer implementation, the consumer implements the NotificationListener.handleNotification() method to receive notifications from JMX. The threads that drive the consumer processing are created within the JMX layer.

  2. In the body of the notify() method, the consumer first converts the incoming event into an exchange object, E, and then calls process() on the next processor in the route, passing the exchange object as its argument.

Figure 5.7 shows an outline of an asynchronous producer, where the producer processes the exchange in a sub-thread, and the preceding processor is not blocked for any significant length of time.


The synchronous producer processes an exchange as follows:

  1. Before the processor can call the asynchronous process() method, it must create an asynchronous callback object, which is responsible for processing the exchange on the return portion of the route. For the asynchronous callback, the processor must implement a class that inherits from the AsyncCallback interface.

  2. The processor calls the asynchronous process() method on the producer to initiate asynchronous processing. The asynchronous process() method takes two arguments:

    • an exchange object

    • a synchronous callback object

  3. In the body of the process() method, the producer creates a Runnable object that encapsulates the processing code. The producer then delegates the execution of this Runnable object to a sub-thread.

  4. The asynchronous process() method returns, thereby freeing up the processor's thread. The exchange processing continues in a separate sub-thread.

  5. The Runnable object sends the In message to the endpoint.

  6. If required by the exchange pattern, the Runnable object waits for the reply (Out or Fault message) to arrive from the endpoint. The Runnable object remains blocked until the reply is received.

  7. After the reply arrives, the Runnable object inserts the reply (Out message) into the exchange object and then calls done() on the asynchronous callback object. The asynchronous callback is then responsible for processing the reply message (executed in the sub-thread).

You typically implement a custom component as follows:

  1. Implement the Component interface—A component object acts as an endpoint factory. You extend the DefaultComponent class and implement the createEndpoint() method.

    See Component Interface.

  2. Implement the Endpoint interface—An endpoint represents a resource identified by a specific URI. The approach taken when implementing an endpoint depends on whether the consumers follow an event-driven pattern, a scheduled poll pattern, or a polling pattern.

    For an event-driven pattern, implement the endpoint by extending the DefaultEndpoint class and implementing the following methods:

    • createProducer()

    • createConsumer()

    For a scheduled poll pattern, implement the endpoint by extending the ScheduledPollEndpoint class and implementing the following methods:

    • createProducer()

    • createConsumer()

    For a polling pattern, implement the endpoint by extending the DefaultPollingEndpoint class and implementing the following methods:

    • createProducer()

    • createPollConsumer()

    See Endpoint Interface.

  3. Implement the Consumer interface—There are several different approaches you can take to implementing a consumer, depending on which pattern you need to implement (event-driven, scheduled poll, or polling). The consumer implementation is also crucially important for determining the threading model used for processing a message exchange.

    See Implementing the Consumer Interface.

  4. Implement the Producer interface—To implement a producer, you extend the DefaultProducer class and implement the process() method.

    See Producer Interface.

  5. Optionally implement the Exchange or the Message interface—The default implementations of Exchange and Message can be used directly, but occasionally, you might find it necessary to customize these types.

    See Exchange Interface and Message Interface.

Example 5.2 shows an example of how to configure the Fuse Mediation Router's JMS component by defining a bean element with ID equal to jms. These settings are added to the Spring configuration file, camel-context.xml.


1

The CamelContext automatically instantiates any RouteBuilder classes that it finds in the specified Java package, org.apache.camel.example.spring.

2

The bean element with ID, jms, configures the JMS component. The bean ID corresponds to the component's URI prefix. For example, if a route specifies an endpoint with the URI, jms://MyQName, Fuse Mediation Router automatically loads the JMS component using the settings from the jms bean element.

3

JMS is just a wrapper for a messaging service. You must specify the concrete implementation of the messaging system by setting the connectionFactory property on the JmsComponent class.

4

In this example, the concrete implementation of the JMS messaging service is Apache ActiveMQ. The brokerURL property initializes a connection to an ActiveMQ broker instance, where the message broker is embedded in the local Java virtual machine (JVM). If a broker is not already present in the JVM, ActiveMQ will instantiate it with the options broker.persistent=false (the broker does not persist messages) and broker.useJmx=false (the broker does not open a JMX port).



[2] The timeout interval is typically specified in milliseconds.

Example 6.2 outlines how to implement the DefaultComponent.createEndpoint() method, which is responsible for creating endpoint instances on demand.

Example 6.2. Implementation of createEndpoint()

public class CustomComponent extends DefaultComponent { 1
    ...
    protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { 2
        CustomEndpoint result = new CustomEndpoint(uri, this); 3
        // ...
        return result;
    }
}

1

The CustomComponent is the name of your custom component class, which is defined by extending the DefaultComponent class.

2

When extending DefaultComponent, you must implement the createEndpoint() method with three arguments (see URI parsing).

3

Create an instance of your custom endpoint type, CustomEndpoint, by calling its constructor. At a minimum, this constructor takes a copy of the original URI string, uri, and a reference to this component instance, this.

Example 6.3 shows a sample implementation of a FileComponent class.

Example 6.3. FileComponent Implementation

package org.apache.camel.component.file;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;

import java.io.File;
import java.util.Map;

public class FileComponent extends DefaultComponent {
    public static final String HEADER_FILE_NAME = "org.apache.camel.file.name";

    public FileComponent() { 1
    }

    public FileComponent(CamelContext context) { 2
        super(context);
    }

    protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { 3
        File file = new File(remaining);
        FileEndpoint result = new FileEndpoint(file, uri, this);
        return result;
    }
}

1

Always define a no-argument constructor for the component class in order to facilitate automatic instantiation of the class.

2

A constructor that takes the parent CamelContext instance as an argument is convenient when creating a component instance by programming.

3

The implementation of the FileComponent.createEndpoint() method follows the pattern described in Example 6.2. The implementation creates a FileEndpoint object.

The Endpoint interface defines the following methods:

If your custom endpoint conforms to the event-driven pattern (see Consumer Patterns and Threading), it is implemented by extending the abstract class, org.apache.camel.impl.DefaultEndpoint, as shown in Example 7.2.

Example 7.2. Implementing DefaultEndpoint

import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;

public class CustomEndpoint extends DefaultEndpoint { 1

    public CustomEndpoint(String endpointUri, Component component) { 2
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception { 3
        return new CustomProducer(this);
    }

    public Consumer createConsumer(Processor processor) throws Exception { 4
        return new CustomConsumer(this, processor);
    }

    public boolean isSingleton() { 
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() { 5
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}

1

Implement an event-driven custom endpoint, CustomEndpoint, by extending the DefaultEndpoint class.

2

You must have at least one constructor that takes the endpoint URI, endpointUri, and the parent component reference, component, as arguments.

3

Implement the createProducer() factory method to create producer endpoints.

4

Implement the createConsumer() factory method to create event-driven consumer instances.

Important

Do not override the createPollingConsumer() method.

5

In general, it is not necessary to override the createExchange() methods. The implementations inherited from DefaultEndpoint create a DefaultExchange object by default, which can be used in any Apache Camel component. If you need to initialize some exchange properties in the DefaultExchange object, however, it is appropriate to override the createExchange() methods here in order to add the exchange property settings.

The DefaultEndpoint class provides default implementations of the following methods, which you might find useful when writing your custom endpoint code:

If your custom endpoint conforms to the scheduled poll pattern (see Consumer Patterns and Threading) it is implemented by inheriting from the abstract class, org.apache.camel.impl.ScheduledPollEndpoint, as shown in Example 7.3.

Example 7.3. ScheduledPollEndpoint Implementation

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.ScheduledPollEndpoint;

public class CustomEndpoint extends ScheduledPollEndpoint { 1

    protected CustomEndpoint(String endpointUri, CustomComponent component) { 2
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception { 3
        Producer result = new CustomProducer(this);
        return result;
    }

    public Consumer createConsumer(Processor processor) throws Exception { 4
        Consumer result = new CustomConsumer(this, processor);
        configureConsumer(result); 5
        return result;
    }

    public boolean isSingleton() {
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() { 6
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}

1

Implement a scheduled poll custom endpoint, CustomEndpoint, by extending the ScheduledPollEndpoint class.

2

You must to have at least one constructor that takes the endpoint URI, endpointUri, and the parent component reference, component, as arguments.

3

Implement the createProducer() factory method to create a producer endpoint.

4

Implement the createConsumer() factory method to create a scheduled poll consumer instance.

Important

Do not override the createPollingConsumer() method.

5

The configureConsumer() method, defined in the ScheduledPollEndpoint base class, is responsible for injecting consumer query options into the consumer. See Consumer parameter injection.

6

In general, it is not necessary to override the createExchange() methods. The implementations inherited from DefaultEndpoint create a DefaultExchange object by default, which can be used in any Apache Camel component. If you need to initialize some exchange properties in the DefaultExchange object, however, it is appropriate to override the createExchange() methods here in order to add the exchange property settings.

Example 7.6 shows a sample implementation of SedaEndpoint. The SEDA endpoint is an example of an event-driven endpoint. Incoming events are stored in a FIFO queue (an instance of java.util.concurrent.BlockingQueue) and a SEDA consumer starts up a thread to read and process the events. The events themselves are represented by org.apache.camel.Exchange objects.

Example 7.6. SedaEndpoint Implementation

package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.BrowsableEndpoint;

public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint { 1
    private BlockingQueue<Exchange> queue;

    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) { 2
        super(endpointUri, component);
        this.queue = queue;
    }

    public SedaEndpoint(String uri, SedaComponent component, Map parameters) { 3
        this(uri, component, component.createQueue(uri, parameters));
    }

    public Producer createProducer() throws Exception { 4
        return new CollectionProducer(this, getQueue());
    }

    public Consumer createConsumer(Processor processor) throws Exception { 5
        return new SedaConsumer(this, processor);
    }

    public BlockingQueue<Exchange> getQueue() { 6
        return queue;
    }

    public boolean isSingleton() { 7
        return true;
    }

    public List<Exchange> getExchanges() { 8
        return new ArrayList<Exchange>(getQueue());
    }
}

1

The SedaEndpoint class follows the pattern for implementing an event-driven endpoint by extending the DefaultEndpoint class. The SedaEndpoint class also implements the BrowsableEndpoint interface, which provides access to the list of exchange objects in the queue.

2

Following the usual pattern for an event-driven consumer, SedaEndpoint defines a constructor that takes an endpoint argument, endpointUri, and a component reference argument, component.

3

Another constructor is provided, which delegates queue creation to the parent component instance.

4

The createProducer() factory method creates an instance of CollectionProducer, which is a producer implementation that adds events to the queue.

5

The createConsumer() factory method creates an instance of SedaConsumer, which is responsible for pulling events off the queue and processing them.

6

The getQueue() method returns a reference to the queue.

7

The isSingleton() method returns true, indicating that a single endpoint instance should be created for each unique URI string.

8

The getExchanges() method implements the corresponding abstract method from BrowsableEndpoint.

For consumers that follow the scheduled poll pattern (see Scheduled poll pattern), Fuse Mediation Router provides support for injecting parameters into consumer instances. For example, consider the following endpoint URI for a component identified by the custom prefix:

custom:destination?consumer.myConsumerParam

Fuse Mediation Router provides support for automatically injecting query options of the form consumer.*. For the consumer.myConsumerParam parameter, you need to define corresponding setter and getter methods on the Consumer implementation class as follows:

public class CustomConsumer extends ScheduledPollConsumer {
    ...
    String getMyConsumerParam() { ... }
    void setMyConsumerParam(String s) { ... }
    ...
}

Where the getter and setter methods follow the usual Java bean conventions (including capitalizing the first letter of the property name).

In addition to defining the bean methods in your Consumer implementation, you must also remember to call the configureConsumer() method in the implementation of Endpoint.createConsumer(). See Scheduled poll endpoint implementation). Example 8.1 shows an example of a createConsumer() method implementation, taken from the FileEndpoint class in the file component:


At run time, consumer parameter injection works as follows:

Fuse Mediation Router provides two special consumer implementations which can be used to convert back and forth between an event-driven consumer and a polling consumer. The following conversion classes are provided:

In practice, these classes are used to simplify the task of implementing an Endpoint type. The Endpoint interface defines the following two methods for creating a consumer instance:

package org.apache.camel;

public interface Endpoint {
    ...
    Consumer createConsumer(Processor processor) throws Exception;
    PollingConsumer createPollingConsumer() throws Exception;
}

createConsumer() returns an event-driven consumer and createPollingConsumer() returns a polling consumer. You would only implement one these methods. For example, if you are following the event-driven pattern for your consumer, you would implement the createConsumer() method provide a method implementation for createPollingConsumer() that simply raises an exception. With the help of the conversion classes, however, Fuse Mediation Router is able to provide a more useful default implementation.

For example, if you want to implement your consumer according to the event-driven pattern, you implement the endpoint by extending DefaultEndpoint and implementing the createConsumer() method. The implementation of createPollingConsumer() is inherited from DefaultEndpoint, where it is defined as follows:

public PollingConsumer<E> createPollingConsumer() throws Exception {
    return new EventDrivenPollingConsumer<E>(this);
}

The EventDrivenPollingConsumer constructor takes a reference to the event-driven consumer, this, effectively wrapping it and converting it into a polling consumer. To implement the conversion, the EventDrivenPollingConsumer instance buffers incoming events and makes them available on demand through the receive(), the receive(long timeout), and the receiveNoWait() methods.

Analogously, if you are implementing your consumer according to the polling pattern, you implement the endpoint by extending DefaultPollingEndpoint and implementing the createPollingConsumer() method. In this case, the implementation of the createConsumer() method is inherited from DefaultPollingEndpoint, and the default implementation returns a DefaultScheduledPollConsumer instance (which converts the polling consumer into an event-driven consumer).

Consumer classes can optionally implement the org.apache.camel.spi.ShutdownAware interface, which interacts with the graceful shutdown mechanism, enabling a consumer to ask for extra time to shut down. This is typically needed for components such as SEDA, which can have pending exchanges stored in an internal queue. Normally, you would want to process all of the exchanges in the queue before shutting down the SEDA consumer.

Example 8.2 shows the definition of the ShutdownAware interface.


The ShutdownAware interface defines the following methods:

deferShutdown

Return true from this method, if you want to delay shutdown of the consumer. The shutdownRunningTask argument is an enum which can take either of the following values:

  • ShutdownRunningTask.CompleteCurrentTaskOnly—finish processing the exchanges that are currently being processed by the consumer's thread pool, but do not attempt to process any more exchanges than that.

  • ShutdownRunningTask.CompleteAllTasks—process all of the pending exchanges. For example, in the case of the SEDA component, the consumer would process all of the exchanges from its incoming queue.

getPendingExchangesSize

Indicates how many exchanges remain to be processed by the consumer. A zero value indicates that processing is finished and the consumer can be shut down.

For an example of how to define the ShutdownAware methods, see Example 8.6.

In an event-driven consumer, processing is driven explicitly by external events. The events are received through an event-listener interface, where the listener interface is specific to the particular event source.

Example 8.3 shows the implementation of the JMXConsumer class, which is taken from the Fuse Mediation Router JMX component implementation. The JMXConsumer class is an example of an event-driven consumer, which is implemented by inheriting from the org.apache.camel.impl.DefaultConsumer class. In the case of the JMXConsumer example, events are represented by calls on the NotificationListener.handleNotification() method, which is a standard way of receiving JMX events. In order to receive these JMX events, it is necessary to implement the NotificationListener interface and override the handleNotification() method, as shown in Example 8.3.

Example 8.3. JMXConsumer Implementation

package org.apache.camel.component.jmx;

import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;

public class JMXConsumer extends DefaultConsumer implements NotificationListener { 1

    JMXEndpoint jmxEndpoint;

    public JMXConsumer(JMXEndpoint endpoint, Processor processor) { 2
        super(endpoint, processor);
        this.jmxEndpoint = endpoint;
    }

    public void handleNotification(Notification notification, Object handback) { 3
        try {
            getProcessor().process(jmxEndpoint.createExchange(notification)); 4
        } catch (Throwable e) {
            handleException(e); 5
        }
    }
}

1

The JMXConsumer pattern follows the usual pattern for event-driven consumers by extending the DefaultConsumer class. Additionally, because this consumer is designed to receive events from JMX (which are represented by JMX notifications), it is necessary to implement the NotificationListener interface.

2

You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, and a reference to the next processor in the chain, processor, as arguments.

3

The handleNotification() method (which is defined in NotificationListener) is automatically invoked by JMX whenever a JMX notification arrives. The body of this method should contain the code that performs the consumer's event processing. Because the handleNotification() call originates from the JMX layer, the consumer's threading model is implicitly controlled by the JMX layer, not by the JMXConsumer class.

Note

The handleNotification() method is specific to the JMX example. When implementing your own event-driven consumer, you must identify an analogous event listener method to implement in your custom consumer.

4

This line of code combines two steps. First, the JMX notification object is converted into an exchange object, which is the generic representation of an event in Fuse Mediation Router. Then the newly created exchange object is passed to the next processor in the route (invoked synchronously).

5

The handleException() method is implemented by the DefaultConsumer base class. By default, it handles exceptions using the org.apache.camel.impl.LoggingExceptionHandler class.

In a scheduled poll consumer, polling events are automatically generated by a timer class, java.util.concurrent.ScheduledExecutorService. To receive the generated polling events, you must implement the ScheduledPollConsumer.poll() method (see Consumer Patterns and Threading).

Example 8.4 shows how to implement a consumer that follows the scheduled poll pattern, which is implemented by extending the ScheduledPollConsumer class.

Example 8.4. ScheduledPollConsumer Implementation

import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;

import org.apache.camel.impl.ScheduledPollConsumer;

public class CustomConsumer extends ScheduledPollConsumer { 1
    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint, Processor processor) { 2
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    protected void poll() throws Exception { 3
        Exchange exchange = /* Receive exchange object ... */;

        // Example of a synchronous processor. 
        getProcessor().process(exchange); 4
    }

    @Override
    protected void doStart() throws Exception { 5
        // Pre-Start:
        // Place code here to execute just before start of processing.
        super.doStart();
        // Post-Start:
        // Place code here to execute just after start of processing.
    }

    @Override
    protected void doStop() throws Exception { 6
        // Pre-Stop:
        // Place code here to execute just before processing stops.
        super.doStop();
        // Post-Stop:
        // Place code here to execute just after processing stops.
    }
}

1

Implement a scheduled poll consumer class, CustomConsumer, by extending the org.apache.camel.impl.ScheduledPollConsumer class.

2

You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, and a reference to the next processor in the chain, processor, as arguments.

3

Override the poll() method to receive the scheduled polling events. This is where you should put the code that retrieves and processes incoming events (represented by exchange objects).

4

In this example, the event is processed synchronously. If you want to process events asynchronously, you should use a reference to an asynchronous processor instead, by calling getAsyncProcessor(). For details of how to process events asynchronously, see Asynchronous Processing.

5

(Optional) If you want some lines of code to execute as the consumer is starting up, override the doStart() method as shown.

6

(Optional) If you want some lines of code to execute as the consumer is stopping, override the doStop() method as shown.

Example 8.5 outlines how to implement a consumer that follows the polling pattern, which is implemented by extending the PollingConsumerSupport class.

Example 8.5. PollingConsumerSupport Implementation

import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.PollingConsumerSupport;

public class CustomConsumer extends PollingConsumerSupport { 1
    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint) { 2
        super(endpoint);
        this.endpoint = endpoint;
    }

    public Exchange receiveNoWait() { 3
        Exchange exchange = /* Obtain an exchange object. */;
        // Further processing ...
        return exchange;
    }

    public Exchange receive() { 4
        // Blocking poll ...
    }

    public Exchange receive(long timeout) { 5
        // Poll with timeout ...
    }

    protected void doStart() throws Exception { 6
        // Code to execute whilst starting up.
    }

    protected void doStop() throws Exception {
        // Code to execute whilst shutting down.
    }
}

1

Implement your polling consumer class, CustomConsumer, by extending the org.apache.camel.impl.PollingConsumerSupport class.

2

You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, as an argument. A polling consumer does not need a reference to a processor instance.

3

The receiveNoWait() method should implement a non-blocking algorithm for retrieving an event (exchange object). If no event is available, it should return null.

4

The receive() method should implement a blocking algorithm for retrieving an event. This method can block indefinitely, if events remain unavailable.

5

The receive(long timeout) method implements an algorithm that can block for as long as the specified timeout (typically specified in units of milliseconds).

6

If you want to insert code that executes while a consumer is starting up or shutting down, implement the doStart() method and the doStop() method, respectively.

If the standard consumer patterns are not suitable for your consumer implementation, you can implement the Consumer interface directly and write the threading code yourself. When writing the threading code, however, it is important that you comply with the standard Fuse Mediation Router threading model, as described in Threading Model in Implementing Enterprise Integration Patterns.

For example, the SEDA component from camel-core implements its own consumer threading, which is consistent with the Fuse Mediation Router threading model. Example 8.6 shows an outline of how the SedaConsumer class implements its threading.

Example 8.6. Custom Threading Implementation

package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Consumer for the SEDA component.
 *
 * @version $Revision: 922485 $
 */
public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware { 1
    private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);

    private SedaEndpoint endpoint;
    private Processor processor;
    private ExecutorService executor;
    ...
    public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
        this.endpoint = endpoint;
        this.processor = processor;
    }
    ...

    public void run() { 2
        BlockingQueue<Exchange> queue = endpoint.getQueue();
        // Poll the queue and process exchanges
        ...
    }

    ...
    protected void doStart() throws Exception { 3
        int poolSize = endpoint.getConcurrentConsumers();
        executor = endpoint.getCamelContext().getExecutorServiceStrategy()
            .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); 4
        for (int i = 0; i < poolSize; i++) { 5
            executor.execute(this);
        }
        endpoint.onStarted(this);
    }

    protected void doStop() throws Exception { 6
        endpoint.onStopped(this);
        // must shutdown executor on stop to avoid overhead of having them running
        endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor); 7
        executor = null;

        if (multicast != null) {
            ServiceHelper.stopServices(multicast);
        }
    }
    ...
    //----------
    // Implementation of ShutdownAware interface

    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
        // deny stopping on shutdown as we want seda consumers to run in case some other queues
        // depend on this consumer to run, so it can complete its exchanges
        return true;
    }

    public int getPendingExchangesSize() {
        // number of pending messages on the queue
        return endpoint.getQueue().size();
    }

}

1

The SedaConsumer class is implemented by extending the org.apache.camel.impl.ServiceSupport class and implementing the Consumer, Runnable, and ShutdownAware interfaces.

2

Implement the Runnable.run() method to define what the consumer does while it is running in a thread. In this case, the consumer runs in a loop, polling the queue for new exchanges and then processing the exchanges in the latter part of the queue.

3

The doStart() method is inherited from ServiceSupport. You override this method in order to define what the consumer does when it starts up.

4

Instead of creating threads directly, you should create a thread pool using the ExecutorServiceStrategy object that is registered with the CamelContext. This is important, because it enables Fuse Mediation Router to implement centralized management of threads and support such features as graceful shutdown.

For details, see Threading Model in Implementing Enterprise Integration Patterns.

5

Kick off the threads by calling the ExecutorService.execute() method poolSize times.

6

The doStop() method is inherited from ServiceSupport. You override this method in order to define what the consumer does when it shuts down.

7

Shut down the thread pool, which is represented by the executor instance.

Processing an exchange object in a producer—which usually involves sending a message to a remote destination and waiting for a reply—can potentially block for a significant length of time. If you want to avoid blocking the current thread, you can opt to implement the producer as an asynchronous processor. The asynchronous processing pattern decouples the preceding processor from the producer, so that the process() method returns without delay. See Asynchronous Processing.

When implementing a producer, you can support the asynchronous processing model by implementing the org.apache.camel.AsyncProcessor interface. On its own, this is not enough to ensure that the asynchronous processing model will be used: it is also necessary for the preceding processor in the chain to call the asynchronous version of the process() method. The definition of the AsyncProcessor interface is shown in Example 9.2.


The asynchronous version of the process() method takes an extra argument, callback, of org.apache.camel.AsyncCallback type. The corresponding AsyncCallback interface is defined as shown in Example 9.3.


The caller of AsyncProcessor.process() must provide an implementation of AsyncCallback to receive the notification that processing has finished. The AsyncCallback.done() method takes a boolean argument that indicates whether the processing was performed synchronously or not. Normally, the flag would be false, to indicate asynchronous processing. In some cases, however, it can make sense for the producer not to process asynchronously (in spite of being asked to do so). For example, if the producer knows that the processing of the exchange will complete rapidly, it could optimise the processing by doing it synchronously. In this case, the doneSynchronously flag should be set to true.

Example 9.4 outlines how to implement a synchronous producer. In this case, call to Producer.process() blocks until a reply is received.

Example 9.4. DefaultProducer Implementation

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultProducer;

public class CustomProducer extends DefaultProducer { 1

    public CustomProducer(Endpoint endpoint) { 2
        super(endpoint);
        // Perform other initialization tasks...
    }

    public void process(Exchange exchange) throws Exception { 3
        // Process exchange synchronously.
        // ...
    }
}

1

Implement a custom synchronous producer class, CustomProducer, by extending the org.apache.camel.impl.DefaultProducer class.

2

Implement a constructor that takes a reference to the parent endpoint.

3

The process() method implementation represents the core of the producer code. The implementation of the process() method is entirely dependent on the type of component that you are implementing. In outline, the process() method is normally implemented as follows:

  • If the exchange contains an In message, and if this is consistent with the specified exchange pattern, then send the In message to the designated endpoint.

  • If the exchange pattern anticipates the receipt of an Out message, then wait until the Out message has been received. This typically causes the process() method to block for a significant length of time.

  • When a reply is received, call exchange.setOut() to attach the reply to the exchange object. If the reply contains a fault message, set the fault flag on the Out message using Message.setFault(true).

Example 9.5 outlines how to implement an asynchronous producer. In this case, you must implement both a synchronous process() method and an asynchronous process() method (which takes an additional AsyncCallback argument).

Example 9.5. CollectionProducer Implementation

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultProducer;

public class CustomProducer extends DefaultProducer implements AsyncProcessor { 1

    public CustomProducer(Endpoint endpoint) { 2
        super(endpoint);
        // ...
    }

    public void process(Exchange exchange) throws Exception { 3
        // Process exchange synchronously.
        // ...
    }

    public boolean process(Exchange exchange, AsyncCallback callback) { 4
        // Process exchange asynchronously.
        CustomProducerTask task = new CustomProducerTask(exchange, callback);
        // Process 'task' in a separate thread...
        // ...
        return false; 5
    }
}

public class CustomProducerTask implements Runnable { 6
    private Exchange exchange;
    private AsyncCallback callback;

    public CustomProducerTask(Exchange exchange, AsyncCallback callback) {
        this.exchange = exchange;
        this.callback = callback;
    }

    public void run() { 7
        // Process exchange.
        // ...
        callback.done(false);
    }
}

1

Implement a custom asynchronous producer class, CustomProducer, by extending the org.apache.camel.impl.DefaultProducer class, and implementing the AsyncProcessor interface.

2

Implement a constructor that takes a reference to the parent endpoint.

3

Implement the synchronous process() method.

4

Implement the asynchronous process() method. You can implement the asynchronous method in several ways. The approach shown here is to create a java.lang.Runnable instance, task, that represents the code that runs in a sub-thread. You then use the Java threading API to run the task in a sub-thread (for example, by creating a new thread or by allocating the task to an existing thread pool).

5

Normally, you return false from the asynchronous process() method, to indicate that the exchange was processed asynchronously.

6

The CustomProducerTask class encapsulates the processing code that runs in a sub-thread. This class must store a copy of the Exchange object, exchange, and the AsyncCallback object, callback, as private member variables.

7

The run() method contains the code that sends the In message to the producer endpoint and waits to receive the reply, if any. After receiving the reply (Out message or Fault message) and inserting it into the exchange object, you must call callback.done() to notify the caller that processing is complete.

Example 10.1 shows the definition of the org.apache.camel.Exchange interface.


The Exchange interface defines the following methods:

An instance of org.apache.camel.Message type can represent any kind of message (In or Out). Figure 11.1 shows the inheritance hierarchy for the message type. You do not always need to implement a custom message type for a component. In many cases, the default implementation, DefaultMessage, is adequate.


The Message interface defines the following methods:

Example 11.2 outlines how to implement a message by extending the DefaultMessage class.

Example 11.2. Custom Message Implementation

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultMessage;

public class CustomMessage extends DefaultMessage { 1

    public CustomMessage() { 2
        // Create message with default properties...
    }

    @Override
    public String toString() { 3
        // Return a stringified message...
    }

    @Override
    public CustomMessage newInstance() { 4
        return new CustomMessage( ... );
    }

    @Override
    protected Object createBody() { 5
        // Return message body (lazy creation).
    }

    @Override
    protected void populateInitialHeaders(Map<String, Object> map) { 6
        // Initialize headers from underlying message (lazy creation).
    }

    @Override
    protected void populateInitialAttachments(Map<String, DataHandler> map) { 7
        // Initialize attachments from underlying message (lazy creation).
    }
}

1

Implements a custom message class, CustomMessage, by extending the org.apache.camel.impl.DefaultMessage class.

2

Typically, you need a default constructor that creates a message with default properties.

3

Override the toString() method to customize message stringification.

4

The newInstance() method is called from inside the MessageSupport.copy() method. Customization of the newInstance() method should focus on copying all of the custom properties of the current message instance into the new message instance. The MessageSupport.copy() method copies the generic message properties by calling copyFrom().

5

The createBody() method works in conjunction with the MessageSupport.getBody() method to implement lazy access to the message body. By default, the message body is null. It is only when the application code tries to access the body (by calling getBody()), that the body should be created. The MessageSupport.getBody() automatically calls createBody(), when the message body is accessed for the first time.

6

The populateInitialHeaders() method works in conjunction with the header getter and setter methods to implement lazy access to the message headers. This method parses the message to extract any message headers and inserts them into the hash map, map. The populateInitialHeaders() method is automatically called when a user attempts to access a header (or headers) for the first time (by calling getHeader(), getHeaders(), setHeader(), or setHeaders()).

7

The populateInitialAttachments() method works in conjunction with the attachment getter and setter methods to implement lazy access to the attachments. This method extracts the message attachments and inserts them into the hash map, map. The populateInitialAttachments() method is automatically called when a user attempts to access an attachment (or attachments) for the first time by calling getAttachment(), getAttachments(), getAttachmentNames(), or addAttachment().

A

AsyncCallback, Asynchronous processing
asynchronous producer
implementing, How to implement an asynchronous producer
AsyncProcessor, Asynchronous processing
auto-discovery
configuration, Configuring auto-discovery

E

Endpoint, Endpoint
createConsumer(), Endpoint methods
createExchange(), Endpoint methods
createPollingConsumer(), Endpoint methods
createProducer(), Endpoint methods
getCamelContext(), Endpoint methods
getEndpointURI(), Endpoint methods
interface definition, The Endpoint interface
isLenientProperties(), Endpoint methods
isSingleton(), Endpoint methods
setCamelContext(), Endpoint methods
endpoint
event-driven, Event-driven endpoint implementation
scheduled, Scheduled poll endpoint implementation
endpoints, Endpoint
Exchange, Exchange, The Exchange interface
copy(), Exchange methods
getExchangeId(), Exchange methods
getIn(), Accessing message headers, Exchange methods
getOut(), Exchange methods
getPattern(), Exchange methods
getProperties(), Exchange methods
getProperty(), Exchange methods
getUnitOfWork(), Exchange methods
removeProperty(), Exchange methods
setExchangeId(), Exchange methods
setIn(), Exchange methods
setOut(), Exchange methods
setProperty(), Exchange methods
setUnitOfWork(), Exchange methods
exchange
in capable, Testing the exchange pattern
out capable, Testing the exchange pattern
exchange properties
accessing, Wrapping the exchange accessors
ExchangeHelper, The ExchangeHelper Class
getContentType(), Get the In message's MIME content type
getMandatoryHeader(), Accessing message headers, Wrapping the exchange accessors
getMandatoryInBody(), Wrapping the exchange accessors
getMandatoryOutBody(), Wrapping the exchange accessors
getMandatoryProperty(), Wrapping the exchange accessors
isInCapable(), Testing the exchange pattern
isOutCapable(), Testing the exchange pattern
resolveEndpoint(), Resolve an endpoint
exchanges, Exchange

M

Message, Message
getHeader(), Accessing message headers
message headers
accessing, Accessing message headers
messages, Message

P

pipeline, Pipelining model
Processor, Processor interface
implementing, Implementing the Processor interface
producer, Producer
Producer, Producer
createExchange(), Producer methods
getEndpoint(), Producer methods
process(), Producer methods
producers
asynchronous, Asynchronous producer
synchronous, Synchronous producer

S

ScheduledPollEndpoint, Scheduled poll endpoint implementation
simple processor
implementing, Implementing the Processor interface
synchronous producer
implementing, How to implement a synchronous producer

T

type conversion
runtime process, Type conversion process
type converter
annotating the implementation, Implement an annotated converter class
discovery file, Create a TypeConverter file
implementation steps, How to implement a type converter
mater, Master type converter
packaging, Package the type converter
slave, Master type converter
TypeConverter, Type converter interface
TypeConverterLoader, Type converter loader

U

useIntrospectionOnEndpoint(), Disabling endpoint parameter injection