Chapter 42. Producer Interface

Abstract

This chapter describes how to implement the Producer interface, which is an essential step in the implementation of an Apache Camel component.

42.1. The Producer Interface

Overview

An instance of the org.apache.camel.Producer type represents a target endpoint in a route. The role of the producer is to send requests (In messages) to a specific physical endpoint and to receive the corresponding response (an Out or Fault message). A Producer object is essentially a special kind of Processor that appears at the end of a processor chain (equivalent to a route). Figure 42.1, “Producer Inheritance Hierarchy” shows the inheritance hierarchy for producers.

Figure 42.1. Producer Inheritance Hierarchy

The Producer interface

Example 42.1, “Producer Interface” shows the definition of the org.apache.camel.Producer interface.

Example 42.1. Producer Interface

package org.apache.camel;

public interface Producer extends Processor, Service, IsSingleton {

    Endpoint getEndpoint();

    Exchange createExchange();

    Exchange createExchange(ExchangePattern pattern);

    Exchange createExchange(Exchange exchange);
}

Producer methods

The Producer interface defines the following methods:

  • process() (inherited from Processor): The most important method. A producer is essentially a special type of processor that sends a request to an endpoint, instead of forwarding the exchange object to another processor. By overriding the process() method, you define how the producer sends and receives messages to and from the relevant endpoint.
  • getEndpoint(): Returns a reference to the parent endpoint instance.
  • createExchange(): These overloaded methods are analogous to the corresponding methods defined in the Endpoint interface. Normally, these methods delegate to the corresponding methods defined on the parent Endpoint instance (this is what the DefaultProducer class does by default). Occasionally, you might need to override these methods.

Asynchronous processing

Processing an exchange object in a producer — which usually involves sending a message to a remote destination and waiting for a reply — can potentially block for a significant length of time. If you want to avoid blocking the current thread, you can opt to implement the producer as an asynchronous processor. The asynchronous processing pattern decouples the preceding processor from the producer, so that the process() method returns without delay. See Section 38.1.4, “Asynchronous Processing”.

When implementing a producer, you can support the asynchronous processing model by implementing the org.apache.camel.AsyncProcessor interface. On its own, this is not enough to ensure that the asynchronous processing model will be used: it is also necessary for the preceding processor in the chain to call the asynchronous version of the process() method. The definition of the AsyncProcessor interface is shown in Example 42.2, “AsyncProcessor Interface”.

Example 42.2. AsyncProcessor Interface

package org.apache.camel;

public interface AsyncProcessor extends Processor {
    boolean process(Exchange exchange, AsyncCallback callback);
}

The asynchronous version of the process() method takes an extra argument, callback, of org.apache.camel.AsyncCallback type. The corresponding AsyncCallback interface is defined as shown in Example 42.3, “AsyncCallback Interface”.

Example 42.3. AsyncCallback Interface

package org.apache.camel;

public interface AsyncCallback {
    void done(boolean doneSynchronously);
}

The caller of AsyncProcessor.process() must provide an implementation of AsyncCallback to receive the notification that processing has finished. The AsyncCallback.done() method takes a boolean argument that indicates whether the processing was performed synchronously or not. Normally, the flag would be false, to indicate asynchronous processing. In some cases, however, it can make sense for the producer not to process asynchronously (in spite of being asked to do so). For example, if the producer knows that the processing of the exchange will complete rapidly, it could optimise the processing by doing it synchronously. In this case, the doneSynchronously flag should be set to true.
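The following minimal sketch illustrates the doneSynchronously contract from both sides. So that it runs without Apache Camel on the classpath, it uses stand-ins that are assumptions for illustration only: the nested Callback interface mirrors the shape of AsyncCallback, a StringBuilder stands in for the exchange, and FAST_PATH_MAX_LENGTH is a hypothetical threshold for deciding when the work is cheap enough to do on the caller's thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AsyncFastPathSketch {

    // Stand-in for org.apache.camel.AsyncCallback (not the Camel type itself).
    interface Callback {
        void done(boolean doneSynchronously);
    }

    // Hypothetical threshold: bodies up to this length take the synchronous fast path.
    static final int FAST_PATH_MAX_LENGTH = 16;

    // Same shape as AsyncProcessor.process(exchange, callback).
    static boolean process(StringBuilder exchange, Callback callback) {
        if (exchange.length() <= FAST_PATH_MAX_LENGTH) {
            exchange.append(" [reply]"); // cheap work, done on the caller's thread
            callback.done(true);         // report synchronous completion
            return true;                 // the return value matches the flag
        }
        Thread worker = new Thread(() -> {
            exchange.append(" [reply]"); // slow work would happen here
            callback.done(false);        // report asynchronous completion
        });
        worker.start();
        return false;                    // processing continues on another thread
    }

    // Caller side: supplies the callback, waits for done(), and returns the reported flag.
    static boolean callAndWait(StringBuilder exchange) {
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<Boolean> flag = new AtomicReference<>();
        boolean sync = process(exchange, doneSynchronously -> {
            flag.set(doneSynchronously);
            finished.countDown();
        });
        try {
            // Wait only when the producer said it would finish later.
            if (!sync && !finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback was not invoked in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flag.get();
    }

    public static void main(String[] args) {
        System.out.println(callAndWait(new StringBuilder("short")));
        System.out.println(callAndWait(new StringBuilder("a body longer than sixteen characters")));
    }
}
```

In this sketch the callback is invoked exactly once on either path, and the value returned by process() agrees with the flag passed to done(), so the caller knows whether it needs to wait.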

ExchangeHelper class

When implementing a producer, you might find it helpful to call some of the methods in the org.apache.camel.util.ExchangeHelper utility class. For full details of the ExchangeHelper class, see Section 35.4, “The ExchangeHelper Class”.

42.2. Implementing the Producer Interface

Alternative ways of implementing a producer

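You can implement a producer in one of the following ways:

  • As a synchronous producer, where process() blocks until the reply is received.
  • As an asynchronous producer, where process() returns without delay and completion is reported through a callback.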

How to implement a synchronous producer

Example 42.4, “DefaultProducer Implementation” outlines how to implement a synchronous producer. In this case, the call to Producer.process() blocks until a reply is received.

Example 42.4. DefaultProducer Implementation

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;

public class CustomProducer extends DefaultProducer { // (1)

    public CustomProducer(Endpoint endpoint) { // (2)
        super(endpoint);
        // Perform other initialization tasks...
    }

    @Override
    public void process(Exchange exchange) throws Exception { // (3)
        // Process exchange synchronously.
        // ...
    }
}
(1) Implement a custom synchronous producer class, CustomProducer, by extending the org.apache.camel.impl.DefaultProducer class.
(2) Implement a constructor that takes a reference to the parent endpoint.
(3) The process() method implementation represents the core of the producer code. The implementation of the process() method is entirely dependent on the type of component that you are implementing.

In outline, the process() method is normally implemented as follows:

  • If the exchange contains an In message, and if this is consistent with the specified exchange pattern, then send the In message to the designated endpoint.
  • If the exchange pattern anticipates the receipt of an Out message, then wait until the Out message has been received. This typically causes the process() method to block for a significant length of time.
  • When a reply is received, call exchange.setOut() to attach the reply to the exchange object. If the reply contains a fault message, set the fault flag on the Out message using Message.setFault(true).
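The steps above can be sketched as follows. This is an illustration under stated assumptions, not Camel code: Pattern, Msg, and Exch are simplified stand-ins for Camel's ExchangePattern, Message, and Exchange (so that the sketch runs without Camel on the classpath), and Reply and the transport function are hypothetical placeholders for whatever protocol your component speaks.

```java
import java.util.function.Function;

class SyncProducerSketch {

    // Simplified stand-ins for Camel's ExchangePattern, Message, and Exchange.
    enum Pattern { IN_ONLY, IN_OUT }

    static final class Msg {
        Object body;
        boolean fault;
        Msg(Object body) { this.body = body; }
    }

    static final class Exch {
        final Pattern pattern;
        final Msg in;
        Msg out;
        Exch(Pattern pattern, Msg in) { this.pattern = pattern; this.in = in; }
    }

    // Hypothetical reply returned by the transport.
    static final class Reply {
        final Object payload;
        final boolean isFault;
        Reply(Object payload, boolean isFault) { this.payload = payload; this.isFault = isFault; }
    }

    // Hypothetical blocking transport: sends a request body and returns the reply.
    private final Function<Object, Reply> transport;

    SyncProducerSketch(Function<Object, Reply> transport) {
        this.transport = transport;
    }

    void process(Exch exchange) {
        if (exchange.in == null) {
            return; // nothing to send
        }
        // Step 1: send the In message. The call blocks until the transport returns.
        Reply reply = transport.apply(exchange.in.body);

        // Step 2: only a pattern that anticipates an Out message uses the reply.
        if (exchange.pattern != Pattern.IN_OUT || reply == null) {
            return;
        }
        // Step 3: attach the reply as the Out message and flag it if it is a fault.
        Msg out = new Msg(reply.payload);
        out.fault = reply.isFault;
        exchange.out = out;
    }

    public static void main(String[] args) {
        SyncProducerSketch producer =
            new SyncProducerSketch(body -> new Reply("echo:" + body, false));
        Exch exchange = new Exch(Pattern.IN_OUT, new Msg("hello"));
        producer.process(exchange);
        System.out.println(exchange.out.body);
    }
}
```

In a real component, the stand-in fields correspond to the calls named in the list above, such as exchange.setOut() and Message.setFault(true).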

How to implement an asynchronous producer

Example 42.5, “CollectionProducer Implementation” outlines how to implement an asynchronous producer. In this case, you must implement both a synchronous process() method and an asynchronous process() method (which takes an additional AsyncCallback argument).

Example 42.5. CollectionProducer Implementation

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;

public class CustomProducer extends DefaultProducer implements AsyncProcessor { // (1)

    public CustomProducer(Endpoint endpoint) { // (2)
        super(endpoint);
        // ...
    }

    @Override
    public void process(Exchange exchange) throws Exception { // (3)
        // Process exchange synchronously.
        // ...
    }

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) { // (4)
        // Process exchange asynchronously.
        CustomProducerTask task = new CustomProducerTask(exchange, callback);
        // Process 'task' in a separate thread...
        // ...
        return false; // (5)
    }
}

// Package-private so that both classes can share one source file.
class CustomProducerTask implements Runnable { // (6)
    private final Exchange exchange;
    private final AsyncCallback callback;

    public CustomProducerTask(Exchange exchange, AsyncCallback callback) {
        this.exchange = exchange;
        this.callback = callback;
    }

    @Override
    public void run() { // (7)
        // Process exchange.
        // ...
        callback.done(false);
    }
}
(1) Implement a custom asynchronous producer class, CustomProducer, by extending the org.apache.camel.impl.DefaultProducer class, and implementing the AsyncProcessor interface.
(2) Implement a constructor that takes a reference to the parent endpoint.
(3) Implement the synchronous process() method.
(4) Implement the asynchronous process() method. You can implement the asynchronous method in several ways. The approach shown here is to create a java.lang.Runnable instance, task, that represents the code that runs in a sub-thread. You then use the Java threading API to run the task in a sub-thread (for example, by creating a new thread or by allocating the task to an existing thread pool).
(5) Normally, you return false from the asynchronous process() method, to indicate that the exchange was processed asynchronously.
(6) The CustomProducerTask class encapsulates the processing code that runs in a sub-thread. This class must store a reference to the Exchange object, exchange, and to the AsyncCallback object, callback, as private member variables.
(7) The run() method contains the code that sends the In message to the producer endpoint and waits to receive the reply, if any. After receiving the reply (Out message or Fault message) and inserting it into the exchange object, you must call callback.done() to notify the caller that processing is complete.
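One way to run the task on an existing thread pool is sketched below, using java.util.concurrent.ExecutorService. The sketch makes several assumptions for illustration: Callback and Exch are simplified stand-ins for AsyncCallback and Exchange (so that it runs without Camel on the classpath), the "reply to " processing is a placeholder, and the send() helper exists only to drive the example. It also shows one reasonable way to handle failures: record the exception on the exchange and still invoke the callback, so that the caller is never left waiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PooledProducerSketch {

    // Stand-in for org.apache.camel.AsyncCallback (not the Camel type itself).
    interface Callback {
        void done(boolean doneSynchronously);
    }

    // Minimal stand-in for the exchange: a request, a reply slot, and an exception slot.
    static final class Exch {
        final String request;
        volatile String reply;
        volatile Exception exception;
        Exch(String request) { this.request = request; }
    }

    // Plays the role of CustomProducerTask.
    static final class Task implements Runnable {
        private final Exch exchange;
        private final Callback callback;

        Task(Exch exchange, Callback callback) {
            this.exchange = exchange;
            this.callback = callback;
        }

        @Override
        public void run() {
            try {
                if (exchange.request.isEmpty()) {
                    throw new IllegalArgumentException("empty request");
                }
                exchange.reply = "reply to " + exchange.request; // placeholder processing
            } catch (Exception e) {
                exchange.exception = e;  // record the failure instead of throwing
            } finally {
                callback.done(false);    // always notify the caller, exactly once
            }
        }
    }

    private final ExecutorService pool;

    PooledProducerSketch(ExecutorService pool) {
        this.pool = pool;
    }

    // Same shape as the asynchronous process() method: hand off the task and return false.
    boolean process(Exch exchange, Callback callback) {
        pool.execute(new Task(exchange, callback));
        return false;
    }

    // Hypothetical helper: sends one request and waits for the callback.
    static Exch send(String request) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Exch exchange = new Exch(request);
            CountDownLatch finished = new CountDownLatch(1);
            new PooledProducerSketch(pool).process(exchange, sync -> finished.countDown());
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback was not invoked in time");
            }
            return exchange;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(send("ping").reply);
    }
}
```

A real producer would normally share one pool across all exchanges rather than create one per call, as the send() helper does here for brevity.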