Chapter 46. Producer and Consumer Templates

Abstract

The producer and consumer templates in Apache Camel are modelled after a feature of the Spring container API, whereby access to a resource is provided through a simplified, easy-to-use API known as a template. In the case of Apache Camel, the producer template and consumer template provide simplified interfaces for sending messages to and receiving messages from producer endpoints and consumer endpoints.

46.1. Using the Producer Template

46.1.1. Introduction to the Producer Template

Overview

The producer template supports a variety of different approaches to invoking producer endpoints. There are methods that support different formats for the request message (as an Exchange object, as a message body, as a message body with a single header setting, and so on) and there are methods to support both the synchronous and the asynchronous style of invocation. Overall, producer template methods can be grouped into the following categories:

Synchronous invocation

The methods for invoking endpoints synchronously have names of the form sendSuffix() and requestSuffix(). For example, the methods for invoking an endpoint using either the default message exchange pattern (MEP) or an explicitly specified MEP are named send(), sendBody(), and sendBodyAndHeader() (where these methods respectively send an Exchange object, a message body, or a message body and header value). If you want to force the MEP to be InOut (request/reply semantics), you can call the request(), requestBody(), and requestBodyAndHeader() methods instead.
The following example shows how to create a ProducerTemplate instance and use it to send a message body to the activemq:MyQueue endpoint. The example also shows how to send a message body and header value using sendBodyAndHeader().
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultProducerTemplate;
...
ProducerTemplate template = context.createProducerTemplate();

// Send to a specific queue
template.sendBody("activemq:MyQueue", "<hello>world!</hello>");

// Send with a body and header 
template.sendBodyAndHeader(
    "activemq:MyQueue",
    "<hello>world!</hello>",
    "CustomerRating", "Gold" );

Synchronous invocation with a processor

A special case of synchronous invocation is where you provide the send() method with a Processor argument instead of an Exchange argument. In this case, the producer template implicitly asks the specified endpoint to create an Exchange instance (typically, though not always, with the InOnly MEP by default). This default exchange is then passed to the processor, which initializes the contents of the exchange object.
The following example shows how to send an exchange initialized by the MyProcessor processor to the activemq:MyQueue endpoint.
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultProducerTemplate;
...
ProducerTemplate template = context.createProducerTemplate();

// Send to a specific queue, using a processor to initialize
template.send("activemq:MyQueue", new MyProcessor());
The MyProcessor class is implemented as shown in the following example. In addition to setting the In message body (as shown here), you could also initialize message headers and exchange properties.
import org.apache.camel.Processor;
import org.apache.camel.Exchange;
...
public class MyProcessor implements Processor {
    public MyProcessor() { }

    public void process(Exchange ex) {
        ex.getIn().setBody("<hello>world!</hello>");
    }
}

Asynchronous invocation

The methods for invoking endpoints asynchronously have names of the form asyncSendSuffix() and asyncRequestSuffix(). For example, the methods for invoking an endpoint using either the default message exchange pattern (MEP) or an explicitly specified MEP are named asyncSend() and asyncSendBody() (where these methods respectively send an Exchange object or a message body). If you want to force the MEP to be InOut (request/reply semantics), you can call the asyncRequestBody(), asyncRequestBodyAndHeader(), and asyncRequestBodyAndHeaders() methods instead.
The following example shows how to send an exchange asynchronously to the direct:start endpoint. The asyncSend() method returns a java.util.concurrent.Future object, which is used to retrieve the invocation result at a later time.
import java.util.concurrent.Future;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
...
Exchange exchange = new DefaultExchange(context);
exchange.getIn().setBody("Hello");

Future<Exchange> future = template.asyncSend("direct:start", exchange);

// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the resulting exchange from the Future
Exchange result = future.get();
The producer template also provides methods to send a message body asynchronously (for example, using asyncSendBody() or asyncRequestBody()). In this case, you can use one of the following helper methods to extract the returned message body from the Future object:
<T> T extractFutureBody(Future future, Class<T> type);
<T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
The first version of the extractFutureBody() method blocks until the invocation completes and the reply message is available. The second version of the extractFutureBody() method allows you to specify a timeout. Both methods have a type argument, type, which specifies the type that the returned message body is converted to using a built-in type converter.
The following example shows how to use the asyncRequestBody() method to send a message body to the direct:start endpoint. The blocking extractFutureBody() method is then used to retrieve the reply message body from the Future object.
Future<Object> future = template.asyncRequestBody("direct:start", "Hello");

// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the reply message body as a String type
String result = template.extractFutureBody(future, String.class);
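The difference between the blocking and the timed retrieval can be illustrated without Camel. The following is a minimal plain-JDK sketch, not Camel's implementation: the FutureBodySketch class and its extractBody() and slowReply() methods are hypothetical names, and a simple Class.cast() stands in for Camel's type converter.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for illustration; not part of the Camel API.
final class FutureBodySketch {

    // Blocks until the reply is available, then casts it to the requested type.
    static <T> T extractBody(Future<?> future, Class<T> type) {
        try {
            return type.cast(future.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for reply", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Invocation failed", e.getCause());
        }
    }

    // Waits at most the given time; throws TimeoutException if no reply arrives.
    static <T> T extractBody(Future<?> future, long timeout, TimeUnit unit, Class<T> type)
            throws TimeoutException {
        try {
            return type.cast(future.get(timeout, unit));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for reply", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Invocation failed", e.getCause());
        }
    }

    // Simulates an asynchronous request whose reply arrives after delayMillis.
    static Future<Object> slowReply(ExecutorService pool, String body, long delayMillis) {
        return pool.submit(() -> {
            Thread.sleep(delayMillis);
            return (Object) ("Reply to " + body);
        });
    }
}
```

In this sketch, the timed form raises a TimeoutException when the simulated reply takes longer than the timeout, whereas the two-argument form simply waits for the reply.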

Asynchronous invocation with a callback

In the preceding asynchronous examples, the request message is dispatched in a sub-thread, while the reply is retrieved and processed by the main thread. The producer template also gives you the option, however, of processing replies in the sub-thread, using one of the asyncCallback(), asyncCallbackSendBody(), or asyncCallbackRequestBody() methods. In this case, you supply a callback object (of org.apache.camel.impl.SynchronizationAdapter type), which automatically gets invoked in the sub-thread as soon as a reply message arrives.
The Synchronization callback interface is defined as follows:
package org.apache.camel.spi;

import org.apache.camel.Exchange;

public interface Synchronization {
    void onComplete(Exchange exchange);
    void onFailure(Exchange exchange);
}
The onComplete() method is called on receipt of a normal reply, and the onFailure() method is called on receipt of a fault message reply. Only one of these methods is called for a given reply, so you must override both of them to ensure that all types of reply are processed.
The following example shows how to send an exchange to the direct:start endpoint, where the reply message is processed in the sub-thread by the SynchronizationAdapter callback object.
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.SynchronizationAdapter;
...
Exchange exchange = context.getEndpoint("direct:start").createExchange();
exchange.getIn().setBody("Hello");

Future<Exchange> future = template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() {
    @Override
    public void onComplete(Exchange exchange) {
        assertEquals("Hello World", exchange.getIn().getBody());
    }
});
The SynchronizationAdapter class is a default implementation of the Synchronization interface. You can override its onComplete() and onFailure() callback methods to provide your own definitions.
You still have the option of accessing the reply from the main thread, because the asyncCallback() method also returns a Future object. For example:
// Retrieve the reply from the main thread, specifying a timeout
Exchange reply = future.get(10, TimeUnit.SECONDS);
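The threading behaviour of the callback approach can be mimicked with the JDK alone. The sketch below is a simplified model under stated assumptions rather than Camel code: ReplyCallback and CallbackSketch.asyncCallback() are hypothetical names, the callback receives a String or an Exception instead of an Exchange, and a java.util.concurrent.ExecutorService plays the role of the sub-thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical callback, loosely modelled on the two-method Synchronization interface.
interface ReplyCallback {
    void onComplete(String reply);
    void onFailure(Exception cause);
}

// Hypothetical helper for illustration; not part of the Camel API.
final class CallbackSketch {

    // Runs the invocation on a pool thread and notifies the callback on that same thread.
    // Exactly one of onComplete() or onFailure() is called per invocation.
    static Future<String> asyncCallback(ExecutorService pool,
                                        Callable<String> invocation,
                                        ReplyCallback callback) {
        return pool.submit(() -> {
            String reply;
            try {
                reply = invocation.call();
            } catch (Exception e) {
                callback.onFailure(e);
                throw e; // the Future also reports the failure to the caller
            }
            callback.onComplete(reply);
            return reply; // the Future still exposes the reply to the caller
        });
    }
}
```

As in the Camel example, the caller can ignore the returned Future and rely on the callback, or call get() on it to pick up the reply from the main thread.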

46.1.2. Synchronous Send

Overview

The synchronous send methods are a collection of methods that you can use to invoke a producer endpoint, where the current thread blocks until the method invocation is complete and the reply (if any) has been received. These methods are compatible with any kind of message exchange pattern.

Send an exchange

The basic send() method is a general-purpose method that sends the contents of an Exchange object to an endpoint, using the message exchange pattern (MEP) of the exchange. The return value is the exchange that you get after it has been processed by the producer endpoint (possibly containing an Out message, depending on the MEP).
There are three varieties of send() method for sending an exchange that let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint object.
Exchange send(Exchange exchange);
Exchange send(String endpointUri, Exchange exchange);
Exchange send(Endpoint endpoint, Exchange exchange);

Send an exchange populated by a processor

A simple variation of the general send() method is to use a processor to populate a default exchange, instead of supplying the exchange object explicitly (see the section called “Synchronous invocation with a processor” for details).
The send() methods for sending an exchange populated by a processor let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint object. In addition, you can optionally specify the exchange's MEP by supplying the pattern argument, instead of accepting the default.
Exchange send(Processor processor);
Exchange send(String endpointUri, Processor processor);
Exchange send(Endpoint endpoint, Processor processor);
Exchange send(
    String endpointUri,
    ExchangePattern pattern,
    Processor processor
);
Exchange send(
    Endpoint endpoint,
    ExchangePattern pattern,
    Processor processor
);

Send a message body

If you are only concerned with the contents of the message body that you want to send, you can use the sendBody() methods to provide the message body as an argument and let the producer template take care of inserting the body into a default exchange object.
The sendBody() methods let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint object. In addition, you can optionally specify the exchange's MEP by supplying the pattern argument, instead of accepting the default. The methods without a pattern argument return void (even though the invocation might give rise to a reply in some cases); and the methods with a pattern argument return either the body of the Out message (if there is one) or the body of the In message (otherwise).
void sendBody(Object body);
void sendBody(String endpointUri, Object body);
void sendBody(Endpoint endpoint, Object body);
Object sendBody(
    String endpointUri,
    ExchangePattern pattern,
    Object body
);
Object sendBody(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body
);

Send a message body and header(s)

The sendBodyAndHeader() methods are useful for testing the effect of a single header setting. You supply the message body and header setting as arguments to sendBodyAndHeader() and let the producer template take care of inserting the body and header setting into a default exchange object.
The sendBodyAndHeader() methods let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint object. In addition, you can optionally specify the exchange's MEP by supplying the pattern argument, instead of accepting the default. The methods without a pattern argument return void (even though the invocation might give rise to a reply in some cases); and the methods with a pattern argument return either the body of the Out message (if there is one) or the body of the In message (otherwise).
void sendBodyAndHeader(
    Object body,
    String header,
    Object headerValue
);
void sendBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
void sendBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
Object sendBodyAndHeader(
    String endpointUri,
    ExchangePattern pattern,
    Object body,
    String header,
    Object headerValue
);
Object sendBodyAndHeader(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    String header,
    Object headerValue
);
The sendBodyAndHeaders() methods are similar to the sendBodyAndHeader() methods, except that instead of supplying just a single header setting, these methods allow you to specify a complete hash map of header settings.
void sendBodyAndHeaders(
    Object body,
    Map<String, Object> headers
);
void sendBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
void sendBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
Object sendBodyAndHeaders(
    String endpointUri,
    ExchangePattern pattern,
    Object body,
    Map<String, Object> headers
);
Object sendBodyAndHeaders(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    Map<String, Object> headers
);
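Building the header map is ordinary java.util.Map code. The following sketch assembles a map suitable for the headers argument. The HeaderMapSketch class and the Priority header are hypothetical, CustomerRating is the header used in the earlier sendBodyAndHeader() example, and the Camel call itself appears only in a comment because it needs a ProducerTemplate obtained from a running CamelContext.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Camel API.
final class HeaderMapSketch {

    // Collects several header settings into one map, keyed by header name.
    static Map<String, Object> orderHeaders(String customerRating, int priority) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("CustomerRating", customerRating);
        headers.put("Priority", priority); // assumed header name, for illustration only
        return headers;
    }

    // With a ProducerTemplate in scope, the map would be passed as the last argument:
    //   template.sendBodyAndHeaders("activemq:MyQueue", "<hello>world!</hello>",
    //                               orderHeaders("Gold", 1));
}
```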

Send a message body and exchange property

You can try out the effect of setting a single exchange property using the sendBodyAndProperty() methods. You supply the message body and property setting as arguments to sendBodyAndProperty() and let the producer template take care of inserting the body and exchange property into a default exchange object.
The sendBodyAndProperty() methods let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint object. In addition, you can optionally specify the exchange's MEP by supplying the pattern argument, instead of accepting the default. The methods without a pattern argument return void (even though the invocation might give rise to a reply in some cases); and the methods with a pattern argument return either the body of the Out message (if there is one) or the body of the In message (otherwise).
void sendBodyAndProperty(
    Object body,
    String property,
    Object propertyValue
);
void sendBodyAndProperty(
    String endpointUri,
    Object body,
    String property,
    Object propertyValue
);
void sendBodyAndProperty(
    Endpoint endpoint,
    Object body,
    String property,
    Object propertyValue
);
Object sendBodyAndProperty(
    String endpointUri,
    ExchangePattern pattern,
    Object body,
    String property,
    Object propertyValue
);
Object sendBodyAndProperty(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    String property,
    Object propertyValue
);

46.1.3. Synchronous Request with InOut Pattern

Overview

The synchronous request methods are similar to the synchronous send methods, except that the request methods force the message exchange pattern to be InOut (conforming to request/reply semantics). Hence, it is generally convenient to use a synchronous request method when you expect to receive a reply from the producer endpoint.

Request an exchange populated by a processor

The basic request() method is a general-purpose method that uses a processor to populate a default exchange and forces the message exchange pattern to be InOut (so that the invocation obeys request/reply semantics). The return value is the exchange that you get after it has been processed by the producer endpoint, where the Out message contains the reply message.
The request() methods for sending an exchange populated by a processor let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object.
Exchange request(String endpointUri, Processor processor);
Exchange request(Endpoint endpoint, Processor processor);

Request a message body

If you are only concerned with the contents of the message body in the request and in the reply, you can use the requestBody() methods to provide the request message body as an argument and let the producer template take care of inserting the body into a default exchange object.
The requestBody() methods let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint object. The return value is the body of the reply message (Out message body), which can be returned either as a plain Object or converted to a specific type, T, using the built-in type converters (see Section 43.3, “Built-In Type Converters”).
Object requestBody(Object body);
<T> T requestBody(Object body, Class<T> type);
Object requestBody(
    String endpointUri,
    Object body
);
<T> T requestBody(
    String endpointUri,
    Object body,
    Class<T> type
);
Object requestBody(
    Endpoint endpoint,
    Object body
);
<T> T requestBody(
    Endpoint endpoint,
    Object body,
    Class<T> type
);

Request a message body and header(s)

You can try out the effect of setting a single header value using the requestBodyAndHeader() methods. You supply the message body and header setting as arguments to requestBodyAndHeader() and let the producer template take care of inserting the body and header setting into a default exchange object.
The requestBodyAndHeader() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object. The return value is the body of the reply message (Out message body), which can be returned either as a plain Object or converted to a specific type, T, using the built-in type converters (see Section 43.3, “Built-In Type Converters”).
Object requestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
<T> T requestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
Object requestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
<T> T requestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
The requestBodyAndHeaders() methods are similar to the requestBodyAndHeader() methods, except that instead of supplying just a single header setting, these methods allow you to specify a complete hash map of header settings.
Object requestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);
Object requestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);

46.1.4. Asynchronous Send

Overview

The producer template provides a variety of methods for invoking a producer endpoint asynchronously, so that the main thread does not block while waiting for the invocation to complete and the reply message can be retrieved at a later time. The asynchronous send methods described in this section are compatible with any kind of message exchange pattern.

Send an exchange

The basic asyncSend() method takes an Exchange argument and invokes an endpoint asynchronously, using the message exchange pattern (MEP) of the specified exchange. The return value is a java.util.concurrent.Future object, which is a ticket you can use to collect the reply message at a later time—for details of how to obtain the return value from the Future object, see the section called “Asynchronous invocation”.
The following asyncSend() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object.
Future<Exchange> asyncSend(String endpointUri, Exchange exchange);
Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);

Send an exchange populated by a processor

A simple variation of the general asyncSend() method is to use a processor to populate a default exchange, instead of supplying the exchange object explicitly.
The following asyncSend() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object.
Future<Exchange> asyncSend(String endpointUri, Processor processor);
Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);

Send a message body

If you are only concerned with the contents of the message body that you want to send, you can use the asyncSendBody() methods to send a message body asynchronously and let the producer template take care of inserting the body into a default exchange object.
The asyncSendBody() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object.
Future<Object> asyncSendBody(String endpointUri, Object body);
Future<Object> asyncSendBody(Endpoint endpoint, Object body);

46.1.5. Asynchronous Request with InOut Pattern

Overview

The asynchronous request methods are similar to the asynchronous send methods, except that the request methods force the message exchange pattern to be InOut (conforming to request/reply semantics). Hence, it is generally convenient to use an asynchronous request method when you expect to receive a reply from the producer endpoint.

Request a message body

If you are only concerned with the contents of the message body in the request and in the reply, you can use the asyncRequestBody() methods to provide the request message body as an argument and let the producer template take care of inserting the body into a default exchange object.
The asyncRequestBody() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object. The return value that is retrievable from the Future object is the body of the reply message (Out message body), which can be returned either as a plain Object or converted to a specific type, T, using a built-in type converter (see the section called “Asynchronous invocation”).
Future<Object> asyncRequestBody(
    String endpointUri,
    Object body
);
<T> Future<T> asyncRequestBody(
    String endpointUri,
    Object body,
    Class<T> type
);
Future<Object> asyncRequestBody(
    Endpoint endpoint,
    Object body
);
<T> Future<T> asyncRequestBody(
    Endpoint endpoint,
    Object body,
    Class<T> type
);

Request a message body and header(s)

You can try out the effect of setting a single header value using the asyncRequestBodyAndHeader() methods. You supply the message body and header setting as arguments to asyncRequestBodyAndHeader() and let the producer template take care of inserting the body and header setting into a default exchange object.
The asyncRequestBodyAndHeader() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object. The return value that is retrievable from the Future object is the body of the reply message (Out message body), which can be returned either as a plain Object or converted to a specific type, T, using a built-in type converter (see the section called “Asynchronous invocation”).
Future<Object> asyncRequestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
Future<Object> asyncRequestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
The asyncRequestBodyAndHeaders() methods are similar to the asyncRequestBodyAndHeader() methods, except that instead of supplying just a single header setting, these methods allow you to specify a complete hash map of header settings.
Future<Object> asyncRequestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);
Future<Object> asyncRequestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);

46.1.6. Asynchronous Send with Callback

Overview

The producer template also provides the option of processing the reply message in the same sub-thread that is used to invoke the producer endpoint. In this case, you provide a callback object, which automatically gets invoked in the sub-thread as soon as the reply message is received. In other words, the asynchronous send with callback methods enable you to initiate an invocation in your main thread and then have all of the associated processing—invocation of the producer endpoint, waiting for a reply and processing the reply—occur asynchronously in a sub-thread.

Send an exchange

The basic asyncCallback() method takes an Exchange argument and invokes an endpoint asynchronously, using the message exchange pattern (MEP) of the specified exchange. This method is similar to the asyncSend() method for exchanges, except that it takes an additional org.apache.camel.spi.Synchronization argument, which is a callback interface with two methods: onComplete() and onFailure(). For details of how to use the Synchronization callback, see the section called “Asynchronous invocation with a callback”.
The following asyncCallback() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object.
Future<Exchange> asyncCallback(
    String endpointUri,
    Exchange exchange,
    Synchronization onCompletion
);
Future<Exchange> asyncCallback(
    Endpoint endpoint,
    Exchange exchange,
    Synchronization onCompletion
);

Send an exchange populated by a processor

The asyncCallback() method for processors calls a processor to populate a default exchange and forces the message exchange pattern to be InOut (so that the invocation obeys request/reply semantics).
The following asyncCallback() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object.
Future<Exchange> asyncCallback(
    String endpointUri,
    Processor processor,
    Synchronization onCompletion
);
Future<Exchange> asyncCallback(
    Endpoint endpoint,
    Processor processor,
    Synchronization onCompletion
);

Send a message body

If you are only concerned with the contents of the message body that you want to send, you can use the asyncCallbackSendBody() methods to send a message body asynchronously and let the producer template take care of inserting the body into a default exchange object.
The asyncCallbackSendBody() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object.
Future<Object> asyncCallbackSendBody(
    String endpointUri,
    Object body,
    Synchronization onCompletion
);
Future<Object> asyncCallbackSendBody(
    Endpoint endpoint,
    Object body,
    Synchronization onCompletion
);

Request a message body

If you are only concerned with the contents of the message body in the request and in the reply, you can use the asyncCallbackRequestBody() methods to provide the request message body as an argument and let the producer template take care of inserting the body into a default exchange object.
The asyncCallbackRequestBody() methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint object.
Future<Object> asyncCallbackRequestBody(
    String endpointUri,
    Object body,
    Synchronization onCompletion
);
Future<Object> asyncCallbackRequestBody(
    Endpoint endpoint,
    Object body,
    Synchronization onCompletion
);