Version 4.4.1
Copyright © 2011-2013 Red Hat, Inc. and/or its affiliates.
Updated: 06 Jun 2013
Before you can begin programming with Fuse Mediation Router, you should have a clear understanding of how messages and message exchanges are modelled. Because Fuse Mediation Router can process many message formats, the basic message type is designed to have an abstract format. Fuse Mediation Router provides the APIs needed to access and transform the data formats that underlie message bodies and message headers.
An exchange object is a wrapper that encapsulates a received message and stores its associated metadata (including the exchange properties). In addition, if the current message is dispatched to a producer endpoint, the exchange provides a temporary slot to hold the reply (the Out message).
An important feature of exchanges in Fuse Mediation Router is that they support lazy creation of messages. This can provide a significant optimization in the case of routes that do not require explicit access to messages.
Figure 1.1 shows an exchange object passing through
a route. In the context of a route, an exchange object gets passed as the argument of the
Processor.process() method. This means that the exchange object
is directly accessible to the source endpoint, the target endpoint, and all of the
processors in between.
The org.apache.camel.Exchange interface defines methods
to access In and Out messages, as shown in Example 1.1.
Example 1.1. Exchange Methods
// Access the In message
Message getIn();
void setIn(Message in);
// Access the Out message (if any)
Message getOut();
void setOut(Message out);
boolean hasOut();
// Access the exchange ID
String getExchangeId();
void setExchangeId(String id);
For a complete description of the methods in the Exchange
interface, see The Exchange Interface.
Fuse Mediation Router supports lazy creation of In, Out,
and Fault messages. This means that message instances are not created
until you try to access them (for example, by calling getIn() or
getOut()). The lazy message creation semantics are implemented by
the org.apache.camel.impl.DefaultExchange class.
If you call one of the no-argument accessors (getIn() or
getOut()), or if you call an accessor with the boolean argument
equal to true (that is, getIn(true) or
getOut(true)), the default method implementation creates a new
message instance, if one does not already exist.
If you call an accessor with the boolean argument equal to false
(that is, getIn(false) or getOut(false)),
the default method implementation returns the current message value.
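The accessor semantics described above can be illustrated with a minimal, self-contained sketch. The classes below (LazyExchangeSketch, SketchExchange, and SketchMessage) are hypothetical stand-ins written for this illustration; they are not the Exchange interface or the DefaultExchange implementation, and the assumption that a non-creating accessor may return null belongs to the sketch only.

```java
// Hypothetical model of lazy message creation; not the Camel API.
final class LazyExchangeSketch {

    /** Stand-in for a message; holds only a body. */
    static final class SketchMessage {
        Object body;
    }

    /** Stand-in for an exchange that creates its Out message on demand. */
    static final class SketchExchange {
        private SketchMessage out;   // stays null until someone asks for it

        /** No-argument accessor: creates the message if it does not exist. */
        SketchMessage getOut() {
            return getOut(true);
        }

        /** With lazyCreate == false, returns the current value without creating one. */
        SketchMessage getOut(boolean lazyCreate) {
            if (out == null && lazyCreate) {
                out = new SketchMessage();
            }
            return out;
        }

        boolean hasOut() {
            return out != null;
        }
    }

    public static void main(String[] args) {
        SketchExchange exchange = new SketchExchange();
        System.out.println(exchange.hasOut());               // false: nothing created yet
        System.out.println(exchange.getOut(false) == null);  // true: still nothing created
        exchange.getOut();                                   // creates the Out message
        System.out.println(exchange.hasOut());               // true
    }
}
```

The point of the design is that a route that never touches the Out message never pays for creating one.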
Fuse Mediation Router supports lazy creation of exchange IDs. You can call
getExchangeId() on any exchange to obtain a unique ID for that exchange
instance, but the ID is generated only when you actually call the method. The
DefaultExchange.getExchangeId() implementation of this method delegates ID
generation to the UUID generator that is registered with the
CamelContext.
For details of how to register UUID generators with the CamelContext, see
Built-In UUID Generators.
Message objects represent messages using the following abstract model:
Message body
Message headers
Message attachments
The message body and the message headers can be of arbitrary type (they are declared as
type Object) and the message attachments are declared to be of type
javax.activation.DataHandler, which can contain arbitrary MIME types. If you
need to obtain a concrete representation of the message contents, you can convert the body
and headers to another type using the type converter mechanism and, possibly, using the
marshalling and unmarshalling mechanism.
One important feature of Fuse Mediation Router messages is that they support lazy creation of message bodies and headers. In some cases, this means that a message can pass through a route without needing to be parsed at all.
The org.apache.camel.Message interface defines methods to
access the message body, message headers and message attachments, as shown in Example 1.2.
Example 1.2. Message Interface
// Access the message body
Object getBody();
<T> T getBody(Class<T> type);
void setBody(Object body);
<T> void setBody(Object body, Class<T> type);
// Access message headers
Object getHeader(String name);
<T> T getHeader(String name, Class<T> type);
void setHeader(String name, Object value);
Object removeHeader(String name);
Map<String, Object> getHeaders();
void setHeaders(Map<String, Object> headers);
// Access message attachments
javax.activation.DataHandler getAttachment(String id);
java.util.Map<String, javax.activation.DataHandler> getAttachments();
java.util.Set<String> getAttachmentNames();
void addAttachment(String id, javax.activation.DataHandler content);
// Access the message ID
String getMessageId();
void setMessageId(String messageId);
For a complete description of the methods in the Message
interface, see The Message Interface.
Fuse Mediation Router supports lazy creation of bodies, headers, and attachments. This means that the objects that represent a message body, a message header, or a message attachment are not created until they are needed.
For example, consider the following route that accesses the foo message
header from the In message:
from("SourceURL")
.filter(header("foo")
.isEqualTo("bar"))
.to("TargetURL");
In this route, if we assume that the component referenced by
SourceURL supports lazy creation, the In
message headers are not actually parsed until the header("foo") call is
executed. At that point, the underlying message implementation parses the headers and
populates the header map. The message body is not parsed until you
reach the end of the route, at the to("TargetURL")
call. At that point, the body is converted into the format required for writing it to the
target endpoint, TargetURL.
By waiting until the last possible moment before populating the bodies, headers, and attachments, you can ensure that unnecessary type conversions are avoided. In some cases, you can completely avoid parsing. For example, if a route contains no explicit references to message headers, a message could traverse the route without ever parsing the headers.
Whether or not lazy creation is implemented in practice depends on the underlying component implementation. In general, lazy creation is valuable for those cases where creating a message body, a message header, or a message attachment is expensive. For details about implementing a message type that supports lazy creation, see Implementing the Message Interface.
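As a rough illustration of what lazy creation means for a component, the following self-contained sketch keeps the headers in raw form and parses them only on first access. LazyHeadersSketch, LazyMessage, the name=value;name=value wire format, and the parseCount field are all hypothetical and exist only for this example; a real component would implement the Message interface instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of lazy header parsing; not a Camel component.
final class LazyHeadersSketch {

    /** Message that keeps raw "name=value;name=value" text until a header is read. */
    static final class LazyMessage {
        private final String rawHeaders;
        private Map<String, Object> headers;   // null until first access
        int parseCount;                        // how many times parsing actually ran

        LazyMessage(String rawHeaders) {
            this.rawHeaders = rawHeaders;
        }

        /** Parses the raw headers on first call only, then serves from the map. */
        Object getHeader(String name) {
            if (headers == null) {
                headers = parse(rawHeaders);
                parseCount++;
            }
            return headers.get(name);
        }

        private static Map<String, Object> parse(String raw) {
            Map<String, Object> result = new HashMap<>();
            for (String pair : raw.split(";")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    result.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
                }
            }
            return result;
        }
    }

    public static void main(String[] args) {
        LazyMessage message = new LazyMessage("foo=bar; id=42");
        System.out.println(message.parseCount);         // prints 0: nothing parsed yet
        System.out.println(message.getHeader("foo"));   // prints bar: first access triggers parsing
        message.getHeader("id");
        System.out.println(message.parseCount);         // prints 1: parsed exactly once
    }
}
```

A message that passes through a route without any header access would leave parseCount at zero, which is the saving that lazy creation is meant to provide.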
Fuse Mediation Router supports lazy creation of message IDs. That is, a message ID is generated only
when you actually call the getMessageId() method. The
default implementation of this method delegates ID
generation to the UUID generator that is registered with the
CamelContext.
Some endpoint implementations call the getMessageId() method
implicitly, if the endpoint implements a protocol that requires a unique message ID. In
particular, JMS messages normally include a header containing a unique message ID, so the JMS
component automatically calls getMessageId() to obtain the message ID (this is
controlled by the messageIdEnabled option on the JMS endpoint).
For details of how to register UUID generators with the CamelContext, see
Built-In UUID Generators.
The initial format of an In message is determined by the source
endpoint, and the initial format of an Out message is determined by the
target endpoint. If lazy creation is supported by the underlying component, the message
remains unparsed until it is accessed explicitly by the application. Most Fuse Mediation Router
components create the message body in a relatively raw form—for example, representing
it using types such as byte[], ByteBuffer,
InputStream, or OutputStream. This ensures
that the overhead required for creating the initial message is minimal. Where more elaborate
message formats are required, components usually rely on type
converters or marshalling processors.
It does not matter what the initial format of the message is, because you can easily
convert a message from one format to another using the built-in type converters (see Built-In Type Converters). There are various methods in the Fuse Mediation Router API that
expose type conversion functionality. For example, the convertBodyTo(Class
type) method can be inserted into a route to convert the body of an
In message, as follows:
from("SourceURL").convertBodyTo(String.class).to("TargetURL");
Here, the body of the In message is converted to a
java.lang.String. The following example shows how to append a
string to the end of the In message body:
from("SourceURL").setBody(bodyAs(String.class).append("My Special Signature")).to("TargetURL");
Here, the message body is converted to a string before the signature is appended. It is not necessary to convert the message body explicitly in this example. You can also use:
from("SourceURL").setBody(body().append("My Special Signature")).to("TargetURL");
In this case, the append() method automatically converts the message
body to a string before appending its argument.
The org.apache.camel.Message interface exposes some
methods that perform type conversion explicitly:
getBody(Class<T> type)—Returns the message body as
type, T.
getHeader(String name, Class<T> type)—Returns the
named header value as type, T.
For the complete list of supported conversion types, see Built-In Type Converters.
In addition to supporting conversion between simple types (such as byte[],
ByteBuffer, String, and so on), the built-in
type converter also supports conversion to XML formats. For example, you can convert a
message body to the org.w3c.dom.Document type. This conversion is
more expensive than the simple conversions, because it involves parsing the entire message
and then creating a tree of nodes to represent the XML document structure. You can convert
to the following XML document types:
org.w3c.dom.Document
javax.xml.transform.sax.SAXSource
XML type conversions have narrower applicability than the simpler conversions. Because not every message body conforms to an XML structure, you have to remember that this type conversion might fail. On the other hand, there are many scenarios where a router deals exclusively with XML message types.
Marshalling involves converting a high-level format to a low-level format, and unmarshalling involves converting a low-level format to a high-level format. The following two processors are used to perform marshalling or unmarshalling in a route:
marshal()
unmarshal()
For example, to read a serialized Java object from a file and unmarshal it into a Java object, you could use the route definition shown in Example 1.3.
Example 1.3. Unmarshalling a Java Object
from("file://tmp/appfiles/serialized")
.unmarshal()
.serialization()
.<FurtherProcessing>
.to("TargetURL");
For details of how to marshal and unmarshal various data formats, see Marshalling and unmarshalling in Implementing Enterprise Integration Patterns.
When an In message reaches the end of a route, the target endpoint
must be able to convert the message body into a format that can be written to the physical
endpoint. The same rule applies to Out messages that arrive back at the
source endpoint. This conversion is usually performed implicitly, using the Fuse Mediation Router type
converter. Typically, this involves converting from a low-level format to another low-level
format, such as converting from a byte[] array to an
InputStream type.
This section describes the conversions supported by the master type converter. These conversions are built into the Fuse Mediation Router core.
Usually, the type converter is called through convenience functions, such as
Message.getBody(Class<T> type) or
Message.getHeader(String name, Class<T> type). It is also
possible to invoke the master type converter directly. For example, if you have an exchange
object, exchange, you could convert a given value to a String as
shown in Example 1.4.
Example 1.4. Converting a Value to a String
org.apache.camel.TypeConverter tc = exchange.getContext().getTypeConverter();
String str_value = tc.convertTo(String.class, value);
Fuse Mediation Router provides built-in type converters that perform conversions to and from the following basic types:
java.io.File
String
byte[] and java.nio.ByteBuffer
java.io.InputStream and java.io.OutputStream
java.io.Reader and java.io.Writer
java.io.BufferedReader and java.io.BufferedWriter
java.io.StringReader
However, not all of these types are inter-convertible. The built-in converter is mainly
focused on providing conversions from the File and String types.
The File type can be converted to any of the preceding types, except
Reader, Writer, and StringReader. The
String type can be converted to File, byte[],
ByteBuffer, InputStream, or StringReader. The
conversion from String to File works by interpreting the string as
a file name. The trio of String, byte[], and
ByteBuffer are completely inter-convertible.
You can explicitly specify which character encoding to use for conversion from
byte[] to String and from String to
byte[] by setting the Exchange.CHARSET_NAME exchange property
in the current exchange. For example, to perform conversions using the UTF-8 character
encoding, call exchange.setProperty(Exchange.CHARSET_NAME, "UTF-8"). The
supported character sets are described in the java.nio.charset.Charset class.
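To see why the character set matters for byte[] and String conversions, consider the following plain-JDK sketch. It does not use the type converter at all; CharsetConversionSketch and its two helper methods are hypothetical names, and the code only demonstrates the effect that the Exchange.CHARSET_NAME property is meant to control.

```java
import java.nio.charset.Charset;

// Plain-JDK illustration; CharsetConversionSketch is a hypothetical name.
final class CharsetConversionSketch {

    /** Encodes text to bytes using the named character set. */
    static byte[] toBytes(String text, String charsetName) {
        return text.getBytes(Charset.forName(charsetName));
    }

    /** Decodes bytes to text using the named character set. */
    static String toText(byte[] data, String charsetName) {
        return new String(data, Charset.forName(charsetName));
    }

    public static void main(String[] args) {
        String original = "caf\u00e9";   // ends with one non-ASCII character
        byte[] utf8 = toBytes(original, "UTF-8");
        byte[] latin1 = toBytes(original, "ISO-8859-1");

        System.out.println(utf8.length);     // prints 5: the last character needs two bytes
        System.out.println(latin1.length);   // prints 4: one byte per character
        // Decoding with the same charset round-trips; a mismatched charset does not.
        System.out.println(toText(utf8, "UTF-8").equals(original));        // prints true
        System.out.println(toText(utf8, "ISO-8859-1").equals(original));   // prints false
    }
}
```

If the producer and the consumer of a byte[] body disagree about the encoding, the text is silently corrupted, so it is usually safer to set the charset explicitly than to rely on the platform default.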
Fuse Mediation Router provides built-in type converters that perform conversions to and from the following collection types:
Object[]
java.util.Set
java.util.List
All permutations of conversions between the preceding collection types are supported.
Fuse Mediation Router provides built-in type converters that perform conversions to and from the following map types:
java.util.Map
java.util.HashMap
java.util.Hashtable
java.util.Properties
The preceding map types can also be converted into a set of the java.util.Set
type, where the set elements are of the Map.Entry<K,V> type.
You can perform type conversions to the following Document Object Model (DOM) types:
org.w3c.dom.Document—convertible from byte[],
String, java.io.File, and
java.io.InputStream.
org.w3c.dom.Node
javax.xml.transform.dom.DOMSource—convertible from
String.
javax.xml.transform.Source—convertible from byte[]
and String.
All permutations of conversions between the preceding DOM types are supported.
You can also perform conversions to the javax.xml.transform.sax.SAXSource
type, which supports the SAX event-driven XML parser (see the SAX Web site for details). You can convert to
SAXSource from the following types:
String
InputStream
Source
StreamSource
DOMSource
Fuse Mediation Router also enables you to implement your own custom type converters. For details on how to implement a custom type converter, see Type Converters.
Fuse Mediation Router enables you to register a UUID generator in the CamelContext. This
UUID generator is then used whenever Fuse Mediation Router needs to generate a unique ID—in
particular, the registered UUID generator is called to generate the IDs returned by the
Exchange.getExchangeId() and the Message.getMessageId()
methods.
For example, you might prefer to replace the default UUID generator if part of your
application (such as WebSphere MQ) does not support IDs with a length of 36 characters. Also,
it can be convenient to generate IDs using a simple counter (see the
SimpleUuidGenerator) for testing purposes.
You can configure Fuse Mediation Router to use one of the following UUID generators, which are provided in the core:
org.apache.camel.impl.ActiveMQUuidGenerator—(Default)
generates the same style of ID as is used by Apache ActiveMQ. This implementation might not be
suitable for all applications, because it uses some JDK APIs that are forbidden in the
context of cloud computing (such as the Google App Engine).
org.apache.camel.impl.SimpleUuidGenerator—implements a simple
counter ID, starting at 1. The underlying implementation uses the
java.util.concurrent.atomic.AtomicLong type, so that it is
thread-safe.
org.apache.camel.impl.JavaUuidGenerator—implements an ID based on
the java.util.UUID type. Because java.util.UUID is
synchronized, this might affect performance on some highly concurrent systems.
To implement a custom UUID generator, implement the
org.apache.camel.spi.UuidGenerator interface, which is shown in Example 1.5. The generateUuid() method must be implemented
to return a unique ID string.
Example 1.5. UuidGenerator Interface
// Java
package org.apache.camel.spi;
/**
* Generator to generate UUID strings.
*/
public interface UuidGenerator {
String generateUuid();
}
To replace the default UUID generator using Java, call the
setUuidGenerator() method on the current CamelContext object.
For example, you can register a SimpleUuidGenerator instance with the current
CamelContext, as follows:
// Java
getContext().setUuidGenerator(new org.apache.camel.impl.SimpleUuidGenerator());
The setUuidGenerator() method should be called during startup,
before any routes are activated.
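A custom generator can be very small. The following self-contained sketch shows one possible shape: a fixed prefix combined with a thread-safe counter, which yields IDs much shorter than 36 characters. To keep the example runnable without the router on the classpath, it declares a local UuidGenerator interface that mirrors the single-method contract of Example 1.5; ShortUuidGeneratorSketch, PrefixedCounterGenerator, and the node1 prefix are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; the nested interface is a local stand-in, not org.apache.camel.spi.UuidGenerator.
final class ShortUuidGeneratorSketch {

    /** Local stand-in mirroring the single-method contract shown in Example 1.5. */
    interface UuidGenerator {
        String generateUuid();
    }

    /** Generates IDs of the form prefix-N, where N is a thread-safe counter starting at 1. */
    static final class PrefixedCounterGenerator implements UuidGenerator {
        private final String prefix;
        private final AtomicLong counter = new AtomicLong();

        PrefixedCounterGenerator(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String generateUuid() {
            return prefix + "-" + counter.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        UuidGenerator generator = new PrefixedCounterGenerator("node1");
        System.out.println(generator.generateUuid());   // prints node1-1
        System.out.println(generator.generateUuid());   // prints node1-2
    }
}
```

In a real application, the class would implement org.apache.camel.spi.UuidGenerator and be registered through setUuidGenerator(). Note that a counter of this kind is unique only within one running instance, so the prefix would have to differ between instances that share a message store or broker.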
To replace the default UUID generator using Spring, all you need to do is to create an
instance of a UUID generator using the Spring bean element. When a
camelContext instance is created, it automatically looks up the Spring
registry, searching for a bean that implements
org.apache.camel.spi.UuidGenerator. For example, you can register a
SimpleUuidGenerator instance with the CamelContext as
follows:
<beans ...>
<bean id="simpleUuidGenerator"
class="org.apache.camel.impl.SimpleUuidGenerator" />
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
...
</camelContext>
...
</beans>
Fuse Mediation Router allows you to implement a custom processor. You can then insert the custom processor into a route to perform operations on exchange objects as they pass through the route.
The pipelining model describes the way in which processors are arranged in Pipes and Filters in Implementing Enterprise Integration Patterns. Pipelining is the most common way to process a sequence of endpoints (a producer endpoint is just a special type of processor). When the processors are arranged in this way, the exchange's In and Out messages are processed as shown in Figure 2.1.
The processors in the pipeline look like services, where the In message is analogous to a request, and the Out message is analogous to a reply. In fact, in a realistic pipeline, the nodes in the pipeline are often implemented by Web service endpoints, such as the CXF component.
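The request and reply analogy can be sketched in a few lines of plain Java. In this model, which is an illustration only and not the router's pipeline implementation, each stage is a function from a request (In) to a reply (Out), and the reply of one stage is passed to the next stage as its request. PipelineSketch, runPipeline(), and the three sample stages are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of pipelining; message bodies are plain strings here.
final class PipelineSketch {

    /** Feeds the reply (Out) of each stage to the next stage as its request (In). */
    static String runPipeline(String in, List<UnaryOperator<String>> stages) {
        String current = in;
        for (UnaryOperator<String> stage : stages) {
            current = stage.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        UnaryOperator<String> processorA = body -> body.trim();
        UnaryOperator<String> processorB = body -> body.toUpperCase();
        UnaryOperator<String> target = body -> "reply to [" + body + "]";

        String reply = runPipeline("  hello ", Arrays.asList(processorA, processorB, target));
        System.out.println(reply);   // prints: reply to [HELLO]
    }
}
```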
Example 2.1 shows a Java DSL pipeline constructed from a sequence of
two processors, ProcessorA and ProcessorB, and a producer endpoint,
TargetURI.
This section describes how to implement a simple processor that executes message processing logic before delegating the exchange to the next processor in the route.
Simple processors are created by implementing the
org.apache.camel.Processor interface. As shown in Example 2.2, the interface defines a single method,
process(), which processes an exchange object.
Example 2.2. Processor Interface
package org.apache.camel;
public interface Processor {
void process(Exchange exchange) throws Exception;
}
To create a simple processor, you must implement the
Processor interface and provide the logic for the
process() method. Example 2.3 shows
the outline of a simple processor implementation.
Example 2.3. Simple Processor Implementation
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
public class MyProcessor implements Processor {
    public MyProcessor() { }
    public void process(Exchange exchange) throws Exception
    {
        // Insert code that gets executed *before* delegating
        // to the next processor in the chain.
        ...
    }
}
All of the code in the process() method gets executed
before the exchange object is delegated to the next processor in the
chain.
For examples of how to access the message body and header values inside a simple processor, see Accessing Message Content.
Use the process() DSL command to insert a simple processor into a route.
Create an instance of your custom processor and then pass this instance as an argument to
the process() method, as follows:
org.apache.camel.Processor myProc = new MyProcessor();
from("SourceURL").process(myProc).to("TargetURL");
Message headers typically contain the most useful message content from the perspective
of a router, because headers are often intended to be processed in a router service. To
access header data, you must first get the message from the exchange object (for example,
using Exchange.getIn()), and then use the
Message interface to retrieve the individual headers (for
example, using Message.getHeader()).
Example 2.4 shows an example of a custom processor that
accesses the value of a header named Authorization. This example uses the
ExchangeHelper.getMandatoryHeader() method, which eliminates the
need to test for a null header value.
Example 2.4. Accessing an Authorization Header
import org.apache.camel.*;
import org.apache.camel.util.ExchangeHelper;
public class MyProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
String auth = ExchangeHelper.getMandatoryHeader(
exchange,
"Authorization",
String.class
);
// process the authorization string...
// ...
}
}
For full details of the Message interface, see
Messages.
You can also access the message body. For example, to append a string to the end of the In message, you can use the processor shown in Example 2.5.
Example 2.5. Accessing the Message Body
import org.apache.camel.*;
import org.apache.camel.util.ExchangeHelper;
public class MyProcessor implements Processor {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setBody(in.getBody(String.class) + " World!");
}
}
You can access a message's attachments using either the
Message.getAttachment() method or the
Message.getAttachments() method. See Example 1.2 for more details.
The org.apache.camel.util.ExchangeHelper class is a Fuse Mediation Router utility class that
provides methods that are useful when implementing a processor.
The static resolveEndpoint() method is one of the most useful
methods in the ExchangeHelper class. You use it inside a processor to
create new Endpoint instances on the fly.
Example 2.6. The resolveEndpoint() Method
public final class ExchangeHelper {
...
@SuppressWarnings({"unchecked" })
public static Endpoint
resolveEndpoint(Exchange exchange, Object value)
throws NoSuchEndpointException { ... }
...
}
The first argument to resolveEndpoint() is an
exchange instance, and the second argument is usually an endpoint URI string. Example 2.7 shows how to create a new file endpoint
from an exchange instance, exchange.
Example 2.7. Creating a File Endpoint
Endpoint file_endp = ExchangeHelper.resolveEndpoint(exchange, "file://tmp/messages/in.xml");
The ExchangeHelper class provides several static methods of the
form getMandatoryBeanProperty(), which
wrap the corresponding getBeanProperty()
methods on the Exchange class. The difference between them is that
the original getBeanProperty() accessors
return null if the corresponding property is unavailable, whereas the
getMandatoryBeanProperty() wrapper
methods throw a Java exception. The following wrapper methods are implemented in the
ExchangeHelper
class:
public final class ExchangeHelper {
...
public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type)
throws NoSuchPropertyException { ... }
public static <T> T getMandatoryHeader(Exchange exchange, String propertyName, Class<T> type)
throws NoSuchHeaderException { ... }
public static Object getMandatoryInBody(Exchange exchange)
throws InvalidPayloadException { ... }
public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type)
throws InvalidPayloadException { ... }
public static Object getMandatoryOutBody(Exchange exchange)
throws InvalidPayloadException { ... }
public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type)
throws InvalidPayloadException { ... }
...
}
Several different exchange patterns are compatible with holding an
In message. Several different exchange patterns are also compatible
with holding an Out message. To provide a quick way of checking whether
or not an exchange object is capable of holding an In message or an
Out message, the ExchangeHelper class provides
the following
methods:
public final class ExchangeHelper {
...
public static boolean isInCapable(Exchange exchange) { ... }
public static boolean isOutCapable(Exchange exchange) { ... }
...
}
If you want to find out the MIME content type of the exchange's In
message, you can access it by calling the
ExchangeHelper.getContentType(exchange) method. To implement
this, the ExchangeHelper class looks up the value of the
In message's Content-Type header (this method
relies on the underlying component to populate the header
value).
Fuse Mediation Router has a built-in type conversion mechanism, which is used to convert message bodies and message headers to different types. This chapter explains how to extend the type conversion mechanism by adding your own custom converter methods.
This section describes the overall architecture of the type converter mechanism, which you must understand, if you want to write custom type converters. If you only need to use the built-in type converters, see Understanding Message Formats.
Example 3.1 shows the definition of the
org.apache.camel.TypeConverter interface, which all type
converters must implement.
Example 3.1. TypeConverter Interface
package org.apache.camel;
public interface TypeConverter {
<T> T convertTo(Class<T> type, Object value);
}
The Fuse Mediation Router type converter mechanism follows a master/slave pattern. There are many slave type converters, which are each capable of performing a limited number of type conversions, and a single master type converter, which aggregates the type conversions performed by the slaves. The master type converter acts as a front-end for the slave type converters. When you request the master to perform a type conversion, it selects the appropriate slave and delegates the conversion task to that slave.
For users of the type conversion mechanism, the master type converter is
the most important because it provides the entry point for accessing the conversion
mechanism. During start up, Fuse Mediation Router automatically associates a master type converter
instance with the CamelContext object. To obtain a reference to the master type
converter, you call the CamelContext.getTypeConverter() method. For
example, if you have an exchange object, exchange, you can obtain a reference
to the master type converter as shown in Example 3.2.
Example 3.2. Getting a Master Type Converter
org.apache.camel.TypeConverter tc = exchange.getContext().getTypeConverter();
The master type converter uses a type converter loader to
populate the registry of slave type converters. A type converter loader is any class that
implements the TypeConverterLoader interface. Fuse Mediation Router
currently uses only one kind of type converter loader—the annotation type
converter loader (of AnnotationTypeConverterLoader
type).
Figure 3.1 gives an overview of the type conversion process,
showing the steps involved in converting a given data value, value, to a
specified type, toType.
The type conversion mechanism proceeds as follows:
The CamelContext object holds a reference to the master
TypeConverter instance. The first step in the
conversion process is to retrieve the master type converter by calling
CamelContext.getTypeConverter().
Type conversion is initiated by calling the convertTo()
method on the master type converter. This method instructs the type converter to convert
the data object, value, from its original type to the type specified by the
toType argument.
Because the master type converter is a front end for many different slave type
converters, it looks up the appropriate slave type converter by checking a registry of
type mappings. The registry of type converters is keyed by a type mapping pair
(toType, fromType). If a suitable type converter is found in
the registry, the master type converter calls the slave's
convertTo()
method and returns the result.
If a suitable type converter cannot be found in the registry, the master type converter loads a new type converter, using the type converter loader.
The type converter loader searches the available JAR libraries on the classpath to
find a suitable type converter. Currently, the loader strategy that is used is
implemented by the annotation type converter loader, which attempts to load a class
annotated by the org.apache.camel.Converter annotation.
See Create a TypeConverter file.
If the type converter loader is successful, a new slave type converter is loaded and
entered into the type converter registry. This type converter is then used to convert
the value argument to the toType type.
If the data is successfully converted, the converted data value is returned. If the
conversion does not succeed, null is returned.
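The registry lookup at the heart of this process can be modelled in plain Java. The sketch below is a simplified, hypothetical model (ConverterRegistrySketch and MasterConverter are invented names): slave converters are stored under a key built from the (toType, fromType) pair, the master delegates to the matching slave, and null is returned when no slave is registered. It omits the type converter loader step and matches only the exact runtime class of the value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the master/slave lookup; not the router's implementation.
final class ConverterRegistrySketch {

    /** Master converter: a registry of slave converters keyed by (toType, fromType). */
    static final class MasterConverter {
        private final Map<String, Function<Object, Object>> registry = new HashMap<>();

        private static String key(Class<?> toType, Class<?> fromType) {
            return toType.getName() + "<-" + fromType.getName();
        }

        /** Registers a slave converter for one specific pair of types. */
        <F, T> void register(Class<T> toType, Class<F> fromType, Function<F, T> slave) {
            registry.put(key(toType, fromType), value -> slave.apply(fromType.cast(value)));
        }

        /** Delegates to the matching slave, or returns null if none is registered. */
        <T> T convertTo(Class<T> toType, Object value) {
            if (value == null) {
                return null;
            }
            if (toType.isInstance(value)) {
                return toType.cast(value);   // already the requested type
            }
            Function<Object, Object> slave = registry.get(key(toType, value.getClass()));
            return slave == null ? null : toType.cast(slave.apply(value));
        }
    }

    public static void main(String[] args) {
        MasterConverter master = new MasterConverter();
        master.register(Integer.class, String.class, text -> Integer.valueOf(text));
        master.register(String.class, Integer.class, number -> "#" + number);

        System.out.println(master.convertTo(Integer.class, "42"));   // prints 42
        System.out.println(master.convertTo(String.class, 7));       // prints #7
        System.out.println(master.convertTo(Long.class, "42"));      // prints null: no slave registered
    }
}
```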
The type conversion mechanism can easily be customized by adding a new slave type converter. This section describes how to implement a slave type converter and how to integrate it with Fuse Mediation Router, so that it is automatically loaded by the annotation type converter loader.
You can implement a custom type converter class using the @Converter annotation. You must annotate the class itself and each of the
static methods intended to perform type conversion. Each converter method
takes an argument that defines the from type, optionally takes a second
Exchange argument, and has a non-void return value that defines the
to type. The type converter loader uses Java reflection to find the
annotated methods and integrate them into the type converter mechanism. Example 3.3 shows an example of an annotated converter class that
defines a converter method for converting from java.io.File to
java.io.InputStream and another converter method (with an
Exchange argument) for converting from byte[] to
String.
Example 3.3. Example of an Annotated Converter Class
package com.YourDomain.YourPackageName;
import java.io.*;
import org.apache.camel.Converter;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Converter
public class IOConverter {
    // The original listing leaves LOG undeclared; an SLF4J logger is assumed here.
    private static final Logger LOG = LoggerFactory.getLogger(IOConverter.class);
    private IOConverter() {
    }
    @Converter
    public static InputStream toInputStream(File file) throws FileNotFoundException {
        return new BufferedInputStream(new FileInputStream(file));
    }
    @Converter
    public static String toString(byte[] data, Exchange exchange) {
        if (exchange != null) {
            String charsetName = exchange.getProperty(Exchange.CHARSET_NAME, String.class);
            if (charsetName != null) {
                try {
                    return new String(data, charsetName);
                } catch (UnsupportedEncodingException e) {
                    LOG.warn("Can't convert the byte to String with the charset " + charsetName, e);
                }
            }
        }
        // Fall back to the platform default character set
        return new String(data);
    }
}
The toInputStream() method is responsible for
performing the conversion from the File type to the InputStream
type and the toString() method is responsible for performing the conversion
from the byte[] type to the String type.
The method name is unimportant, and can be anything you choose. What is important are
the argument type, the return type, and the presence of the @Converter annotation.
To enable the discovery mechanism (which is implemented by the annotation
type converter loader) for your custom converter, create a
TypeConverter file at the following
location:
META-INF/services/org/apache/camel/TypeConverter
The
TypeConverter file must contain a comma-separated list of package
names identifying the packages that contain type converter classes. For example, if you want
the type converter loader to search the
com.YourDomain.YourPackageName
package for annotated converter classes, the TypeConverter file would
have the following
contents:
com.YourDomain.YourPackageName
The type converter is packaged as a JAR file containing the compiled classes of your
custom type converters and the META-INF directory.
Put this JAR file on your classpath to make it available to your Fuse Mediation Router
application.
In addition to defining regular converter methods using the @Converter
annotation, you can optionally define a fallback converter method using the
@FallbackConverter annotation. The fallback converter method is tried only
if the master type converter fails to find a regular converter method in the type
registry.
The essential difference between a regular converter method and a fallback converter
method is that whereas a regular converter is defined to perform conversion between a
specific pair of types (for example, from byte[] to String), a
fallback converter can potentially perform conversion between any pair
of types. It is up to the code in the body of the fallback converter method to figure out
which conversions it is able to perform. At run time, if a conversion cannot be performed
by a regular converter, the master type converter iterates through every available fallback
converter until it finds one that can perform the conversion.
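The lookup order described above (exact type pair first, then each fallback in turn) can be modelled with plain JDK types. This is only a minimal sketch under stated assumptions: MiniConverterRegistry, its Fallback interface, and MiniConverterDemo are hypothetical stand-ins, not the Fuse Mediation Router registry or its API, and the real master type converter does considerably more (for example, it also considers supertypes).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a type converter registry; not a Camel class.
class MiniConverterRegistry {
    // A fallback inspects the requested type and the value; it returns null if it cannot help.
    interface Fallback {
        Object tryConvert(Class<?> type, Object value);
    }

    private final Map<String, Function<Object, Object>> regular = new HashMap<>();
    private final List<Fallback> fallbacks = new ArrayList<>();

    private static String key(Class<?> from, Class<?> to) {
        return from.getName() + "->" + to.getName();
    }

    // A regular converter is registered against one specific pair of types.
    <F, T> void addConverter(Class<F> from, Class<T> to, Function<F, T> fn) {
        regular.put(key(from, to), value -> fn.apply(from.cast(value)));
    }

    void addFallback(Fallback fallback) {
        fallbacks.add(fallback);
    }

    <T> T convert(Class<T> type, Object value) {
        // 1. Look for a regular converter for this exact pair of types.
        Function<Object, Object> exact = regular.get(key(value.getClass(), type));
        if (exact != null) {
            return type.cast(exact.apply(value));
        }
        // 2. Otherwise, try every fallback until one produces a result.
        for (Fallback fallback : fallbacks) {
            Object result = fallback.tryConvert(type, value);
            if (result != null) {
                return type.cast(result);
            }
        }
        return null; // no converter could handle the request
    }
}

class MiniConverterDemo {
    static MiniConverterRegistry build() {
        MiniConverterRegistry registry = new MiniConverterRegistry();
        registry.addConverter(byte[].class, String.class,
                bytes -> new String(bytes, StandardCharsets.UTF_8));
        // This fallback decides for itself which conversions it supports: anything to String.
        registry.addFallback((type, value) -> type == String.class ? String.valueOf(value) : null);
        return registry;
    }
}
```

In this sketch, a byte[] value is handled by the regular converter, an Integer value is converted to String by the fallback, and a request for a type that nothing supports returns null.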
The method signature of a fallback converter can have either of the following forms:
// 1. Non-generic form of signature
@FallbackConverter
public static Object MethodName(
    Class type,
    Exchange exchange,
    Object value,
    TypeConverterRegistry registry
)

// 2. Templating form of signature
@FallbackConverter
public static <T> T MethodName(
    Class<T> type,
    Exchange exchange,
    Object value,
    TypeConverterRegistry registry
)
Where MethodName is an arbitrary method name for the fallback
converter.
For example, the following code extract (taken from the implementation of the File
component) shows a fallback converter that can convert the body of a
GenericFile object, exploiting the type converters already available in the
type converter registry:
package org.apache.camel.component.file;

import org.apache.camel.Converter;
import org.apache.camel.FallbackConverter;
import org.apache.camel.Exchange;
import org.apache.camel.TypeConverter;
import org.apache.camel.spi.TypeConverterRegistry;

@Converter
public final class GenericFileConverter {

    private GenericFileConverter() {
        // Helper Class
    }

    @FallbackConverter
    public static <T> T convertTo(Class<T> type, Exchange exchange, Object value, TypeConverterRegistry registry) {
        // use a fallback type converter so we can convert the embedded body if the value is GenericFile
        if (GenericFile.class.isAssignableFrom(value.getClass())) {
            GenericFile file = (GenericFile) value;
            Class from = file.getBody().getClass();
            TypeConverter tc = registry.lookup(type, from);
            if (tc != null) {
                Object body = file.getBody();
                return tc.convertTo(type, exchange, body);
            }
        }
        return null;
    }
    ...
}
Generally, the recommended way to implement a type converter is to use an annotated class, as described in the previous section, Implementing Type Converter Using Annotations. But if you want to have complete control over the registration of your type converter, you can implement a custom slave type converter and add it directly to the type converter registry, as described here.
To implement your own type converter class, define a class that implements the
TypeConverter interface. For example, the following
MyOrderTypeConverter class converts an integer value to a
MyOrder object, where the integer value is used to initialize the order ID in
the MyOrder object.
import org.apache.camel.Exchange;
import org.apache.camel.TypeConverter;

public class MyOrderTypeConverter implements TypeConverter {
    @SuppressWarnings("unchecked")
    public <T> T convertTo(Class<T> type, Object value) {
        // Convert the value to a MyOrder bean, using the value as the order ID
        MyOrder order = new MyOrder();
        order.setId(Integer.parseInt(value.toString()));
        return (T) order;
    }
    public <T> T convertTo(Class<T> type, Exchange exchange, Object value) {
        // Camel prefers to invoke this method, which takes the Exchange parameter.
        // It lets you fetch information from the exchange during conversion,
        // such as an encoding parameter.
        return convertTo(type, value);
    }
    public <T> T mandatoryConvertTo(Class<T> type, Object value) {
        return convertTo(type, value);
    }
    public <T> T mandatoryConvertTo(Class<T> type, Exchange exchange, Object value) {
        return convertTo(type, value);
    }
}
You can add the custom type converter directly to the type converter registry using code like the following:
// Add the custom type converter to the type converter registry
context.getTypeConverterRegistry().addTypeConverter(MyOrder.class, String.class, new MyOrderTypeConverter());
Where context is the current org.apache.camel.CamelContext
instance. The addTypeConverter() method registers the
MyOrderTypeConverter class against the specific type conversion, from
String.class to MyOrder.class.
The producer and consumer templates in Apache Camel are modelled after a feature of the Spring container API, whereby access to a resource is provided through a simplified, easy-to-use API known as a template. In the case of Apache Camel, the producer template and consumer template provide simplified interfaces for sending messages to and receiving messages from producer endpoints and consumer endpoints.
The producer template supports a variety of different approaches to invoking
producer endpoints. There are methods that support different formats for the
request message (as an Exchange object, as a message body, as a
message body with a single header setting, and so on) and there are methods to
support both the synchronous and the asynchronous style of invocation. Overall,
producer template methods can be grouped into the following categories:
The methods for invoking endpoints synchronously have names of the form
sendSuffix() and requestSuffix(). For example,
the methods for invoking an endpoint using either the default message exchange
pattern (MEP) or an explicitly specified MEP are named send(),
sendBody(), and sendBodyAndHeader() (where these
methods respectively send an Exchange object, a message body, or a
message body and header value). If you want to force the MEP to be
InOut (request/reply semantics), you can call the
request(), requestBody(), and
requestBodyAndHeader() methods instead.
The following example shows how to create a ProducerTemplate
instance and use it to send a message body to the activemq:MyQueue
endpoint. The example also shows how to send a message body and header value
using sendBodyAndHeader().
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultProducerTemplate;
...
ProducerTemplate template = context.createProducerTemplate();
// Send to a specific queue
template.sendBody("activemq:MyQueue", "<hello>world!</hello>");
// Send with a body and header
template.sendBodyAndHeader(
    "activemq:MyQueue",
    "<hello>world!</hello>",
    "CustomerRating", "Gold" );
A special case of synchronous invocation is where you provide the
send() method with a Processor argument instead of
an Exchange argument. In this case, the producer template
implicitly asks the specified endpoint to create an Exchange
instance (typically, but not always, having the InOnly MEP
by default). This default exchange is then passed to the processor, which
initializes the contents of the exchange object.
The following example shows how to send an exchange initialized by the
MyProcessor processor to the activemq:MyQueue
endpoint.
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultProducerTemplate;
...
ProducerTemplate template = context.createProducerTemplate();
// Send to a specific queue, using a processor to initialize
template.send("activemq:MyQueue", new MyProcessor());
The MyProcessor class is implemented as shown in the following
example. In addition to setting the In message body (as
shown here), you could also initialize message headers and exchange
properties.
import org.apache.camel.Processor;
import org.apache.camel.Exchange;
...
public class MyProcessor implements Processor {
public MyProcessor() { }
public void process(Exchange ex) {
ex.getIn().setBody("<hello>world!</hello>");
}
}
The methods for invoking endpoints asynchronously have
names of the form asyncSendSuffix() and
asyncRequestSuffix(). For
example, the methods for invoking an endpoint using either the default message
exchange pattern (MEP) or an explicitly specified MEP are named
asyncSend() and asyncSendBody() (where these
methods respectively send an Exchange object or a message body). If
you want to force the MEP to be InOut (request/reply
semantics), you can call the asyncRequestBody(),
asyncRequestBodyAndHeader(), and
asyncRequestBodyAndHeaders() methods instead.
The following example shows how to send an exchange asynchronously to the
direct:start endpoint. The asyncSend() method
returns a java.util.concurrent.Future object, which is used to
retrieve the invocation result at a later time.
import java.util.concurrent.Future;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
...
Exchange exchange = new DefaultExchange(context);
exchange.getIn().setBody("Hello");
Future<Exchange> future = template.asyncSend("direct:start", exchange);
// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the resulting exchange from the Future
Exchange result = future.get();
The producer template also provides methods to send a message body
asynchronously (for example, using asyncSendBody() or
asyncRequestBody()). In this case, you can use one of the
following helper methods to extract the returned message body from the
Future object:
<T> T extractFutureBody(Future future, Class<T> type);
<T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
The first version of the extractFutureBody() method blocks until
the invocation completes and the reply message is available. The second version
of the extractFutureBody() method allows you to specify a timeout.
Both methods have a type argument, type, which casts the returned
message body to the specified type using a built-in type converter.
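The difference between the blocking and the timeout variants can be sketched with the JDK's own Future type. The FutureBodyHelper class below is a hypothetical illustration, not the producer template's implementation. In particular, it uses a plain cast where the text describes a built-in type converter, and its error handling is an assumption made for this sketch.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, for illustration only; not part of the producer template API.
class FutureBodyHelper {
    // Blocking variant: waits until the invocation completes.
    static <T> T extractBody(Future<?> future, Class<T> type) {
        try {
            return type.cast(future.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Timeout variant: gives up with a TimeoutException if no reply arrives in time.
    static <T> T extractBody(Future<?> future, long timeout, TimeUnit unit, Class<T> type)
            throws TimeoutException {
        try {
            return type.cast(future.get(timeout, unit));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

The timeout variant is the safer choice when the caller cannot afford to wait indefinitely for a reply that might never arrive.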
The following example shows how to use the asyncRequestBody()
method to send a message body to the direct:start endpoint. The
blocking extractFutureBody() method is then used to retrieve the
reply message body from the Future object.
Future<Object> future = template.asyncRequestBody("direct:start", "Hello");
// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the reply message body as a String type
String result = template.extractFutureBody(future, String.class);
In the preceding asynchronous examples, the request message is dispatched in a
sub-thread, while the reply is retrieved and processed by the main thread. The
producer template also gives you the option, however, of processing replies in
the sub-thread, using one of the asyncCallback(),
asyncCallbackSendBody(), or
asyncCallbackRequestBody() methods. In this case, you supply a
callback object (of org.apache.camel.impl.SynchronizationAdapter
type), which automatically gets invoked in the sub-thread as soon as a reply
message arrives.
The Synchronization callback interface is defined as
follows:
package org.apache.camel.spi;
import org.apache.camel.Exchange;
public interface Synchronization {
void onComplete(Exchange exchange);
void onFailure(Exchange exchange);
}
Where the onComplete() method is called on receipt of a normal
reply and the onFailure() method is called on receipt of a fault
message reply. Only one of these methods gets called back, so you must override
both of them to ensure that all types of reply are processed.
The following example shows how to send an exchange to the
direct:start endpoint, where the reply message is processed in
the sub-thread by the SynchronizationAdapter callback
object.
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.SynchronizationAdapter;
...
Exchange exchange = context.getEndpoint("direct:start").createExchange();
exchange.getIn().setBody("Hello");
Future<Exchange> future = template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() {
@Override
public void onComplete(Exchange exchange) {
assertEquals("Hello World", exchange.getIn().getBody());
}
});
Where the SynchronizationAdapter class is a default
implementation of the Synchronization interface, which you can
override to provide your own definitions of the onComplete() and
onFailure() callback methods.
You still have the option of accessing the reply from the main thread, because
the asyncCallback() method also returns a Future
object—for example:
// Retrieve the reply from the main thread, specifying a timeout
Exchange reply = future.get(10, TimeUnit.SECONDS);
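The callback pattern described in this section (the reply is processed in the sub-thread, while the main thread can still collect it from the Future) can be modelled using only JDK classes. In the sketch below, ReplyCallback and CallbackInvoker are hypothetical names that merely mirror the two-method shape of the Synchronization interface. They are not Fuse Mediation Router types, and the threading details of the real producer template may differ.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical callback type with the same two-method shape as Synchronization.
interface ReplyCallback<T> {
    void onComplete(T reply);
    void onFailure(Exception cause);
}

// Hypothetical invoker, for illustration only.
class CallbackInvoker {
    // Runs the call in a pool thread and notifies exactly one callback method there.
    static <T> Future<T> invoke(ExecutorService pool, Callable<T> call, ReplyCallback<T> callback) {
        return pool.submit(() -> {
            T reply;
            try {
                reply = call.call();
            } catch (Exception e) {
                callback.onFailure(e);  // failure path: onComplete() is never called
                throw e;
            }
            callback.onComplete(reply); // success path: onFailure() is never called
            return reply;               // the caller can still read the reply from the Future
        });
    }
}
```

Because the callback runs before the task returns, a caller that blocks on the returned Future sees the reply only after the callback has already been notified.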
The synchronous send methods are a collection of methods that you can use to invoke a producer endpoint, where the current thread blocks until the method invocation is complete and the reply (if any) has been received. These methods are compatible with any kind of message exchange pattern.
The basic send() method is a general-purpose method that sends
the contents of an Exchange object to an endpoint, using the
message exchange pattern (MEP) of the exchange. The return value is the exchange
that you get after it has been processed by the producer endpoint (possibly
containing an Out message, depending on the MEP).
There are three varieties of send() method for sending an
exchange that let you specify the target endpoint in one of the following ways:
as the default endpoint, as an endpoint URI, or as an Endpoint
object.
Exchange send(Exchange exchange);
Exchange send(String endpointUri, Exchange exchange);
Exchange send(Endpoint endpoint, Exchange exchange);
A simple variation of the general send() method is to use a
processor to populate a default exchange, instead of supplying the exchange
object explicitly (see Synchronous invocation with a processor for
details).
The send() methods for sending an exchange populated by a
processor let you specify the target endpoint in one of the following ways: as
the default endpoint, as an endpoint URI, or as an Endpoint object.
In addition, you can optionally specify the exchange's MEP by supplying the
pattern argument, instead of accepting the default.
Exchange send(Processor processor);
Exchange send(String endpointUri, Processor processor);
Exchange send(Endpoint endpoint, Processor processor);
Exchange send(
String endpointUri,
ExchangePattern pattern,
Processor processor
);
Exchange send(
Endpoint endpoint,
ExchangePattern pattern,
Processor processor
);
If you are only concerned with the contents of the message body that you want
to send, you can use the sendBody() methods to provide the message
body as an argument and let the producer template take care of inserting the
body into a default exchange object.
The sendBody() methods let you specify the target endpoint in one
of the following ways: as the default endpoint, as an endpoint URI, or as an
Endpoint object. In addition, you can optionally specify the
exchange's MEP by supplying the pattern argument, instead of
accepting the default. The methods without a
pattern argument return void (even though the
invocation might give rise to a reply in some cases); and the methods
with a pattern argument return either the
body of the Out message (if there is one) or the body of
the In message (otherwise).
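The return-value rule for the variants that take a pattern argument can be expressed in a few lines. The SketchExchange class below is a hypothetical stand-in written only for this illustration. It is not the org.apache.camel.Exchange interface, and it models just the In and Out message bodies.

```java
// Hypothetical stand-in for an exchange; not the org.apache.camel.Exchange interface.
class SketchExchange {
    private final Object inBody;
    private Object outBody; // stays null unless a reply is set

    SketchExchange(Object inBody) {
        this.inBody = inBody;
    }

    void setOutBody(Object outBody) {
        this.outBody = outBody;
    }

    boolean hasOut() {
        return outBody != null;
    }

    // Mirrors the rule in the text: prefer the Out body, otherwise use the In body.
    Object resultBody() {
        return hasOut() ? outBody : inBody;
    }
}
```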
void sendBody(Object body);
void sendBody(String endpointUri, Object body);
void sendBody(Endpoint endpoint, Object body);
Object sendBody(
String endpointUri,
ExchangePattern pattern,
Object body
);
Object sendBody(
Endpoint endpoint,
ExchangePattern pattern,
Object body
);
For testing purposes, it is often useful to try out the effect of a
single header setting, and the sendBodyAndHeader() methods are
designed for this kind of header testing. You supply the message body and
header setting as arguments to
sendBodyAndHeader() and let the producer template take care of
inserting the body and header setting into a default exchange object.
The sendBodyAndHeader() methods let you specify the target
endpoint in one of the following ways: as the default endpoint, as an endpoint
URI, or as an Endpoint object. In addition, you can optionally
specify the exchange's MEP by supplying the pattern argument,
instead of accepting the default. The methods without a
pattern argument return void (even though the
invocation might give rise to a reply in some cases); and the methods
with a pattern argument return either the
body of the Out message (if there is one) or the body of
the In message (otherwise).
void sendBodyAndHeader(
Object body,
String header,
Object headerValue
);
void sendBodyAndHeader(
String endpointUri,
Object body,
String header,
Object headerValue
);
void sendBodyAndHeader(
Endpoint endpoint,
Object body,
String header,
Object headerValue
);
Object sendBodyAndHeader(
String endpointUri,
ExchangePattern pattern,
Object body,
String header,
Object headerValue
);
Object sendBodyAndHeader(
Endpoint endpoint,
ExchangePattern pattern,
Object body,
String header,
Object headerValue
);
The sendBodyAndHeaders() methods are similar to the
sendBodyAndHeader() methods, except that instead of supplying
just a single header setting, these methods allow you to specify a complete hash
map of header settings.
void sendBodyAndHeaders(
Object body,
Map<String, Object> headers
);
void sendBodyAndHeaders(
String endpointUri,
Object body,
Map<String, Object> headers
);
void sendBodyAndHeaders(
Endpoint endpoint,
Object body,
Map<String, Object> headers
);
Object sendBodyAndHeaders(
String endpointUri,
ExchangePattern pattern,
Object body,
Map<String, Object> headers
);
Object sendBodyAndHeaders(
Endpoint endpoint,
ExchangePattern pattern,
Object body,
Map<String, Object> headers
);
You can try out the effect of setting a single exchange property using the
sendBodyAndProperty() methods. You supply the message body and
property setting as arguments to sendBodyAndProperty() and let the
producer template take care of inserting the body and exchange property into a
default exchange object.
The sendBodyAndProperty() methods let you specify the target
endpoint in one of the following ways: as the default endpoint, as an endpoint
URI, or as an Endpoint object. In addition, you can optionally
specify the exchange's MEP by supplying the pattern argument,
instead of accepting the default. The methods without a
pattern argument return void (even though the
invocation might give rise to a reply in some cases); and the methods
with a pattern argument return either the
body of the Out message (if there is one) or the body of
the In message (otherwise).
void sendBodyAndProperty(
Object body,
String property,
Object propertyValue
);
void sendBodyAndProperty(
String endpointUri,
Object body,
String property,
Object propertyValue
);
void sendBodyAndProperty(
Endpoint endpoint,
Object body,
String property,
Object propertyValue
);
Object sendBodyAndProperty(
String endpointUri,
ExchangePattern pattern,
Object body,
String property,
Object propertyValue
);
Object sendBodyAndProperty(
Endpoint endpoint,
ExchangePattern pattern,
Object body,
String property,
Object propertyValue
);
The synchronous request methods are similar to the synchronous send methods, except that the request methods force the message exchange pattern to be InOut (conforming to request/reply semantics). Hence, it is generally convenient to use a synchronous request method if you expect to receive a reply from the producer endpoint.
The basic request() method is a general-purpose method that uses
a processor to populate a default exchange and forces the message exchange
pattern to be InOut (so that the invocation obeys
request/reply semantics). The return value is the exchange that you get after it
has been processed by the producer endpoint, where the Out
message contains the reply message.
The request() methods for sending an exchange populated by a
processor let you specify the target endpoint in one of the following ways: as
an endpoint URI, or as an Endpoint object.
Exchange request(String endpointUri, Processor processor);
Exchange request(Endpoint endpoint, Processor processor);
If you are only concerned with the contents of the message body in the request
and in the reply, you can use the requestBody() methods to provide
the request message body as an argument and let the producer template take care
of inserting the body into a default exchange object.
The requestBody() methods let you specify the target endpoint in
one of the following ways: as the default endpoint, as an endpoint URI, or as an
Endpoint object. The return value is the body of the reply
message (Out message body), which can either be returned as
plain Object or converted to a specific type, T, using
the built-in type converters (see Built-In Type Converters).
Object requestBody(Object body);
<T> T requestBody(Object body, Class<T> type);
Object requestBody(
String endpointUri,
Object body
);
<T> T requestBody(
String endpointUri,
Object body,
Class<T> type
);
Object requestBody(
Endpoint endpoint,
Object body
);
<T> T requestBody(
Endpoint endpoint,
Object body,
Class<T> type
);
You can try out the effect of setting a single header value using the
requestBodyAndHeader() methods. You supply the message body and
header setting as arguments to requestBodyAndHeader() and let the
producer template take care of inserting the body and header setting into a
default exchange object.
The requestBodyAndHeader() methods let you specify the target
endpoint in one of the following ways: as an endpoint URI, or as an
Endpoint object. The return value is the body of the reply
message (Out message body), which can either be returned as
plain Object or converted to a specific type, T, using
the built-in type converters (see Built-In Type Converters).
Object requestBodyAndHeader(
String endpointUri,
Object body,
String header,
Object headerValue
);
<T> T requestBodyAndHeader(
String endpointUri,
Object body,
String header,
Object headerValue,
Class<T> type
);
Object requestBodyAndHeader(
Endpoint endpoint,
Object body,
String header,
Object headerValue
);
<T> T requestBodyAndHeader(
Endpoint endpoint,
Object body,
String header,
Object headerValue,
Class<T> type
);
The requestBodyAndHeaders() methods are similar to the
requestBodyAndHeader() methods, except that instead of
supplying just a single header setting, these methods allow you to specify a
complete hash map of header settings.
Object requestBodyAndHeaders(
String endpointUri,
Object body,
Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
String endpointUri,
Object body,
Map<String, Object> headers,
Class<T> type
);
Object requestBodyAndHeaders(
Endpoint endpoint,
Object body,
Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
Endpoint endpoint,
Object body,
Map<String, Object> headers,
Class<T> type
);
The producer template provides a variety of methods for invoking a producer endpoint asynchronously, so that the main thread does not block while waiting for the invocation to complete and the reply message can be retrieved at a later time. The asynchronous send methods described in this section are compatible with any kind of message exchange pattern.
The basic asyncSend() method takes an Exchange
argument and invokes an endpoint asynchronously, using the message exchange
pattern (MEP) of the specified exchange. The return value is a
java.util.concurrent.Future object, which is a ticket you can
use to collect the reply message at a later time—for details of how to
obtain the return value from the Future object, see Asynchronous invocation.
The following asyncSend() methods let you specify the target
endpoint in one of the following ways: as an endpoint URI, or as an
Endpoint object.
Future<Exchange> asyncSend(String endpointUri, Exchange exchange);
Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);
A simple variation of the general asyncSend() method is to use a
processor to populate a default exchange, instead of supplying the exchange
object explicitly.
The following asyncSend() methods let you specify the target
endpoint in one of the following ways: as an endpoint URI, or as an
Endpoint object.
Future<Exchange> asyncSend(String endpointUri, Processor processor);
Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);
If you are only concerned with the contents of the message body that you want
to send, you can use the asyncSendBody() methods to send a message
body asynchronously and let the producer template take care of inserting the
body into a default exchange object.
The asyncSendBody() methods let you specify the target endpoint
in one of the following ways: as an endpoint URI, or as an Endpoint
object.
Future<Object> asyncSendBody(String endpointUri, Object body);
Future<Object> asyncSendBody(Endpoint endpoint, Object body);
The asynchronous request methods are similar to the asynchronous send methods, except that the request methods force the message exchange pattern to be InOut (conforming to request/reply semantics). Hence, it is generally convenient to use an asynchronous request method, if you expect to receive a reply from the producer endpoint.
If you are only concerned with the contents of the message body in the request
and in the reply, you can use the asyncRequestBody() methods to provide
the request message body as an argument and let the producer template take care
of inserting the body into a default exchange object.
The asyncRequestBody() methods let you specify the target
endpoint in one of the following ways: as an endpoint URI, or as an
Endpoint object. The return value that is retrievable from the
Future object is the body of the reply message
(Out message body), which can be returned either as a
plain Object or converted to a specific type, T, using
a built-in type converter (see Asynchronous invocation).
Future<Object> asyncRequestBody(
String endpointUri,
Object body
);
<T> Future<T> asyncRequestBody(
String endpointUri,
Object body,
Class<T> type
);
Future<Object> asyncRequestBody(
Endpoint endpoint,
Object body
);
<T> Future<T> asyncRequestBody(
Endpoint endpoint,
Object body,
Class<T> type
);
You can try out the effect of setting a single header value using the
asyncRequestBodyAndHeader() methods. You supply the message
body and header setting as arguments to asyncRequestBodyAndHeader()
and let the producer template take care of inserting the body and header
setting into a default exchange object.
The asyncRequestBodyAndHeader() methods let you specify the
target endpoint in one of the following ways: as an endpoint URI, or as an
Endpoint object. The return value that is retrievable from the
Future object is the body of the reply message
(Out message body), which can be returned either as a
plain Object or converted to a specific type, T, using
a built-in type converter (see Asynchronous invocation).
Future<Object> asyncRequestBodyAndHeader(
String endpointUri,
Object body,
String header,
Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
String endpointUri,
Object body,
String header,
Object headerValue,
Class<T> type
);
Future<Object> asyncRequestBodyAndHeader(
Endpoint endpoint,
Object body,
String header,
Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
Endpoint endpoint,
Object body,
String header,
Object headerValue,
Class<T> type
);
The asyncRequestBodyAndHeaders() methods are similar to the
asyncRequestBodyAndHeader() methods, except that instead of
supplying just a single header setting, these methods allow you to specify a
complete hash map of header settings.
Future<Object> asyncRequestBodyAndHeaders(
String endpointUri,
Object body,
Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
String endpointUri,
Object body,
Map<String, Object> headers,
Class<T> type
);
Future<Object> asyncRequestBodyAndHeaders(
Endpoint endpoint,
Object body,
Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
Endpoint endpoint,
Object body,
Map<String, Object> headers,
Class<T> type
);
The producer template also provides the option of processing the reply message in the same sub-thread that is used to invoke the producer endpoint. In this case, you provide a callback object, which automatically gets invoked in the sub-thread as soon as the reply message is received. In other words, the asynchronous send with callback methods enable you to initiate an invocation in your main thread and then have all of the associated processing (invocation of the producer endpoint, waiting for a reply, and processing the reply) occur asynchronously in a sub-thread.
The basic asyncCallback() method takes an Exchange
argument and invokes an endpoint asynchronously, using the message exchange
pattern (MEP) of the specified exchange. This method is similar to the
asyncSend() method for exchanges, except that it takes an
additional org.apache.camel.spi.Synchronization argument, which is
a callback interface with two methods: onComplete() and
onFailure(). For details of how to use the
Synchronization callback, see Asynchronous invocation with a callback.
The following asyncCallback() methods let you specify the target
endpoint in one of the following ways: as an endpoint URI, or as an
Endpoint object.
Future<Exchange> asyncCallback(
String endpointUri,
Exchange exchange,
Synchronization onCompletion
);
Future<Exchange> asyncCallback(
Endpoint endpoint,
Exchange exchange,
Synchronization onCompletion
);
The asyncCallback() method for processors calls a processor to
populate a default exchange and forces the message exchange pattern to be
InOut (so that the invocation obeys request/reply
semantics).
The following asyncCallback() methods let you specify the target
endpoint in one of the following ways: as an endpoint URI, or as an
Endpoint object.
Future<Exchange> asyncCallback(
String endpointUri,
Processor processor,
Synchronization onCompletion
);
Future<Exchange> asyncCallback(
Endpoint endpoint,
Processor processor,
Synchronization onCompletion
);
If you are only concerned with the contents of the message body that you want
to send, you can use the asyncCallbackSendBody() methods to send a
message body asynchronously and let the producer template take care of inserting
the body into a default exchange object.
The asyncCallbackSendBody() methods let you specify the target
endpoint in one of the following ways: as an endpoint URI, or as an
Endpoint object.
Future<Object> asyncCallbackSendBody(
String endpointUri,
Object body,
Synchronization onCompletion
);
Future<Object> asyncCallbackSendBody(
Endpoint endpoint,
Object body,
Synchronization onCompletion
);
If you are only concerned with the contents of the message body in the request
and in the reply, you can use the asyncCallbackRequestBody()
methods to provide the request message body as an argument and let the producer
template take care of inserting the body into a default exchange object.
The asyncCallbackRequestBody() methods let you specify the target
endpoint in one of the following ways: as an endpoint URI, or as an
Endpoint object.
Future<Object> asyncCallbackRequestBody(
String endpointUri,
Object body,
Synchronization onCompletion
);
Future<Object> asyncCallbackRequestBody(
Endpoint endpoint,
Object body,
Synchronization onCompletion
);
The consumer template provides methods for polling a consumer endpoint in order to receive incoming messages. You can choose to receive the incoming message either in the form of an exchange object or in the form of a message body (where the message body can be cast to a particular type using a built-in type converter).
You can use a consumer template to poll a consumer endpoint for exchanges using
one of the following polling methods: blocking receive();
receive() with a timeout; or receiveNoWait(), which
returns immediately. Because a consumer endpoint represents a service, it is also
essential to start the service thread by calling start() before you
attempt to poll for exchanges.
The following example shows how to poll an exchange from the seda:foo
consumer endpoint using the blocking receive() method:
import org.apache.camel.ProducerTemplate;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
...
ProducerTemplate template = context.createProducerTemplate();
ConsumerTemplate consumer = context.createConsumerTemplate();
// Start the consumer service
consumer.start();
...
template.sendBody("seda:foo", "Hello");
Exchange out = consumer.receive("seda:foo");
...
// Stop the consumer service
consumer.stop();
Where the consumer template instance, consumer, is instantiated using
the CamelContext.createConsumerTemplate() method and the consumer
service thread is started by calling ConsumerTemplate.start().
You can also poll a consumer endpoint for incoming message bodies using one of the
following methods: blocking receiveBody(); receiveBody()
with a timeout; or receiveBodyNoWait(), which returns immediately. As
in the previous example, it is also essential to start the service thread by calling
start() before you attempt to poll for message bodies.
The following example shows how to poll an incoming message body from the
seda:foo consumer endpoint using the blocking
receiveBody() method:
import org.apache.camel.ProducerTemplate;
import org.apache.camel.ConsumerTemplate;
...
ProducerTemplate template = context.createProducerTemplate();
ConsumerTemplate consumer = context.createConsumerTemplate();
// Start the consumer service
consumer.start();
...
template.sendBody("seda:foo", "Hello");
Object body = consumer.receiveBody("seda:foo");
...
// Stop the consumer service
consumer.stop();
There are three basic methods for polling exchanges from a
consumer endpoint: receive() without a timeout blocks indefinitely;
receive() with a timeout blocks for the specified period of
milliseconds; and receiveNoWait() is non-blocking. You can specify the
consumer endpoint either as an endpoint URI or as an Endpoint
instance.
Exchange receive(String endpointUri);
Exchange receive(String endpointUri, long timeout);
Exchange receiveNoWait(String endpointUri);
Exchange receive(Endpoint endpoint);
Exchange receive(Endpoint endpoint, long timeout);
Exchange receiveNoWait(Endpoint endpoint);
There are three basic methods for polling message bodies from
a consumer endpoint: receiveBody() without a timeout blocks
indefinitely; receiveBody() with a timeout blocks for the specified
period of milliseconds; and receiveBodyNoWait() is non-blocking. You
can specify the consumer endpoint either as an endpoint URI or as an
Endpoint instance. Moreover, by calling the templating forms of
these methods, you can convert the returned body to a particular type,
T, using a built-in type converter.
Object receiveBody(String endpointUri);
Object receiveBody(String endpointUri, long timeout);
Object receiveBodyNoWait(String endpointUri);
Object receiveBody(Endpoint endpoint);
Object receiveBody(Endpoint endpoint, long timeout);
Object receiveBodyNoWait(Endpoint endpoint);
<T> T receiveBody(String endpointUri, Class<T> type);
<T> T receiveBody(String endpointUri, long timeout, Class<T> type);
<T> T receiveBodyNoWait(String endpointUri, Class<T> type);
<T> T receiveBody(Endpoint endpoint, Class<T> type);
<T> T receiveBody(Endpoint endpoint, long timeout, Class<T> type);
<T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type);
This chapter provides a general overview of the approaches that can be used to implement a Fuse Mediation Router component.
A Fuse Mediation Router component consists of a set of classes that are related to each other
through a factory pattern. The primary entry point to a component is the
Component object itself (an instance of
org.apache.camel.Component type). You can use the Component
object as a factory to create Endpoint objects, which in turn act as
factories for creating Consumer, Producer, and
Exchange objects. These relationships are summarized in Figure 5.1.
A component implementation is an endpoint factory. The main task of a component
implementor is to implement the Component.createEndpoint()
method, which is responsible for creating new endpoints on demand.
Each kind
of component must be associated with a component prefix that
appears in an endpoint URI. For example, the file component is usually associated with the
file prefix, which can be used in an endpoint URI like
file://tmp/messages/input. When you install a new component in Fuse Mediation Router, you
must define the association between a particular component prefix and the name of the
class that implements the component.
Each endpoint instance encapsulates a particular endpoint URI. Every time Fuse Mediation Router encounters a new endpoint URI, it creates a new endpoint instance. An endpoint object is also a factory for creating consumer endpoints and producer endpoints.
Endpoints must implement the
org.apache.camel.Endpoint interface. The
Endpoint interface defines the following factory
methods:
createConsumer() and
createPollingConsumer()—Creates a consumer endpoint,
which represents the source endpoint at the beginning of a route.
createProducer()—Creates a producer endpoint, which
represents the target endpoint at the end of a route.
createExchange()—Creates an exchange object, which
encapsulates the messages passed up and down the route.
Consumer endpoints consume requests. They always appear at the start of a route and they encapsulate the code responsible for receiving incoming requests and dispatching outgoing replies. From a service-oriented perspective, a consumer represents a service.
Consumers must implement the
org.apache.camel.Consumer interface. There are a number
of different patterns you can follow when implementing a consumer. These patterns are
described in Consumer Patterns and Threading.
Producer endpoints produce requests. They always appear at the end of a route and they encapsulate the code responsible for dispatching outgoing requests and receiving incoming replies. From a service-oriented perspective, a producer represents a service consumer.
Producers must implement the
org.apache.camel.Producer interface. You can optionally
implement the producer to support an asynchronous style of processing. See Asynchronous Processing for details.
Exchange objects encapsulate a related set of messages. For example, one kind of message exchange is a synchronous invocation, which consists of a request message and its related reply.
Exchanges must implement the
org.apache.camel.Exchange interface. The default
implementation, DefaultExchange, is sufficient for many component
implementations. However, if you want to associate extra data with the exchanges or have
the exchanges perform additional processing, it can be useful to customize the exchange
implementation.
There are two different message slots in an Exchange object:
In message—holds the current message.
Out message—temporarily holds a reply message.
All of the message types are represented by the same Java object,
org.apache.camel.Message. It is not always necessary to customize the
message implementation—the default implementation,
DefaultMessage, is usually adequate.
A Fuse Mediation Router route is essentially a pipeline of processors, of
org.apache.camel.Processor type. Messages are
encapsulated in an exchange object, E, which gets passed from node to node by
invoking the process() method. The architecture of the processor pipeline is
illustrated in Figure 5.2.
At the start of the route, you have the source endpoint, which is represented by an
org.apache.camel.Consumer object. The source endpoint is responsible for
accepting incoming request messages and dispatching replies. When constructing the route,
Fuse Mediation Router creates the appropriate Consumer type based on the component prefix
from the endpoint URI, as described in Factory Patterns for a Component.
Each intermediate node in the pipeline is represented by a processor object
(implementing the org.apache.camel.Processor interface).
You can insert either standard processors (for example, filter,
throttler, or delayer) or insert your own custom processor
implementations.
At the end of the route is the target endpoint, which is represented by an
org.apache.camel.Producer object. Because it comes at the end of a
processor pipeline, the producer is also a processor object (implementing the
org.apache.camel.Processor interface). The target
endpoint is responsible for sending outgoing request messages and receiving incoming
replies. When constructing the route, Fuse Mediation Router creates the appropriate
Producer type based on the component prefix from the endpoint URI.
The pattern used to implement the consumer determines the threading model used in processing the incoming exchanges. Consumers can be implemented using one of the following patterns:
Event-driven pattern—The consumer is driven by an external thread.
Scheduled poll pattern—The consumer is driven by a dedicated thread pool.
Polling pattern—The threading model is left undefined.
In the event-driven pattern, the processing of an incoming request is initiated when
another part of the application (typically a third-party library) calls a method
implemented by the consumer. A good example of an event-driven consumer is the Fuse Mediation Router
JMX component, where events are initiated by the JMX library. The JMX library calls the
handleNotification() method to initiate request
processing—see Example 8.3 for
details.
Figure 5.3 shows an
outline of the event-driven consumer pattern. In this example, it is assumed that
processing is triggered by a call to the
notify() method.
The event-driven consumer processes incoming requests as follows:
The consumer must implement a method to receive the incoming event (in Figure 5.3 this is represented by the
notify() method). The thread
that calls notify() is normally a
separate part of the application, so the consumer's threading policy is externally
driven.
For example, in the case of the JMX consumer implementation, the consumer
implements the NotificationListener.handleNotification()
method to receive notifications from JMX. The threads that drive the consumer
processing are created within the JMX layer.
In the body of the notify()
method, the consumer first converts the incoming event into an exchange object,
E, and then calls process() on the next
processor in the route, passing the exchange object as its argument.
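The steps above can be sketched in plain Java. This is only an illustration under stated assumptions: SketchExchange, onEvent(), and the Consumer-based next processor are hypothetical stand-ins, not the org.apache.camel types, and onEvent() plays the role that notify() plays in the figure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Event-driven consumer sketch. All types here are hypothetical stand-ins,
// not the org.apache.camel interfaces.
final class EventDrivenConsumerSketch {

    // Minimal stand-in for an exchange: it only carries the In body.
    static final class SketchExchange {
        final Object inBody;

        SketchExchange(Object inBody) {
            this.inBody = inBody;
        }
    }

    // Stand-in for the next processor in the route.
    private final Consumer<SketchExchange> nextProcessor;

    EventDrivenConsumerSketch(Consumer<SketchExchange> nextProcessor) {
        this.nextProcessor = nextProcessor;
    }

    // Plays the role of notify() in the text: the external library calls
    // this method on one of its own threads.
    void onEvent(Object event) {
        SketchExchange exchange = new SketchExchange(event); // wrap the event
        nextProcessor.accept(exchange);                      // pass it down the route
    }

    public static void main(String[] args) {
        List<Object> received = new ArrayList<>();
        EventDrivenConsumerSketch consumer =
                new EventDrivenConsumerSketch(exchange -> received.add(exchange.inBody));
        consumer.onEvent("sample notification");
        System.out.println(received);
    }
}
```

Because the consumer creates no threads of its own, whatever thread calls onEvent() also runs the rest of the route, which is what "externally driven" means here.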
In the scheduled poll pattern, the consumer retrieves incoming requests by checking at regular time intervals whether or not a request has arrived. Checking for requests is scheduled automatically by a built-in timer class, the scheduled executor service, which is a standard pattern provided by the java.util.concurrent library. The scheduled executor service executes a particular task at timed intervals and it also manages a pool of threads, which are used to run the task instances.
Figure 5.4 shows an outline of the scheduled poll consumer pattern.
The scheduled poll consumer processes incoming requests as follows:
The scheduled executor service has a pool of threads at its disposal, that can be
used to initiate consumer processing. After each scheduled time interval has elapsed,
the scheduled executor service attempts to grab a free thread from its pool (there are
five threads in the pool by default). If a free thread is available, it uses that
thread to call the poll() method on the consumer.
The consumer's poll() method is intended to trigger
processing of an incoming request. In the body of the poll()
method, the consumer attempts to retrieve an incoming message. If no request is
available, the poll() method returns immediately.
If a request message is available, the consumer inserts it into an exchange object
and then calls process() on the next processor in the route,
passing the exchange object as its argument.
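A minimal sketch of this pattern, using only java.util.concurrent, might look as follows. The class name, the in-memory queue that stands in for the message source, and the drain() helper are all assumptions made for illustration; the pool size of five simply mirrors the default mentioned above.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Scheduled poll sketch. The queue and the Consumer callback are
// hypothetical stand-ins for a message source and the next processor.
final class ScheduledPollSketch {
    private final Queue<String> source;
    private final Consumer<String> nextProcessor;

    ScheduledPollSketch(Queue<String> source, Consumer<String> nextProcessor) {
        this.source = source;
        this.nextProcessor = nextProcessor;
    }

    // Called by the scheduler; returns immediately when nothing is waiting.
    void poll() {
        String request = source.poll();
        if (request == null) {
            return;
        }
        nextProcessor.accept(request);
    }

    // Polls every 10 ms until 'expected' requests were processed or the
    // timeout elapsed, and returns how many requests were processed.
    static int drain(Queue<String> source, int expected, long timeoutMillis) {
        CountDownLatch processed = new CountDownLatch(expected);
        ScheduledPollSketch consumer =
                new ScheduledPollSketch(source, request -> processed.countDown());
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
        try {
            scheduler.scheduleWithFixedDelay(consumer::poll, 0, 10, TimeUnit.MILLISECONDS);
            processed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return expected - (int) processed.getCount();
    }

    public static void main(String[] args) {
        Queue<String> source = new ConcurrentLinkedQueue<>();
        source.add("first");
        source.add("second");
        System.out.println("processed: " + drain(source, 2, 2000));
    }
}
```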
In the polling pattern, processing of an incoming request is initiated when a third party calls one of the consumer's polling methods:
receive()
receiveNoWait()
receive(long timeout)
It is up to the component implementation to define the precise mechanism for initiating calls on the polling methods. This mechanism is not specified by the polling pattern.
Figure 5.5 shows an outline of the polling consumer pattern.
The polling consumer processes incoming requests as follows:
Processing of an incoming request is initiated whenever one of the consumer's polling methods is called. The mechanism for calling these polling methods is implementation defined.
In the body of the receive() method, the consumer
attempts to retrieve an incoming request message. If no message is currently
available, the behavior depends on which receive method was called.
receiveNoWait() returns immediately
receive(long timeout) waits for the specified timeout interval[2] before returning
receive() waits until a message is received
If a request message is available, the consumer inserts it into an exchange object
and then calls process() on the next processor in the route,
passing the exchange object as its argument.
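The difference between the three polling methods can be sketched with a java.util.concurrent.BlockingQueue. The class below is a hypothetical stand-in rather than an implementation of a Camel interface, and returning null when no message arrives is an assumption of this sketch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Polling consumer sketch: three ways of waiting for the next message.
// Message bodies are plain strings to keep the example short.
final class PollingConsumerSketch {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

    // Simulates a message arriving at the endpoint.
    void deliver(String message) {
        inbox.add(message);
    }

    // Waits until a message is received.
    String receive() {
        try {
            return inbox.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Waits for at most the given number of milliseconds.
    String receive(long timeoutMillis) {
        try {
            return inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Returns immediately, whether or not a message is available.
    String receiveNoWait() {
        return inbox.poll();
    }

    public static void main(String[] args) {
        PollingConsumerSketch consumer = new PollingConsumerSketch();
        System.out.println(consumer.receiveNoWait()); // nothing delivered yet
        consumer.deliver("Hello");
        System.out.println(consumer.receive());
    }
}
```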
Producer endpoints normally follow a synchronous pattern when
processing an exchange. When the preceding processor in a pipeline calls
process() on a producer, the process()
method blocks until a reply is received. In this case, the processor's thread remains
blocked until the producer has completed the cycle of sending the request and receiving
the reply.
Sometimes, however, you might prefer to decouple the preceding processor from the
producer, so that the processor's thread is released immediately and the
process() call does not block. In this
case, you should implement the producer using an asynchronous
pattern, which gives the preceding processor the option of invoking a non-blocking version
of the process() method.
To give you an overview of the different implementation options, this section describes both the synchronous and the asynchronous patterns for implementing a producer endpoint.
Figure 5.6 shows an outline of a synchronous producer, where the preceding processor blocks until the producer has finished processing the exchange.
The synchronous producer processes an exchange as follows:
The preceding processor in the pipeline calls the synchronous
process() method on the producer to initiate synchronous
processing. The synchronous process() method takes a single
exchange argument.
In the body of the process() method, the producer sends
the request (In message) to the endpoint.
If required by the exchange pattern, the producer waits for the reply
(Out message) to arrive from the endpoint. This step can cause
the process() method to block indefinitely. However, if the
exchange pattern does not mandate a reply, the process()
method can return immediately after sending the request.
When the process() method returns, the exchange object
contains the reply from the synchronous call (an Out message).
Figure 5.7 shows an outline of an asynchronous producer, where the producer processes the exchange in a sub-thread, and the preceding processor is not blocked for any significant length of time.
The asynchronous producer processes an exchange as follows:
Before the processor can call the asynchronous process()
method, it must create an asynchronous callback object, which
is responsible for processing the exchange on the return portion of the route. For the
asynchronous callback, the processor must provide a class that implements the
AsyncCallback interface.
The processor calls the asynchronous process() method on
the producer to initiate asynchronous processing. The asynchronous
process() method takes two arguments:
an exchange object
an asynchronous callback object
In the body of the process() method, the producer creates
a Runnable object that encapsulates the processing code. The producer
then delegates the execution of this Runnable object to a
sub-thread.
The asynchronous process() method returns, thereby
freeing up the processor's thread. The exchange processing continues in a separate
sub-thread.
The Runnable object sends the In message to the
endpoint.
If required by the exchange pattern, the Runnable object waits for
the reply (Out or Fault message) to arrive
from the endpoint. The Runnable object remains blocked until the reply is
received.
After the reply arrives, the Runnable object inserts the reply
(Out message) into the exchange object and then calls
done() on the asynchronous callback object. The
asynchronous callback is then responsible for processing the reply message (executed
in the sub-thread).
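The sequence above can be sketched with a single-thread executor playing the part of the sub-thread. SketchExchange and SketchCallback are hypothetical simplifications (the callback here takes no arguments), and the "reply" is fabricated locally instead of coming from a real endpoint.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Asynchronous producer sketch using hypothetical stand-in types.
final class AsyncProducerSketch {

    // Stand-in exchange with an In slot and an Out slot.
    static final class SketchExchange {
        final String in;
        volatile String out;

        SketchExchange(String in) {
            this.in = in;
        }
    }

    // Stand-in for the asynchronous callback; done() runs in the sub-thread.
    interface SketchCallback {
        void done();
    }

    private final ExecutorService subThread = Executors.newSingleThreadExecutor();

    // Returns as soon as the task is queued, freeing the caller's thread.
    void process(SketchExchange exchange, SketchCallback callback) {
        Runnable task = () -> {
            // Stand-in for sending the In message and waiting for the reply.
            exchange.out = "reply to " + exchange.in;
            callback.done();
        };
        subThread.execute(task);
    }

    void stop() {
        subThread.shutdown();
    }

    // Sends one request and waits (with a timeout) for the callback to fire.
    static String requestReply(String request, long timeoutMillis) {
        AsyncProducerSketch producer = new AsyncProducerSketch();
        SketchExchange exchange = new SketchExchange(request);
        CountDownLatch replied = new CountDownLatch(1);
        try {
            producer.process(exchange, replied::countDown);
            replied.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            producer.stop();
        }
        return exchange.out;
    }

    public static void main(String[] args) {
        System.out.println(requestReply("ping", 2000));
    }
}
```

The requestReply() helper blocks only so that the example can show a result; in a real asynchronous route the caller would continue with other work and let the callback handle the reply.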
This section gives a brief overview of the steps required to implement a custom Fuse Mediation Router component.
When implementing a component, it is usually necessary to implement the following Java interfaces:
org.apache.camel.Component
org.apache.camel.Endpoint
org.apache.camel.Consumer
org.apache.camel.Producer
In addition, it can also be necessary to implement the following Java interfaces:
org.apache.camel.Exchange
org.apache.camel.Message
You typically implement a custom component as follows:
Implement the Component
interface—A component object acts as an endpoint factory. You extend
the DefaultComponent class and implement the
createEndpoint() method.
See Component Interface.
Implement the Endpoint
interface—An endpoint represents a resource identified by a specific
URI. The approach taken when implementing an endpoint depends on whether the consumers
follow an event-driven pattern, a scheduled
poll pattern, or a polling pattern.
For an event-driven pattern, implement the endpoint by extending the
DefaultEndpoint class and implementing the following
methods:
createProducer()
createConsumer()
For a scheduled poll pattern, implement the endpoint by extending the
ScheduledPollEndpoint class and implementing the following
methods:
createProducer()
createConsumer()
For a polling pattern, implement the endpoint by extending the
DefaultPollingEndpoint class and implementing the following
methods:
createProducer()
createPollingConsumer()
See Endpoint Interface.
Implement the Consumer
interface—There are several different approaches you can take to
implementing a consumer, depending on which pattern you need to implement (event-driven,
scheduled poll, or polling). The consumer implementation is also crucially important for
determining the threading model used for processing a message exchange.
Implement the Producer
interface—To implement a producer, you extend the
DefaultProducer class and implement the
process() method.
See Producer Interface.
Optionally implement the Exchange or the Message
interface—The default implementations of
Exchange and Message can
be used directly, but occasionally, you might find it necessary to customize these
types.
See Exchange Interface and Message Interface.
You can install a custom component in one of the following ways:
Add the component directly to the CamelContext—The
CamelContext.addComponent() method adds a component
programmatically.
Add the component using Spring configuration—The standard
Spring bean element creates a component instance. The bean's
id attribute implicitly defines the component prefix. For
details, see Configuring a Component.
Configure Fuse Mediation Router to auto-discover the component—Auto-discovery ensures that Fuse Mediation Router automatically loads the component on demand. For details, see Setting Up Auto-Discovery.
Auto-discovery is a mechanism that enables you to dynamically add components to your
Fuse Mediation Router application. The component URI prefix is used as a key to load components on
demand. For example, if Fuse Mediation Router encounters the endpoint URI,
activemq://MyQName, and the ActiveMQ endpoint is not yet loaded, Fuse Mediation Router
searches for the component identified by the activemq prefix and dynamically
loads the component.
Before configuring auto-discovery, you must ensure that your custom component classes are accessible from your current classpath. Typically, you bundle the custom component classes into a JAR file, and add the JAR file to your classpath.
To enable auto-discovery of your component, create a Java properties file named after
the component prefix, component-prefix, and store that file in
the following location:
/META-INF/services/org/apache/camel/component/component-prefix
The component-prefix properties file must contain the following
property setting:
class=component-class-name
Where component-class-name is the fully-qualified name of your
custom component class. You can also define additional system property settings in this
file.
For example, you can enable auto-discovery for the Fuse Mediation Router FTP component by creating the following Java properties file:
/META-INF/services/org/apache/camel/component/ftp
Which contains the following Java property setting:
class=org.apache.camel.component.file.remote.RemoteFileComponent
The Java properties file for the FTP component is already defined in the JAR file,
camel-ftp-Version.jar.
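To make the lookup concrete, the following sketch shows how a prefix could be mapped to a resource path and how the class setting could be read with java.util.Properties. It is an illustration only: the class and method names are hypothetical, the properties text is supplied as a string instead of being read from the classpath, and it does not claim to reproduce the router's internal resolver.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Auto-discovery sketch: prefix -> properties resource -> component class name.
final class AutoDiscoverySketch {
    // Classpath location described in the text, without the leading slash
    // (the form expected by ClassLoader.getResourceAsStream()).
    static final String SERVICES_DIR = "META-INF/services/org/apache/camel/component/";

    // Builds the resource path for a given component prefix.
    static String resourcePathFor(String componentPrefix) {
        return SERVICES_DIR + componentPrefix;
    }

    // Reads the 'class' setting from the contents of a properties file.
    static String componentClassName(String propertiesText) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties.getProperty("class");
    }

    public static void main(String[] args) {
        System.out.println(resourcePathFor("ftp"));
        System.out.println(componentClassName(
                "class=org.apache.camel.component.file.remote.RemoteFileComponent\n"));
    }
}
```

In a real lookup, the properties text would typically be obtained from the classpath and the resulting class name instantiated reflectively.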
You can add a component by configuring it in the Fuse Mediation Router Spring configuration file,
META-INF/spring/camel-context.xml. To find the component, the component's
URI prefix is matched against the ID attribute of a bean element in the
Spring configuration. If the component prefix matches a bean element ID, Fuse Mediation Router
instantiates the referenced class and injects the properties specified in the Spring
configuration.
This mechanism has priority over auto-discovery. If the CamelContext finds a Spring bean with the requisite ID, it will not attempt to find the component using auto-discovery.
If there are any properties that you want to inject into your component class, define them as bean properties. For example:
public class CustomComponent extends DefaultComponent<CustomExchange> {
    ...
    PropType getProperty() { ... }
    void setProperty(PropType v) { ... }
}
The getProperty() method and the
setProperty() method access the
value of property.
To configure a component in Spring, edit the configuration file,
META-INF/spring/camel-context.xml, as shown in Example 5.1.
Example 5.1. Configuring a Component in Spring
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<package>RouteBuilderPackage</package>
</camelContext>
<bean id="component-prefix" class="component-class-name">
<property name="property" value="propertyValue"/>
</bean>
</beans>
The bean element with ID
component-prefix configures the
component-class-name component. You can inject properties
into the component instance using property elements. For
example, the property element in the preceding example would
inject the value, propertyValue, into the
property property by calling
setProperty() on the component.
Example 5.2 shows an example of how to configure
the Fuse Mediation Router's JMS component by defining a bean element with ID equal to jms.
These settings are added to the Spring configuration file,
camel-context.xml.
Example 5.2. JMS Component Spring Configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<package>org.apache.camel.example.spring</package>
</camelContext>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"
value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
</bean>
</property>
</bean>
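</beans>
The bean element with ID, jms, configures the JMS component, which is implemented by the org.apache.camel.component.jms.JmsComponent class.
JMS is just a wrapper for a messaging service. You must specify the concrete
implementation of the messaging system by setting the connectionFactory property on the JmsComponent bean.
In this example, the concrete implementation of the JMS messaging service is
Apache ActiveMQ. The brokerURL property on the ActiveMQConnectionFactory bean specifies the URL of the broker to connect to.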
This chapter describes how to implement the Component
interface.
To implement a Fuse Mediation Router component, you must implement the
org.apache.camel.Component interface. An instance of
Component type provides the entry point into a custom component. That is, all
of the other objects in a component are ultimately accessible through the
Component instance. Figure 6.1 shows
the relevant Java interfaces and classes that make up the Component inheritance
hierarchy.
Example 6.1 shows the definition of the
org.apache.camel.Component interface.
Example 6.1. Component Interface
package org.apache.camel;
public interface Component {
CamelContext getCamelContext();
void setCamelContext(CamelContext context);
Endpoint createEndpoint(String uri) throws Exception;
}
The Component interface defines the following
methods:
getCamelContext() and
setCamelContext()—References the
CamelContext to which this Component
belongs. The setCamelContext() method is automatically called
when you add the component to a CamelContext.
createEndpoint()—The factory method that gets called
to create Endpoint instances for this component. The
uri parameter is the endpoint URI, which contains the details
required to create the endpoint.
You implement a new component by extending the
org.apache.camel.impl.DefaultComponent class, which provides some
standard functionality and default implementations for some of the methods. In particular,
the DefaultComponent class provides support for URI parsing and for
creating a scheduled executor (which is used for the scheduled poll
pattern).
The createEndpoint(String uri) method defined in the base
Component interface takes a complete, unparsed endpoint URI
as its sole argument. The DefaultComponent class, on the other hand,
defines a three-argument version of the createEndpoint() method
with the following
signature:
protected abstract Endpoint createEndpoint(
String uri,
String remaining,
Map parameters
)
throws Exception;
Where uri is the original, unparsed URI; remaining is the part of the URI that remains
after stripping off the component prefix at the start and cutting off the query options at
the end; and parameters contains the parsed query options. It is this version
of the createEndpoint() method that you must override when
inheriting from DefaultComponent. This has the advantage that the
endpoint URI is already parsed for you.
The following sample endpoint URI for
the file component shows how URI parsing works in
practice:
file:///tmp/messages/foo?delete=true&moveNamePostfix=.old
For
this URI, the following arguments are passed to the three-argument version of
createEndpoint():
| Argument | Sample Value |
|---|---|
| uri | file:///tmp/messages/foo?delete=true&moveNamePostfix=.old |
| remaining | /tmp/messages/foo |
| parameters | Two entries are set in the map: delete=true and moveNamePostfix=.old |
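The split shown in the table can be reproduced with a few lines of string handling. This is a simplified sketch, not the parsing code in DefaultComponent: the class and method names are hypothetical, and URL decoding, repeated keys, and malformed URIs are deliberately ignored.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// URI splitting sketch: derives 'remaining' and 'parameters' from a URI.
final class UriSplitSketch {

    // Strips the component prefix (with an optional "//") and the query string.
    static String remaining(String uri) {
        String rest = uri.substring(uri.indexOf(':') + 1);
        if (rest.startsWith("//")) {
            rest = rest.substring(2);
        }
        int query = rest.indexOf('?');
        return query < 0 ? rest : rest.substring(0, query);
    }

    // Parses the query options into an ordered name/value map.
    static Map<String, String> parameters(String uri) {
        Map<String, String> parameters = new LinkedHashMap<>();
        int query = uri.indexOf('?');
        if (query < 0 || query == uri.length() - 1) {
            return parameters;
        }
        for (String pair : uri.substring(query + 1).split("&")) {
            int equals = pair.indexOf('=');
            if (equals < 0) {
                parameters.put(pair, "");
            } else {
                parameters.put(pair.substring(0, equals), pair.substring(equals + 1));
            }
        }
        return parameters;
    }

    public static void main(String[] args) {
        String uri = "file:///tmp/messages/foo?delete=true&moveNamePostfix=.old";
        System.out.println(remaining(uri));
        System.out.println(parameters(uri));
    }
}
```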
By default, the parameters extracted from the URI query options are injected on the
endpoint's bean properties. The DefaultComponent class automatically
injects the parameters for you.
For example, suppose you want to define a custom
endpoint that supports two URI query options: delete and
moveNamePostfix. All you must do is define the corresponding bean methods
(getters and setters) in the endpoint
class:
public class FileEndpoint extends ScheduledPollEndpoint {
...
public boolean isDelete() {
return delete;
}
public void setDelete(boolean delete) {
this.delete = delete;
}
...
public String getMoveNamePostfix() {
return moveNamePostfix;
}
public void setMoveNamePostfix(String moveNamePostfix) {
this.moveNamePostfix = moveNamePostfix;
}
}
It is also possible to inject URI query options into consumer parameters. For details, see Consumer parameter injection.
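To illustrate what injecting parameters on bean properties involves, the following sketch matches each query option against a setter by name and converts the string value to the setter's parameter type. It is a simplified, hypothetical illustration based on java.lang.reflect; it is not the introspection code used by DefaultComponent, and FileOptions is an invented stand-in for an endpoint class.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Parameter injection sketch: name-based setter lookup via reflection.
final class ParameterInjectionSketch {

    // Hypothetical bean exposing the two options used in the text.
    public static final class FileOptions {
        private boolean delete;
        private String moveNamePostfix;

        public boolean isDelete() { return delete; }
        public void setDelete(boolean delete) { this.delete = delete; }
        public String getMoveNamePostfix() { return moveNamePostfix; }
        public void setMoveNamePostfix(String moveNamePostfix) { this.moveNamePostfix = moveNamePostfix; }
    }

    // Injects every parameter that has a matching setter and returns the
    // parameters that could not be injected.
    static Map<String, String> inject(Object bean, Map<String, String> parameters) {
        Map<String, String> unused = new LinkedHashMap<>(parameters);
        for (Method method : bean.getClass().getMethods()) {
            String name = method.getName();
            if (!name.startsWith("set") || name.length() < 4 || method.getParameterCount() != 1) {
                continue;
            }
            String property = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = unused.get(property);
            if (raw == null) {
                continue;
            }
            Class<?> type = method.getParameterTypes()[0];
            Object value;
            if (type == String.class) {
                value = raw;
            } else if (type == boolean.class || type == Boolean.class) {
                value = Boolean.parseBoolean(raw);
            } else if (type == int.class || type == Integer.class) {
                value = Integer.parseInt(raw);
            } else {
                continue; // type not handled by this sketch
            }
            try {
                method.invoke(bean, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot set " + property, e);
            }
            unused.remove(property);
        }
        return unused;
    }

    public static void main(String[] args) {
        Map<String, String> parameters = new LinkedHashMap<>();
        parameters.put("delete", "true");
        parameters.put("moveNamePostfix", ".old");
        FileOptions options = new FileOptions();
        System.out.println("not injected: " + inject(options, parameters));
        System.out.println(options.isDelete() + " " + options.getMoveNamePostfix());
    }
}
```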
If there are no parameters defined on your Endpoint class, you can optimize
the process of endpoint creation by disabling endpoint parameter injection. To disable
parameter injection on endpoints, override the
useIntrospectionOnEndpoint() method and implement it to return
false, as follows:
protected boolean useIntrospectionOnEndpoint() {
return false;
}
The useIntrospectionOnEndpoint() method does
not affect the parameter injection that might be performed on a
Consumer class. Parameter injection at that level is controlled by the
Endpoint.configureProperties() method (see Implementing the Endpoint Interface).
The scheduled executor is used in the scheduled poll pattern, where it is responsible for driving the periodic polling of a consumer endpoint (a scheduled executor is effectively a thread pool implementation).
To instantiate a scheduled executor service, use the
ExecutorServiceStrategy object that is returned by the
CamelContext.getExecutorServiceStrategy() method. For details of the Fuse Mediation Router
threading model, see Threading Model in Implementing Enterprise Integration Patterns.
Prior to Fuse Mediation Router 2.3, the DefaultComponent class provided a
getExecutorService() method for creating thread pool instances. Since 2.3,
however, the creation of thread pools is now managed centrally by the
ExecutorServiceStrategy object.
If you want to validate the URI before creating an endpoint instance, you can override
the validateURI() method from the
DefaultComponent class, which has the following signature:
protected void validateURI(String uri,
String path,
Map parameters)
throws ResolveEndpointFailedException;
If the supplied URI does not have the required format, the implementation of
validateURI() should throw the
org.apache.camel.ResolveEndpointFailedException
exception.
Example 6.2 outlines how to implement the
DefaultComponent.createEndpoint() method, which is responsible
for creating endpoint instances on demand.
The | |
When extending | |
Create an instance of your custom endpoint type,
|
Example 6.3 shows a sample implementation of a
FileComponent class.
Example 6.3. FileComponent Implementation
package org.apache.camel.component.file;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
import java.io.File;
import java.util.Map;
public class FileComponent extends DefaultComponent {
public static final String HEADER_FILE_NAME = "org.apache.camel.file.name";
public FileComponent() {
}
public FileComponent(CamelContext context) {
super(context);
}
protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
File file = new File(remaining);
FileEndpoint result = new FileEndpoint(file, uri, this);
return result;
}
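}
Always define a no-argument constructor for the component class in order to facilitate automatic instantiation of the class.
A constructor that takes the parent CamelContext instance as an argument passes the context to the DefaultComponent superclass.
The implementation of the createEndpoint() method creates a FileEndpoint instance from the remaining part of the URI.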
This chapter describes how to implement the Endpoint
interface, which is an essential step in the implementation of a Fuse Mediation Router component.
An instance of org.apache.camel.Endpoint type
encapsulates an endpoint URI, and it also serves as a factory for Consumer,
Producer, and Exchange objects. There are three different
approaches to implementing an endpoint:
Event-driven
Scheduled poll
Polling
These endpoint implementation patterns complement the corresponding patterns for implementing a consumer—see Implementing the Consumer Interface.
Figure 7.1 shows the relevant Java interfaces and
classes that make up the Endpoint inheritance hierarchy.
Example 7.1 shows the definition of the
org.apache.camel.Endpoint interface.
Example 7.1. Endpoint Interface
package org.apache.camel;
public interface Endpoint {
boolean isSingleton();
String getEndpointUri();
String getEndpointKey();
CamelContext getCamelContext();
void setCamelContext(CamelContext context);
void configureProperties(Map options);
boolean isLenientProperties();
Exchange createExchange();
Exchange createExchange(ExchangePattern pattern);
Exchange createExchange(Exchange exchange);
Producer createProducer() throws Exception;
Consumer createConsumer(Processor processor) throws Exception;
PollingConsumer createPollingConsumer() throws Exception;
}
The Endpoint interface defines the following
methods:
isSingleton()—Returns true, if you want
to ensure that each URI maps to a single endpoint within a CamelContext. When this
property is true, multiple references to the identical URI within your
routes always refer to a single endpoint instance. When this
property is false, on the other hand, multiple references to the same URI
within your routes refer to distinct endpoint instances. Each time
you refer to the URI in a route, a new endpoint instance is created.
getEndpointUri()—Returns the endpoint URI of this
endpoint.
getEndpointKey()—Used by
org.apache.camel.spi.LifecycleStrategy when registering the
endpoint.
getCamelContext()—Returns a reference to the
CamelContext instance to which this endpoint belongs.
setCamelContext()—Sets the CamelContext
instance to which this endpoint belongs.
configureProperties()—Stores a copy of the parameter
map that is used to inject parameters when creating a new Consumer
instance.
isLenientProperties()—Returns true to
indicate that the URI is allowed to contain unknown parameters (that is, parameters that
cannot be injected on the Endpoint or the
Consumer class). Normally, this method should be implemented to return
false.
createExchange()—An overloaded method with the
following variants:
Exchange createExchange()—Creates a new exchange instance
with a default exchange pattern setting.
Exchange createExchange(ExchangePattern pattern)—Creates a
new exchange instance with the specified exchange pattern.
Exchange createExchange(Exchange exchange)—Converts the given
exchange argument to the type of exchange needed for this endpoint.
If the given exchange is not already of the correct type, this method copies it into
a new instance of the correct type. A default implementation of this method is
provided in the DefaultEndpoint class.
createProducer()—Factory method used to create new
Producer instances.
createConsumer()—Factory method to create new
event-driven consumer instances. The processor argument is a reference to
the first processor in the route.
createPollingConsumer()—Factory method to create new
polling consumer instances.
In order to avoid unnecessary overhead, it is a good idea to create a
single endpoint instance for all endpoints that have the same URI
(within a CamelContext). You can enforce this condition by implementing
isSingleton() to return true.
In this context, same URI means that two URIs are the same when compared using string equality. In principle, it is possible to have two URIs that are equivalent, though represented by different strings. In that case, the URIs would not be treated as the same.
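The effect of singleton endpoints and of string-equality matching can be sketched in plain Java. The EndpointRegistry and SketchEndpoint classes below are hypothetical stand-ins, not Fuse Mediation Router classes; the sketch assumes only that singleton endpoints are cached under the exact URI string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an endpoint; it only records its URI.
class SketchEndpoint {
    final String uri;

    SketchEndpoint(String uri) {
        this.uri = uri;
    }
}

// Hypothetical registry that mimics the singleton rule described above.
class EndpointRegistry {
    private final Map<String, SketchEndpoint> cache = new HashMap<String, SketchEndpoint>();
    private final boolean singleton;

    EndpointRegistry(boolean singleton) {
        this.singleton = singleton;
    }

    SketchEndpoint getEndpoint(String uri) {
        if (!singleton) {
            // Non-singleton: every reference to the URI gets a new instance.
            return new SketchEndpoint(uri);
        }
        // Singleton: look up by exact string equality of the URI.
        SketchEndpoint endpoint = cache.get(uri);
        if (endpoint == null) {
            endpoint = new SketchEndpoint(uri);
            cache.put(uri, endpoint);
        }
        return endpoint;
    }
}
```

With a singleton registry, two lookups of custom:a?x=1&y=2 return the same instance, whereas custom:a?y=2&x=1 (equivalent, but a different string) yields a second instance.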
The following alternative endpoint implementation patterns are supported:
If your custom endpoint conforms to the event-driven pattern (see Consumer Patterns and Threading), it is implemented by extending the
abstract class, org.apache.camel.impl.DefaultEndpoint, as shown in
Example 7.2.
Example 7.2. Implementing DefaultEndpoint
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;

public class CustomEndpoint extends DefaultEndpoint {

    public CustomEndpoint(String endpointUri, Component component) {
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception {
        return new CustomProducer(this);
    }

    public Consumer createConsumer(Processor processor) throws Exception {
        return new CustomConsumer(this, processor);
    }

    public boolean isSingleton() {
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() {
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}
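Implement an event-driven custom endpoint, CustomEndpoint, by extending the DefaultEndpoint class.
You must have at least one constructor that takes the endpoint URI, endpointUri, and the parent component reference, component, as arguments.
Implement the createProducer() factory method to create producer instances.
Implement the createConsumer() factory method to create event-driven consumer instances.
Important: Do not override the createPollingConsumer() method.
In general, it is not necessary to override the createExchange() methods. Override them only if you need to set exchange properties.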
The DefaultEndpoint class provides default
implementations of the following methods, which you might find useful when writing your
custom endpoint code:
getEndpointUri()—Returns the endpoint
URI.
getCamelContext()—Returns a reference to the
CamelContext.
getComponent()—Returns a reference to the parent
component.
createPollingConsumer()—Creates a polling consumer.
The created polling consumer's functionality is based on the event-driven consumer. If
you override the event-driven consumer method,
createConsumer(), you get a polling consumer implementation for
free.
createExchange(Exchange e)—Converts the given
exchange object, e, to the type required for this endpoint. This method
creates a new exchange by calling the overridden createExchange()
methods. This ensures that the method also works for custom exchange
types.
From Apache Camel 2.0 onwards, the camel-core module has been refactored
so that the DefaultExchange type can be used with
any component. It is therefore no longer necessary to define a
custom exchange type.
If your custom endpoint conforms to the scheduled poll pattern (see Consumer Patterns and Threading) it is implemented by inheriting
from the abstract class, org.apache.camel.impl.ScheduledPollEndpoint,
as shown in Example 7.3.
Example 7.3. ScheduledPollEndpoint Implementation
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ScheduledPollEndpoint;

public class CustomEndpoint extends ScheduledPollEndpoint {

    protected CustomEndpoint(String endpointUri, CustomComponent component) {
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception {
        Producer result = new CustomProducer(this);
        return result;
    }

    public Consumer createConsumer(Processor processor) throws Exception {
        Consumer result = new CustomConsumer(this, processor);
        configureConsumer(result);
        return result;
    }

    public boolean isSingleton() {
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() {
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}
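Implement a scheduled poll custom endpoint, CustomEndpoint, by extending the ScheduledPollEndpoint class.
You must have at least one constructor that takes the endpoint URI, endpointUri, and the parent component reference, component, as arguments.
Implement the createProducer() factory method to create a producer instance.
Implement the createConsumer() factory method to create a scheduled poll consumer instance.
Important: Do not override the createPollingConsumer() method.
The configureConsumer() call injects the consumer query options into the newly created consumer instance.
In general, it is not necessary to override the createExchange() methods. Override them only if you need to set exchange properties.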
If your custom endpoint conforms to the polling consumer pattern (see Consumer Patterns and Threading), it is implemented by inheriting
from the abstract class,
org.apache.camel.impl.DefaultPollingEndpoint, as shown in Example 7.4.
Example 7.4. DefaultPollingEndpoint Implementation
import org.apache.camel.Consumer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultPollingEndpoint;

public class CustomEndpoint extends DefaultPollingEndpoint {
    ...
    public PollingConsumer createPollingConsumer() throws Exception {
        PollingConsumer result = new CustomConsumer(this);
        configureConsumer(result);
        return result;
    }

    // Do NOT implement createConsumer(). It is already implemented in DefaultPollingEndpoint.
    ...
}
Because this CustomEndpoint class is a polling endpoint, you
must implement the createPollingConsumer() method instead of the
createConsumer() method. The consumer instance returned from
createPollingConsumer() must implement the
PollingConsumer interface. For details of how to implement
a polling consumer, see Polling consumer implementation.
Apart from the implementation of the createPollingConsumer()
method, the steps for implementing a DefaultPollingEndpoint are similar to the
steps for implementing a ScheduledPollEndpoint. See Example 7.3 for details.
If you want to expose the list of exchange instances that are pending in the current
endpoint, you can implement the
org.apache.camel.spi.BrowsableEndpoint interface, as shown
in Example 7.5. It makes sense to implement this
interface if the endpoint performs some sort of buffering of incoming events. For example,
the Fuse Mediation Router SEDA endpoint implements the BrowsableEndpoint
interface—see Example 7.6.
Example 7.5. BrowsableEndpoint Interface
package org.apache.camel.spi;
import java.util.List;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
public interface BrowsableEndpoint extends Endpoint {
List<Exchange> getExchanges();
}
Example 7.6 shows a sample implementation of
SedaEndpoint. The SEDA endpoint is an example of an event-driven
endpoint. Incoming events are stored in a FIFO queue (an instance of
java.util.concurrent.BlockingQueue) and a SEDA consumer starts up a thread to
read and process the events. The events themselves are represented by
org.apache.camel.Exchange objects.
Example 7.6. SedaEndpoint Implementation
package org.apache.camel.component.seda;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.BrowsableEndpoint;
public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
private BlockingQueue<Exchange> queue;
public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
super(endpointUri, component);
this.queue = queue;
}
public SedaEndpoint(String uri, SedaComponent component, Map parameters) {
this(uri, component, component.createQueue(uri, parameters));
}
public Producer createProducer() throws Exception {
return new CollectionProducer(this, getQueue());
}
public Consumer createConsumer(Processor processor) throws Exception {
return new SedaConsumer(this, processor);
}
public BlockingQueue<Exchange> getQueue() {
return queue;
}
public boolean isSingleton() {
return true;
}
public List<Exchange> getExchanges() {
return new ArrayList<Exchange>(getQueue());
}
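}
The SedaEndpoint class follows the pattern for an event-driven endpoint by extending the DefaultEndpoint class, and it also implements the BrowsableEndpoint interface.
Following the usual pattern for an event-driven consumer, SedaEndpoint defines a constructor that takes the endpoint URI, endpointUri, and the parent component reference, component, as arguments (in addition to the queue).
Another constructor is provided, which delegates queue creation to the parent component instance.
The createProducer() factory method creates a CollectionProducer instance, which is given a reference to the queue.
The createConsumer() factory method creates a SedaConsumer instance.
The getQueue() method returns a reference to the queue.
The isSingleton() method returns true, so that a single endpoint instance is used for each unique URI string.
The getExchanges() method implements the corresponding method from BrowsableEndpoint by returning a copy of the current contents of the queue.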
This chapter describes how to implement the Consumer
interface, which is an essential step in the implementation of a Fuse Mediation Router component.
An instance of org.apache.camel.Consumer type represents
a source endpoint in a route. There are several different ways of implementing a consumer
(see Consumer Patterns and Threading), and this degree of
flexibility is reflected in the inheritance hierarchy (see Figure 8.1), which includes several different base
classes for implementing a consumer.
For consumers that follow the scheduled poll pattern (see Scheduled poll
pattern), Fuse Mediation Router provides support for
injecting parameters into consumer instances. For example, consider the following endpoint
URI for a component identified by the custom prefix:
custom:destination?consumer.myConsumerParam
Fuse Mediation Router provides support for automatically injecting query options of the form
consumer.*. For the consumer.myConsumerParam parameter, you need
to define corresponding setter and getter methods on the
Consumer implementation class as follows:
public class CustomConsumer extends ScheduledPollConsumer {
...
public String getMyConsumerParam() { ... }
public void setMyConsumerParam(String s) { ... }
...
}
The getter and setter methods follow the usual Java bean conventions (including capitalizing the first letter of the property name).
In addition to defining the bean methods in your Consumer implementation, you must also
remember to call the configureConsumer() method in the
implementation of Endpoint.createConsumer() (see Scheduled poll
endpoint implementation). Example 8.1 shows an example of a
createConsumer() method implementation, taken from the
FileEndpoint class in the file component:
Example 8.1. FileEndpoint createConsumer() Implementation
...
public class FileEndpoint extends ScheduledPollEndpoint {
...
public Consumer createConsumer(Processor processor) throws Exception {
Consumer result = new FileConsumer(this, processor);
configureConsumer(result);
return result;
}
...
}
At run time, consumer parameter injection works as follows:
When the endpoint is created, the default implementation of
DefaultComponent.createEndpoint(String uri) parses the URI to extract the
consumer parameters, and stores them in the endpoint instance by calling
ScheduledPollEndpoint.configureProperties().
When createConsumer() is called, the method implementation
calls configureConsumer() to inject the consumer parameters
(see Example 8.1).
The configureConsumer() method uses Java reflection to call the setter
methods whose names match the relevant options after the consumer. prefix
has been stripped off.
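The injection steps above can be sketched with plain Java reflection. This is only an illustration of the mechanism, not the Fuse Mediation Router implementation: the SketchConsumer and ConsumerParamInjector classes are hypothetical, and the sketch assumes that every injectable option has a public setter taking a single String argument.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical consumer bean with one injectable property.
class SketchConsumer {
    private String myConsumerParam;

    public String getMyConsumerParam() {
        return myConsumerParam;
    }

    public void setMyConsumerParam(String value) {
        this.myConsumerParam = value;
    }
}

// Hypothetical helper that mimics the injection of consumer.* options.
class ConsumerParamInjector {
    private static final String PREFIX = "consumer.";

    // Calls setXxx(String) on the target for every option named consumer.xxx
    // and returns the options that did not carry the consumer. prefix.
    static Map<String, String> inject(Object target, Map<String, String> options) {
        Map<String, String> remaining = new HashMap<String, String>();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(PREFIX) || key.length() == PREFIX.length()) {
                remaining.put(key, entry.getValue());
                continue;
            }
            // Strip the prefix and apply the Java bean naming convention.
            String property = key.substring(PREFIX.length());
            String setter = "set" + Character.toUpperCase(property.charAt(0))
                    + property.substring(1);
            try {
                Method method = target.getClass().getMethod(setter, String.class);
                method.invoke(target, entry.getValue());
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot inject option: " + key, e);
            }
        }
        return remaining;
    }
}
```

Passing a map that contains consumer.myConsumerParam=42 to inject() results in a call to setMyConsumerParam("42") on the target object.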
A consumer that follows the scheduled poll pattern automatically supports the consumer parameters shown in Table 8.1 (which can appear as query options in the endpoint URI).
Table 8.1. Scheduled Poll Parameters
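| Name | Default | Description |
|---|---|---|
| initialDelay | 1000 | Delay, in milliseconds, before the first poll. |
| delay | 500 | Depends on the value of the useFixedDelay flag (time unit is milliseconds). |
| useFixedDelay | false | If false, the delay parameter is interpreted as the polling period. If true, the delay parameter is interpreted as the time that elapses between the end of one poll and the start of the next. |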
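The interaction between the delay and useFixedDelay parameters can be worked through numerically. The following sketch is not part of Fuse Mediation Router; it assumes that the flag selects between the fixed-rate and fixed-delay scheduling modes of java.util.concurrent.ScheduledExecutorService, and that every poll takes the same, constant amount of time. The PollSchedule class and its parameter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical calculator for the start times of scheduled polls.
class PollSchedule {

    // Returns the start times (in ms after consumer start-up) of the first
    // 'count' polls, assuming every poll takes 'pollDuration' ms.
    static List<Long> startTimes(long initialDelay, long delay, long pollDuration,
                                 boolean useFixedDelay, int count) {
        List<Long> starts = new ArrayList<Long>();
        long start = initialDelay;
        for (int i = 0; i < count; i++) {
            starts.add(start);
            long end = start + pollDuration;
            if (useFixedDelay) {
                // Fixed delay: wait 'delay' ms after the previous poll ends.
                start = end + delay;
            } else {
                // Fixed rate: the next poll is due 'delay' ms after the previous
                // start, but it cannot begin before the previous poll has ended.
                start = Math.max(start + delay, end);
            }
        }
        return starts;
    }
}
```

With the default values (initialDelay=1000, delay=500) and polls that take 200 ms each, the fixed-rate mode starts polls at 1000, 1500, and 2000 ms, whereas the fixed-delay mode starts them at 1000, 1700, and 2400 ms.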
Fuse Mediation Router provides two special consumer implementations which can be used to convert back and forth between an event-driven consumer and a polling consumer. The following conversion classes are provided:
org.apache.camel.impl.EventDrivenPollingConsumer—Converts
an event-driven consumer into a polling consumer instance.
org.apache.camel.impl.DefaultScheduledPollConsumer—Converts
a polling consumer into an event-driven consumer instance.
In practice, these classes are used to simplify the task of implementing an
Endpoint type. The Endpoint
interface defines the following two methods for creating a consumer instance:
package org.apache.camel;
public interface Endpoint {
...
Consumer createConsumer(Processor processor) throws Exception;
PollingConsumer createPollingConsumer() throws Exception;
}
createConsumer() returns an event-driven consumer and
createPollingConsumer() returns a polling consumer. You would
implement only one of these methods. For example, if you are following the event-driven pattern
for your consumer, you would implement the createConsumer() method and
provide a method implementation for createPollingConsumer() that
simply raises an exception. With the help of the conversion classes, however, Fuse Mediation Router is
able to provide a more useful default implementation.
For example, if you want to implement your consumer according to the event-driven
pattern, you implement the endpoint by extending DefaultEndpoint and
implementing the createConsumer() method. The implementation of
createPollingConsumer() is inherited from
DefaultEndpoint, where it is defined as follows:
public PollingConsumer createPollingConsumer() throws Exception {
return new EventDrivenPollingConsumer(this);
}
The EventDrivenPollingConsumer constructor takes a reference to
the endpoint, this, effectively wrapping its event-driven consumer and converting it into
a polling consumer. To implement the conversion, the EventDrivenPollingConsumer
instance buffers incoming events and makes them available on demand through the
receive(), the receive(long timeout),
and the receiveNoWait() methods.
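The buffering idea behind this conversion can be sketched in plain Java. The BufferingPollingAdapter class below is a hypothetical illustration built on java.util.concurrent.BlockingQueue, not the actual EventDrivenPollingConsumer code; the onEvent() method stands in for whatever callback the event source invokes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical adapter: events are pushed in through onEvent() and
// pulled out on demand through the three receive methods.
class BufferingPollingAdapter<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<T>();

    // Event-driven side: called by the event source for each incoming event.
    void onEvent(T event) {
        buffer.offer(event);
    }

    // Blocks until an event is available (returns null if interrupted).
    T receive() {
        try {
            return buffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Waits up to 'timeout' milliseconds; returns null if nothing arrives.
    T receive(long timeout) {
        try {
            return buffer.poll(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Returns immediately; null if the buffer is empty.
    T receiveNoWait() {
        return buffer.poll();
    }
}
```

The three receive variants differ only in how long they are prepared to wait for the buffer to fill.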
Analogously, if you are implementing your consumer according to the polling pattern, you
implement the endpoint by extending DefaultPollingEndpoint and
implementing the createPollingConsumer() method. In this case, the
implementation of the createConsumer() method is inherited from
DefaultPollingEndpoint, and the default implementation returns a
DefaultScheduledPollConsumer instance (which converts the polling
consumer into an event-driven consumer).
Consumer classes can optionally implement the
org.apache.camel.spi.ShutdownAware interface, which interacts with the
graceful shutdown mechanism, enabling a consumer to ask for extra time to shut down. This is
typically needed for components such as SEDA, which can have pending exchanges stored in an
internal queue. Normally, you would want to process all of the exchanges in the queue before
shutting down the SEDA consumer.
Example 8.2 shows the definition of the
ShutdownAware interface.
Example 8.2. ShutdownAware Interface
// Java
package org.apache.camel.spi;
import org.apache.camel.ShutdownRunningTask;
public interface ShutdownAware {
boolean deferShutdown(ShutdownRunningTask shutdownRunningTask);
int getPendingExchangesSize();
}
The ShutdownAware interface defines the following methods:
deferShutdown()
Return true from this method if you want to delay shutdown of the
consumer. The shutdownRunningTask argument is an enum which
can take either of the following values:
ShutdownRunningTask.CompleteCurrentTaskOnly—finish
processing the exchanges that are currently being processed by the consumer's
thread pool, but do not attempt to process any more exchanges than that.
ShutdownRunningTask.CompleteAllTasks—process
all of the pending exchanges. For example, in the case of
the SEDA component, the consumer would process all of the exchanges from its
incoming queue.
getPendingExchangesSize()
Indicates how many exchanges remain to be processed by the consumer. A zero value indicates that processing is finished and the consumer can be shut down.
For an example of how to define the ShutdownAware methods, see Example 8.6.
You can implement a consumer in one of the following ways:
In an event-driven consumer, processing is driven explicitly by external events. The events are received through an event-listener interface, where the listener interface is specific to the particular event source.
Example 8.3 shows the implementation of the
JMXConsumer class, which is taken from the Fuse Mediation Router JMX component
implementation. The JMXConsumer class is an example of an event-driven
consumer, which is implemented by inheriting from the
org.apache.camel.impl.DefaultConsumer class. In the case of the
JMXConsumer example, events are represented by calls on the
NotificationListener.handleNotification() method, which is a
standard way of receiving JMX events. In order to receive these JMX events, it is necessary
to implement the NotificationListener interface and override
the handleNotification() method, as shown in Example 8.3.
Example 8.3. JMXConsumer Implementation
package org.apache.camel.component.jmx;
import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
public class JMXConsumer extends DefaultConsumer implements NotificationListener {
JMXEndpoint jmxEndpoint;
public JMXConsumer(JMXEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.jmxEndpoint = endpoint;
}
public void handleNotification(Notification notification, Object handback) {
try {
getProcessor().process(jmxEndpoint.createExchange(notification));
} catch (Throwable e) {
handleException(e);
}
}
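}
The JMXConsumer class follows the pattern for event-driven consumers by extending the DefaultConsumer class. Because it receives JMX events, it also implements the NotificationListener interface.
You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, and a reference to the next processor in the chain, processor, as arguments.
The handleNotification() method (which is defined in NotificationListener) is called whenever a JMX notification arrives. The body of this method contains the code that processes the event.
This line of code combines two steps. First, the JMX notification object is converted into an exchange object, which is the generic representation of an event in Fuse Mediation Router. Then the newly created exchange object is passed to the next processor in the route (invoked synchronously).
Any exception thrown during processing is passed to the handleException() method.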
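The shape of an event-driven consumer can be reduced to a few lines of plain Java. In the following sketch, all of the types (SketchProcessor, SketchListener, SketchEventConsumer) are hypothetical stand-ins, and a String plays the role of the exchange; it illustrates the convert-then-process pattern, not the JMX component itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the next processor in a route.
interface SketchProcessor {
    void process(String exchange) throws Exception;
}

// Hypothetical listener interface exposed by some event source.
interface SketchListener {
    void onEvent(int eventCode);
}

// Hypothetical event-driven consumer: it is driven entirely by callbacks.
class SketchEventConsumer implements SketchListener {
    private final SketchProcessor processor;
    private final List<Throwable> errors = new ArrayList<Throwable>();

    SketchEventConsumer(SketchProcessor processor) {
        this.processor = processor;
    }

    public void onEvent(int eventCode) {
        try {
            // Step 1: convert the source-specific event into an exchange.
            String exchange = "event:" + eventCode;
            // Step 2: pass the exchange to the next processor (synchronously).
            processor.process(exchange);
        } catch (Throwable e) {
            // Keep the event source alive; record the failure instead.
            errors.add(e);
        }
    }

    List<Throwable> getErrors() {
        return errors;
    }
}
```

Because failures are caught inside the callback, one bad event does not prevent the consumer from handling later events.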
In a scheduled poll consumer, polling events are automatically generated by a timer
class, java.util.concurrent.ScheduledExecutorService. To receive the generated
polling events, you must implement the ScheduledPollConsumer.poll() method (see
Consumer Patterns and Threading).
Example 8.4 shows how to implement a
consumer that follows the scheduled poll pattern, which is implemented by extending the
ScheduledPollConsumer class.
Example 8.4. ScheduledPollConsumer Implementation
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;

public class CustomConsumer extends ScheduledPollConsumer {
    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    protected void poll() throws Exception {
        Exchange exchange = /* Receive exchange object ... */;
        // Example of a synchronous processor.
        getProcessor().process(exchange);
    }

    @Override
    protected void doStart() throws Exception {
        // Pre-Start:
        // Place code here to execute just before start of processing.
        super.doStart();
        // Post-Start:
        // Place code here to execute just after start of processing.
    }

    @Override
    protected void doStop() throws Exception {
        // Pre-Stop:
        // Place code here to execute just before processing stops.
        super.doStop();
        // Post-Stop:
        // Place code here to execute just after processing stops.
    }
}
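Implement a scheduled poll consumer class, CustomConsumer, by extending the ScheduledPollConsumer class.
You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, and a reference to the next processor in the chain, processor, as arguments.
Override the poll() method to receive the scheduled polling events. This is where you put the code that retrieves and processes incoming events (represented by exchange objects).
In this example, the event is processed synchronously. If you want to process events asynchronously, you should use a reference to an asynchronous processor instead, by calling getAsyncProcessor().
(Optional) If you want some lines of code to execute as the consumer is starting up, override the doStart() method as shown.
(Optional) If you want some lines of code to execute as the consumer is stopping, override the doStop() method as shown.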
Example 8.5 outlines how to implement a
consumer that follows the polling pattern, which is implemented by extending the
PollingConsumerSupport class.
Example 8.5. PollingConsumerSupport Implementation
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.PollingConsumerSupport;

public class CustomConsumer extends PollingConsumerSupport {
    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint) {
        super(endpoint);
        this.endpoint = endpoint;
    }

    public Exchange receiveNoWait() {
        Exchange exchange = /* Obtain an exchange object. */;
        // Further processing ...
        return exchange;
    }

    public Exchange receive() {
        // Blocking poll ...
    }

    public Exchange receive(long timeout) {
        // Poll with timeout ...
    }

    protected void doStart() throws Exception {
        // Code to execute whilst starting up.
    }

    protected void doStop() throws Exception {
        // Code to execute whilst shutting down.
    }
}
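Implement your polling consumer class, CustomConsumer, by extending the PollingConsumerSupport class.
You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, as an argument.
The receiveNoWait() method obtains an exchange object without waiting.
The receive() method performs a blocking poll.
The receive(long timeout) method performs a poll with a timeout.
If you want to insert code that executes while a consumer is starting up or shutting
down, implement the doStart() method and the doStop() method, respectively.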
If the standard consumer patterns are not suitable for your consumer implementation, you
can implement the Consumer interface directly and write the threading code
yourself. When writing the threading code, however, it is important that you comply with the
standard Fuse Mediation Router threading model, as described in Threading Model in Implementing Enterprise Integration Patterns.
For example, the SEDA component from camel-core implements its own consumer
threading, which is consistent with the Fuse Mediation Router threading model. Example 8.6 shows an outline of how the
SedaConsumer class implements its threading.
Example 8.6. Custom Threading Implementation
package org.apache.camel.component.seda;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A Consumer for the SEDA component.
*
* @version $Revision: 922485 $
*/
public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware {
private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);
private SedaEndpoint endpoint;
private Processor processor;
private ExecutorService executor;
...
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
this.endpoint = endpoint;
this.processor = processor;
}
...
public void run() {
BlockingQueue<Exchange> queue = endpoint.getQueue();
// Poll the queue and process exchanges
...
}
...
protected void doStart() throws Exception {
int poolSize = endpoint.getConcurrentConsumers();
executor = endpoint.getCamelContext().getExecutorServiceStrategy()
.newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize);
for (int i = 0; i < poolSize; i++) {
executor.execute(this);
}
endpoint.onStarted(this);
}
protected void doStop() throws Exception {
endpoint.onStopped(this);
// must shutdown executor on stop to avoid overhead of having them running
endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor);
executor = null;
if (multicast != null) {
ServiceHelper.stopServices(multicast);
}
}
...
//----------
// Implementation of ShutdownAware interface
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
// deny stopping on shutdown as we want seda consumers to run in case some other queues
// depend on this consumer to run, so it can complete its exchanges
return true;
}
public int getPendingExchangesSize() {
// number of pending messages on the queue
return endpoint.getQueue().size();
}
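}
The SedaConsumer class is implemented by extending the ServiceSupport class and implementing the Consumer, Runnable, and ShutdownAware interfaces.
Implement the Runnable.run() method to define what the consumer does while it is running in a thread. In this case, the consumer polls the queue and processes the exchanges that it finds there.
The doStart() method defines what the consumer does when it starts up.
Instead of creating threads directly, you should create a thread pool using the ExecutorServiceStrategy object that is obtained from the CamelContext. For details, see Threading Model in Implementing Enterprise Integration Patterns.
Kick off the threads by calling the ExecutorService.execute() method poolSize times.
The doStop() method defines what the consumer does when it shuts down.
Shut down the thread pool, which is represented by the executor instance.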
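The threading outline of Example 8.6 can be mimicked in plain Java. The following sketch is a simplified, hypothetical analogue: it uses java.util.concurrent.Executors as a stand-in for the thread pool that a real component would obtain from the CamelContext, it treats queue items as plain strings, and (unlike Example 8.6, which calls shutdownNow()) it drains the queue before stopping.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical queue consumer: 'poolSize' threads all run the same Runnable.
class QueuePollingConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final int poolSize;
    private final AtomicInteger processed = new AtomicInteger();
    private volatile boolean running;
    private ExecutorService executor;

    QueuePollingConsumer(BlockingQueue<String> queue, int poolSize) {
        this.queue = queue;
        this.poolSize = poolSize;
    }

    public void run() {
        // Keep polling while running, then drain whatever is still queued.
        while (running || !queue.isEmpty()) {
            try {
                String item = queue.poll(50, TimeUnit.MILLISECONDS);
                if (item != null) {
                    processed.incrementAndGet(); // stands in for real processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void start() {
        running = true;
        // Stand-in for the pool a component would request from the CamelContext.
        executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < poolSize; i++) {
            executor.execute(this);
        }
    }

    void stop() {
        running = false;
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor = null;
    }

    int getProcessedCount() {
        return processed.get();
    }

    int getPendingSize() {
        return queue.size();
    }
}
```

The getPendingSize() method plays the same role as getPendingExchangesSize() in Example 8.6: a value of zero means that the consumer has no queued work left.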
This chapter describes how to implement the Producer
interface, which is an essential step in the implementation of a Fuse Mediation Router component.
An instance of org.apache.camel.Producer type represents
a target endpoint in a route. The role of the producer is to send requests
(In messages) to a specific physical endpoint and to receive the
corresponding response (Out or Fault message). A
Producer object is essentially a special kind of Processor that
appears at the end of a processor chain (equivalent to a route). Figure 9.1 shows the inheritance hierarchy for
producers.
Example 9.1 shows the definition of the
org.apache.camel.Producer interface.
Example 9.1. Producer Interface
package org.apache.camel;
public interface Producer extends Processor, Service, IsSingleton {
Endpoint getEndpoint();
Exchange createExchange();
Exchange createExchange(ExchangePattern pattern);
Exchange createExchange(Exchange exchange);
}
The Producer interface defines the following
methods:
process()
(inherited from Processor)—The most important method. A
producer is essentially a special type of processor that sends a request to an endpoint,
instead of forwarding the exchange object to another processor. By overriding the
process() method, you define how the producer sends and
receives messages to and from the relevant endpoint.
getEndpoint()—Returns a reference to the parent
endpoint instance.
createExchange()—These overloaded methods are
analogous to the corresponding methods defined in the
Endpoint interface. Normally, these methods delegate to
the corresponding methods defined on the parent Endpoint
instance (this is what the DefaultProducer class does by
default). Occasionally, you might need to override these methods.
Processing an exchange object in a producer—which usually involves sending a
message to a remote destination and waiting for a reply—can potentially block for a
significant length of time. If you want to avoid blocking the current thread, you can opt to
implement the producer as an asynchronous processor. The asynchronous
processing pattern decouples the preceding processor from the producer, so that the
process() method returns without delay. See Asynchronous Processing.
When implementing a producer, you
can support the asynchronous processing model by implementing the
org.apache.camel.AsyncProcessor interface. On its own, this
is not enough to ensure that the asynchronous processing model will be used: it is also
necessary for the preceding processor in the chain to call the asynchronous version of the
process() method. The definition of the
AsyncProcessor interface is shown in Example 9.2.
Example 9.2. AsyncProcessor Interface
package org.apache.camel;
public interface AsyncProcessor extends Processor {
boolean process(Exchange exchange, AsyncCallback callback);
}
The asynchronous version of the process() method
takes an extra argument, callback, of
org.apache.camel.AsyncCallback type. The corresponding
AsyncCallback interface is defined as shown in Example 9.3.
Example 9.3. AsyncCallback Interface
package org.apache.camel;
public interface AsyncCallback {
void done(boolean doneSynchronously);
}
The caller of AsyncProcessor.process() must provide
an implementation of AsyncCallback to receive the
notification that processing has finished. The AsyncCallback.done()
method takes a boolean argument that indicates whether the processing was performed
synchronously or not. Normally, the flag would be false, to indicate
asynchronous processing. In some cases, however, it can make sense for the producer
not to process asynchronously (in spite of being asked to do so). For
example, if the producer knows that the processing of the exchange will complete rapidly, it
could optimise the processing by doing it synchronously. In this case, the
doneSynchronously flag should be set to
true.
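The role of the doneSynchronously flag can be illustrated with a small stand-alone sketch. The SketchCallback and SketchAsyncProducer types below are hypothetical stand-ins for the interfaces described above, the size threshold is an invented criterion for deciding that processing will complete rapidly, and the sketch assumes that the return value of process() mirrors the flag that is passed to done().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the callback interface described above.
interface SketchCallback {
    void done(boolean doneSynchronously);
}

// Hypothetical producer that chooses between synchronous and asynchronous
// processing, depending on the size of the message body.
class SketchAsyncProducer {
    private final int syncThreshold;
    private final List<Thread> workers = new ArrayList<Thread>();

    SketchAsyncProducer(int syncThreshold) {
        this.syncThreshold = syncThreshold;
    }

    // Returns true if processing completed before this method returned.
    boolean process(final String body, final SketchCallback callback) {
        if (body.length() <= syncThreshold) {
            // Small message: cheaper to send in the calling thread.
            send(body);
            callback.done(true);
            return true;
        }
        // Large message: hand it over to a separate thread.
        Thread worker = new Thread(new Runnable() {
            public void run() {
                send(body);
                callback.done(false);
            }
        });
        workers.add(worker);
        worker.start();
        return false;
    }

    void send(String body) {
        // Placeholder for the code that sends the message to the endpoint.
    }

    // Waits for all asynchronous sends to finish (useful in tests).
    void awaitWorkers() {
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

In either case the callback is invoked exactly once, so the caller can rely on done() as the single notification that processing has finished.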
When implementing a producer, you might find it helpful to call some of the methods in
the org.apache.camel.util.ExchangeHelper utility class. For full
details of the ExchangeHelper class, see The ExchangeHelper Class.
You can implement a producer in one of the following ways:
Example 9.4 outlines how to implement a synchronous
producer. In this case, a call to Producer.process() blocks until a
reply is received.
Example 9.4. DefaultProducer Implementation
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultProducer;

public class CustomProducer extends DefaultProducer {

    public CustomProducer(Endpoint endpoint) {
        super(endpoint);
        // Perform other initialization tasks...
    }

    public void process(Exchange exchange) throws Exception {
        // Process exchange synchronously.
        // ...
    }
}
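Implement a custom synchronous producer class, CustomProducer, by extending the DefaultProducer class.
Implement a constructor that takes a reference to the parent endpoint.
The process() method contains the core of the producer code: it processes the exchange synchronously.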
Example 9.5 outlines how to implement an
asynchronous producer. In this case, you must implement both a synchronous
process() method and an asynchronous process() method (which
takes an additional AsyncCallback argument).
Example 9.5. CollectionProducer Implementation
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultProducer;

public class CustomProducer extends DefaultProducer implements AsyncProcessor {

    public CustomProducer(Endpoint endpoint) {
        super(endpoint);
        // ...
    }

    public void process(Exchange exchange) throws Exception {
        // Process exchange synchronously.
        // ...
    }

    public boolean process(Exchange exchange, AsyncCallback callback) {
        // Process exchange asynchronously.
        CustomProducerTask task = new CustomProducerTask(exchange, callback);
        // Process 'task' in a separate thread...
        // ...
        return false;
    }
}

public class CustomProducerTask implements Runnable {
    private Exchange exchange;
    private AsyncCallback callback;

    public CustomProducerTask(Exchange exchange, AsyncCallback callback) {
        this.exchange = exchange;
        this.callback = callback;
    }

    public void run() {
        // Process exchange.
        // ...
        callback.done(false);
    }
}
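Implement a custom asynchronous producer class, CustomProducer, by extending the DefaultProducer class and implementing the AsyncProcessor interface.
Implement a constructor that takes a reference to the parent endpoint.
Implement the synchronous process() method.
Implement the asynchronous process() method, which takes an additional AsyncCallback argument. The approach shown here creates a Runnable instance, task, that is processed in a separate thread.
Normally, you return false from the asynchronous process() method, to indicate that the exchange was processed asynchronously.
The CustomProducerTask class encapsulates the processing code that runs in the separate thread. It stores the Exchange object, exchange, and the AsyncCallback object, callback, as private member variables.
The run() method processes the exchange and then calls callback.done() to notify the caller that processing is complete.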
This chapter describes the Exchange interface. Since the
refactoring of the camel-core module performed in Apache Camel 2.0, there is no longer any
necessity to define custom exchange types. The DefaultExchange implementation
can now be used in all cases.
An instance of org.apache.camel.Exchange type
encapsulates the current message passing through a route, with additional metadata encoded
as exchange properties.
Figure 10.1 shows the inheritance hierarchy for
the exchange type. The default implementation, DefaultExchange, is
always used.
Example 10.1 shows the definition of the
org.apache.camel.Exchange interface.
Example 10.1. Exchange Interface
package org.apache.camel;
import java.util.Map;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.spi.UnitOfWork;
public interface Exchange {
// Exchange property names (string constants)
// (Not shown here)
...
ExchangePattern getPattern();
void setPattern(ExchangePattern pattern);
Object getProperty(String name);
Object getProperty(String name, Object defaultValue);
<T> T getProperty(String name, Class<T> type);
<T> T getProperty(String name, Object defaultValue, Class<T> type);
void setProperty(String name, Object value);
Object removeProperty(String name);
Map<String, Object> getProperties();
boolean hasProperties();
Message getIn();
<T> T getIn(Class<T> type);
void setIn(Message in);
Message getOut();
<T> T getOut(Class<T> type);
void setOut(Message out);
boolean hasOut();
Throwable getException();
<T> T getException(Class<T> type);
void setException(Throwable e);
boolean isFailed();
boolean isTransacted();
boolean isRollbackOnly();
CamelContext getContext();
Exchange copy();
Endpoint getFromEndpoint();
void setFromEndpoint(Endpoint fromEndpoint);
String getFromRouteId();
void setFromRouteId(String fromRouteId);
UnitOfWork getUnitOfWork();
void setUnitOfWork(UnitOfWork unitOfWork);
String getExchangeId();
void setExchangeId(String id);
void addOnCompletion(Synchronization onCompletion);
void handoverCompletions(Exchange target);
}
The Exchange interface defines the following
methods:
getPattern(), setPattern()—The exchange
pattern can be one of the values enumerated in
org.apache.camel.ExchangePattern. The following exchange pattern values
are supported:
InOnly
RobustInOnly
InOut
InOptionalOut
OutOnly
RobustOutOnly
OutIn
OutOptionalIn
setProperty(), getProperty(),
getProperties(), removeProperty(),
hasProperties()—Use the property setter and getter methods to
associate named properties with the exchange instance. The properties consist of
miscellaneous metadata that you might need for your component
implementation.
setIn(), getIn()—Setter and getter
methods for the In message.
The getIn()
implementation provided by the DefaultExchange class implements lazy
creation semantics: if the In message is null when
getIn() is called, the DefaultExchange class creates a
default In message.
setOut(), getOut(), hasOut()—Setter and
getter methods for the Out message.
The
getOut() method implicitly supports lazy creation of an
Out message. That is, if the current Out
message is null, a new message instance is automatically
created.
setException(), getException()—Getter and setter
methods for an exception object (of Throwable type).
isFailed()—Returns true, if the exchange failed
either due to an exception or due to a fault.
isTransacted()—Returns true, if the exchange is
transacted.
isRollbackOnly()—Returns true, if the exchange is marked
for rollback.
getContext()—Returns a reference to the associated
CamelContext instance.
copy()—Creates a new, identical (apart from the
exchange ID) copy of the current custom exchange object. The body and headers of the
In message, the Out message (if any), and
the Fault message (if any) are also copied by this
operation.
setFromEndpoint(), getFromEndpoint()—Getter and
setter methods for the consumer endpoint that originated this message (which is typically
the endpoint appearing in the from() DSL command at the start of a
route).
setFromRouteId(), getFromRouteId()—Getter and
setter methods for the ID of the route that originated this exchange. The
setFromRouteId() method should only be called internally.
setUnitOfWork(),
getUnitOfWork()—Getter and setter methods for the
org.apache.camel.spi.UnitOfWork bean property. This property is only
required for exchanges that can participate in a transaction.
setExchangeId(), getExchangeId()—Getter and setter
methods for the exchange ID. Whether or not a custom component uses an exchange ID is
an implementation detail.
addOnCompletion()—Adds an
org.apache.camel.spi.Synchronization callback object, which gets called
when processing of the exchange has completed.
handoverCompletions()—Hands over all of the
OnCompletion callback objects to the specified exchange
object.
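The lazy creation semantics described for getIn() and getOut() have a practical consequence: calling getOut() merely to check for a reply creates one. The following is a minimal sketch of that behaviour in plain Java. SketchMessage and LazyExchangeSketch are hypothetical stand-ins written for this illustration; they are not the real org.apache.camel.Message or DefaultExchange classes.

```java
// Hypothetical stand-in for a message; not the real org.apache.camel.Message.
class SketchMessage {
    private Object body;

    Object getBody() {
        return body;
    }

    void setBody(Object body) {
        this.body = body;
    }
}

// Hypothetical stand-in that mimics the lazy creation semantics described
// for DefaultExchange; it is not the real implementation.
class LazyExchangeSketch {
    private SketchMessage in;
    private SketchMessage out;

    // Creates a default In message on first access.
    SketchMessage getIn() {
        if (in == null) {
            in = new SketchMessage();
        }
        return in;
    }

    // Reports whether an Out message exists, without creating one.
    boolean hasOut() {
        return out != null;
    }

    // Creates a new Out message on first access.
    SketchMessage getOut() {
        if (out == null) {
            out = new SketchMessage();
        }
        return out;
    }
}
```

Under these assumptions, code that only needs to know whether a reply is present would call hasOut() first and call getOut() only when it intends to read or populate the reply.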
This chapter describes how to implement the Message
interface, which is an optional step in the implementation of a Fuse Mediation Router component.
An instance of org.apache.camel.Message type can represent any kind of
message (In or Out). Figure 11.1 shows the inheritance hierarchy for the
message type. You do not always need to implement a custom message type for a component. In
many cases, the default implementation, DefaultMessage, is adequate.
Example 11.1 shows the definition of the
org.apache.camel.Message interface.
Example 11.1. Message Interface
package org.apache.camel;
import java.util.Map;
import java.util.Set;
import javax.activation.DataHandler;
public interface Message {
String getMessageId();
void setMessageId(String messageId);
Exchange getExchange();
boolean isFault();
void setFault(boolean fault);
Object getHeader(String name);
Object getHeader(String name, Object defaultValue);
<T> T getHeader(String name, Class<T> type);
<T> T getHeader(String name, Object defaultValue, Class<T> type);
Map<String, Object> getHeaders();
void setHeader(String name, Object value);
void setHeaders(Map<String, Object> headers);
Object removeHeader(String name);
boolean removeHeaders(String pattern);
boolean hasHeaders();
Object getBody();
Object getMandatoryBody() throws InvalidPayloadException;
<T> T getBody(Class<T> type);
<T> T getMandatoryBody(Class<T> type) throws InvalidPayloadException;
void setBody(Object body);
<T> void setBody(Object body, Class<T> type);
DataHandler getAttachment(String id);
Map<String, DataHandler> getAttachments();
Set<String> getAttachmentNames();
void removeAttachment(String id);
void addAttachment(String id, DataHandler content);
void setAttachments(Map<String, DataHandler> attachments);
boolean hasAttachments();
Message copy();
void copyFrom(Message message);
String createExchangeId();
}
The Message interface defines the following
methods:
setMessageId(), getMessageId()—Getter and setter
methods for the message ID. Whether or not you need to use a message ID in your custom
component is an implementation detail.
getExchange()—Returns a reference to the parent exchange
object.
isFault(), setFault()—Getter and setter methods for
the fault flag, which indicates whether or not this message is a fault message.
getHeader(), getHeaders(), setHeader(),
setHeaders(), removeHeader(),
hasHeaders()—Getter and setter methods for the message headers. In
general, these message headers can be used either to store actual header data, or to
store miscellaneous metadata.
getBody(), getMandatoryBody(),
setBody()—Getter and setter methods for the message body. The
getMandatoryBody() accessor guarantees that the returned body is non-null, otherwise the
InvalidPayloadException exception is thrown.
getAttachment(), getAttachments(),
getAttachmentNames(), removeAttachment(),
addAttachment(), setAttachments(),
hasAttachments()—Methods to get, set, add, and remove
attachments.
copy()—Creates a new, identical (including the
message ID) copy of the current custom message object.
copyFrom()—Copies the complete contents (including
the message ID) of the specified generic message object, message, into the
current message instance. Because this method must be able to copy from
any message type, it copies the generic message properties, but
not the custom properties.
createExchangeId()—Returns the unique ID for this exchange, if
the message implementation is capable of providing an ID; otherwise, returns
null.
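The difference between getBody() and getMandatoryBody() can be sketched as follows. This is an illustration in plain Java, not Camel code: BodySketch and InvalidPayloadSketchException are hypothetical stand-ins for a message type and for InvalidPayloadException, and the bodyOrDefault() helper is an assumption added to show one way a caller might handle the exception.

```java
// Hypothetical stand-in for org.apache.camel.InvalidPayloadException.
class InvalidPayloadSketchException extends Exception {
    InvalidPayloadSketchException(String reason) {
        super(reason);
    }
}

// Hypothetical stand-in for a message; only the body accessors are modelled.
class BodySketch {
    private Object body;

    void setBody(Object body) {
        this.body = body;
    }

    // Plain accessor: may return null.
    Object getBody() {
        return body;
    }

    // Mandatory accessor: never returns null, throws instead.
    Object getMandatoryBody() throws InvalidPayloadSketchException {
        if (body == null) {
            throw new InvalidPayloadSketchException("No body available");
        }
        return body;
    }

    // Helper for callers that prefer a fallback value to an exception.
    static Object bodyOrDefault(BodySketch message, Object fallback) {
        try {
            return message.getMandatoryBody();
        } catch (InvalidPayloadSketchException e) {
            return fallback;
        }
    }
}
```

The mandatory accessor moves the null check out of the calling code, so a missing payload surfaces as an exception at the point of access rather than as a null value later on.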
Example 11.2 outlines how to implement a message by
extending the DefaultMessage class.
Example 11.2. Custom Message Implementation
import java.util.Map;
import javax.activation.DataHandler;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultMessage;

public class CustomMessage extends DefaultMessage {

    public CustomMessage() {
        // Create message with default properties...
    }

    @Override
    public String toString() {
        // Return a stringified message...
    }

    @Override
    public CustomMessage newInstance() {
        return new CustomMessage( ... );
    }

    @Override
    protected Object createBody() {
        // Return message body (lazy creation).
    }

    @Override
    protected void populateInitialHeaders(Map<String, Object> map) {
        // Initialize headers from underlying message (lazy creation).
    }

    @Override
    protected void populateInitialAttachments(Map<String, DataHandler> map) {
        // Initialize attachments from underlying message (lazy creation).
    }
}
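Implement a custom message class, CustomMessage, by extending the DefaultMessage class.
Typically, you need a default constructor that creates a message with default properties.
Override the toString() method to customize message stringification.
The newInstance() method returns a new instance of the CustomMessage class.
The createBody() method returns the message body, which enables lazy creation of the body.
The populateInitialHeaders() method initializes the headers from the underlying message, which enables lazy creation of the headers.
The populateInitialAttachments() method initializes the attachments from the underlying message, which enables lazy creation of the attachments.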
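The lazy creation hook in Example 11.2 can be illustrated with a small plain-Java sketch. It assumes that the base class calls createBody() the first time the body is requested and caches the result. LazyMessageSketch and CountingMessage are hypothetical classes written for this illustration and are not part of the Camel API.

```java
// Hypothetical base class that mimics lazy body creation; it is not the real
// DefaultMessage implementation.
abstract class LazyMessageSketch {
    private Object body;

    // Creates the body on first access and reuses it afterwards.
    Object getBody() {
        if (body == null) {
            body = createBody();
        }
        return body;
    }

    // Subclasses supply the body, for example by reading the underlying message.
    protected abstract Object createBody();
}

// Hypothetical subclass that counts how often the hook is invoked.
class CountingMessage extends LazyMessageSketch {
    int created = 0;

    @Override
    protected Object createBody() {
        created++;
        return "payload-from-transport";
    }
}
```

With this arrangement, a route that never reads the body never pays the cost of building it, and repeated reads do not rebuild it.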