Red Hat Training

A Red Hat training course is available for Red Hat Fuse

48.2. Implementing the Producer Interface

Alternative ways of implementing a producer

You can implement a producer in one of the following ways:

How to implement a synchronous producer

Example 48.4, “DefaultProducer Implementation” outlines how to implement a synchronous producer. In this case, call to Producer.process() blocks until a reply is received.

Example 48.4. DefaultProducer Implementation

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultProducer;

public class CustomProducer extends DefaultProducer { 1

    public CustomProducer(Endpoint endpoint) { 2
        super(endpoint);
        // Perform other initialization tasks...
    }

    public void process(Exchange exchange) throws Exception { 3
        // Process exchange synchronously.
        // ...
    }
}
1
Implement a custom synchronous producer class, CustomProducer, by extending the org.apache.camel.impl.DefaultProducer class.
2
Implement a constructor that takes a reference to the parent endpoint.
3
The process() method implementation represents the core of the producer code. The implementation of the process() method is entirely dependent on the type of component that you are implementing. In outline, the process() method is normally implemented as follows:
  • If the exchange contains an In message, and if this is consistent with the specified exchange pattern, then send the In message to the designated endpoint.
  • If the exchange pattern anticipates the receipt of an Out message, then wait until the Out message has been received. This typically causes the process() method to block for a significant length of time.
  • When a reply is received, call exchange.setOut() to attach the reply to the exchange object. If the reply contains a fault message, set the fault flag on the Out message using Message.setFault(true).

How to implement an asynchronous producer

Example 48.5, “CollectionProducer Implementation” outlines how to implement an asynchronous producer. In this case, you must implement both a synchronous process() method and an asynchronous process() method (which takes an additional AsyncCallback argument).

Example 48.5. CollectionProducer Implementation

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultProducer;

public class CustomProducer extends DefaultProducer implements AsyncProcessor { 1

    public CustomProducer(Endpoint endpoint) { 2
        super(endpoint);
        // ...
    }

    public void process(Exchange exchange) throws Exception { 3
        // Process exchange synchronously.
        // ...
    }

    public boolean process(Exchange exchange, AsyncCallback callback) { 4
        // Process exchange asynchronously.
        CustomProducerTask task = new CustomProducerTask(exchange, callback);
        // Process 'task' in a separate thread...
        // ...
        return false; 5
    }
}

public class CustomProducerTask implements Runnable { 6
    private Exchange exchange;
    private AsyncCallback callback;

    public CustomProducerTask(Exchange exchange, AsyncCallback callback) {
        this.exchange = exchange;
        this.callback = callback;
    }

    public void run() { 7
        // Process exchange.
        // ...
        callback.done(false);
    }
}
1
Implement a custom asynchronous producer class, CustomProducer, by extending the org.apache.camel.impl.DefaultProducer class, and implementing the AsyncProcessor interface.
2
Implement a constructor that takes a reference to the parent endpoint.
3
Implement the synchronous process() method.
4
Implement the asynchronous process() method. You can implement the asynchronous method in several ways. The approach shown here is to create a java.lang.Runnable instance, task, that represents the code that runs in a sub-thread. You then use the Java threading API to run the task in a sub-thread (for example, by creating a new thread or by allocating the task to an existing thread pool).
5
Normally, you return false from the asynchronous process() method, to indicate that the exchange was processed asynchronously.
6
The CustomProducerTask class encapsulates the processing code that runs in a sub-thread. This class must store a copy of the Exchange object, exchange, and the AsyncCallback object, callback, as private member variables.
7
The run() method contains the code that sends the In message to the producer endpoint and waits to receive the reply, if any. After receiving the reply (Out message or Fault message) and inserting it into the exchange object, you must call callback.done() to notify the caller that processing is complete.