Chapter 41. Consumer Interface

Abstract

This chapter describes how to implement the Consumer interface, which is an essential step in the implementation of an Apache Camel component.

41.1. The Consumer Interface

Overview

An instance of the org.apache.camel.Consumer type represents a source endpoint in a route. There are several different ways of implementing a consumer (see Section 38.1.3, “Consumer Patterns and Threading”), and this degree of flexibility is reflected in the inheritance hierarchy (see Figure 41.1, “Consumer Inheritance Hierarchy”), which includes several different base classes for implementing a consumer.

Figure 41.1. Consumer Inheritance Hierarchy

Consumer parameter injection

For consumers that follow the scheduled poll pattern (see the section called “Scheduled poll pattern”), Apache Camel provides support for injecting parameters into consumer instances. For example, consider the following endpoint URI for a component identified by the custom prefix:

custom:destination?consumer.myConsumerParam

Apache Camel provides support for automatically injecting query options of the form consumer.*. For the consumer.myConsumerParam parameter, you need to define corresponding setter and getter methods on the Consumer implementation class as follows:

public class CustomConsumer extends ScheduledPollConsumer {
    ...
    // The bean accessors are declared public so that they can be found by reflection.
    public String getMyConsumerParam() { ... }
    public void setMyConsumerParam(String s) { ... }
    ...
}

The getter and setter methods follow the usual Java bean conventions (including capitalizing the first letter of the property name).

In addition to defining the bean methods in your Consumer implementation, you must also remember to call the configureConsumer() method in the implementation of Endpoint.createConsumer() (see the section called “Scheduled poll endpoint implementation”). Example 41.1, “FileEndpoint createConsumer() Implementation” shows an example of a createConsumer() method implementation, taken from the FileEndpoint class in the file component:

Example 41.1. FileEndpoint createConsumer() Implementation

...
public class FileEndpoint extends ScheduledPollEndpoint {
    ...
    public Consumer createConsumer(Processor processor) throws Exception {
        Consumer result = new FileConsumer(this, processor);
        configureConsumer(result);
        return result;
    }
    ...
}

At run time, consumer parameter injection works as follows:

  1. When the endpoint is created, the default implementation of DefaultComponent.createEndpoint(String uri) parses the URI to extract the consumer parameters, and stores them in the endpoint instance by calling ScheduledPollEndpoint.configureProperties().
  2. When createConsumer() is called, the method implementation calls configureConsumer() to inject the consumer parameters (see Example 41.1, “FileEndpoint createConsumer() Implementation”).
  3. The configureConsumer() method uses Java reflection to call the setter methods whose names match the relevant options after the consumer. prefix has been stripped off.
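The three steps above can be illustrated with a minimal, self-contained sketch. It does not use the Apache Camel classes: SketchConsumer and ConsumerParamInjector are hypothetical names introduced only for this illustration, and the sketch assumes that every injected parameter has a public setter taking a single String argument. The real configureConsumer() method handles type conversion and other details that are omitted here.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a consumer implementation (not a Camel class).
class SketchConsumer {
    private String myConsumerParam;

    public String getMyConsumerParam() {
        return myConsumerParam;
    }

    public void setMyConsumerParam(String s) {
        this.myConsumerParam = s;
    }
}

// Illustrative helper that imitates the idea behind configureConsumer().
// It is an assumption-based sketch, not the Camel implementation.
class ConsumerParamInjector {
    static final String PREFIX = "consumer.";

    // Keep only the options that start with "consumer." and strip the prefix.
    static Map<String, String> extractConsumerParams(Map<String, String> options) {
        Map<String, String> params = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(PREFIX) && key.length() > PREFIX.length()) {
                params.put(key.substring(PREFIX.length()), e.getValue());
            }
        }
        return params;
    }

    // Call the public String setter whose name matches each parameter.
    static void inject(Object target, Map<String, String> params) {
        for (Map.Entry<String, String> e : params.entrySet()) {
            String name = e.getKey();
            String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                Method m = target.getClass().getMethod(setter, String.class);
                m.invoke(target, e.getValue());
            } catch (ReflectiveOperationException ex) {
                throw new IllegalArgumentException("No usable setter for " + name, ex);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("consumer.myConsumerParam", "someValue");
        options.put("unrelatedOption", "ignored");

        SketchConsumer consumer = new SketchConsumer();
        inject(consumer, extractConsumerParams(options));
        System.out.println(consumer.getMyConsumerParam()); // prints "someValue"
    }
}
```

Options without the consumer. prefix are left untouched by this sketch, which mirrors the way only consumer.* options are routed to the consumer instance.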

Scheduled poll parameters

A consumer that follows the scheduled poll pattern automatically supports the consumer parameters shown in Table 41.1, “Scheduled Poll Parameters” (which can appear as query options in the endpoint URI).

Table 41.1. Scheduled Poll Parameters

Name            Default   Description

initialDelay    1000      Delay, in milliseconds, before the first poll.

delay           500       Polling interval, in milliseconds. Its exact meaning depends on the value of the useFixedDelay flag.

useFixedDelay   false     If false, the delay parameter is interpreted as the polling period. Polls occur at initialDelay, initialDelay+delay, initialDelay+2*delay, and so on.
                          If true, the delay parameter is interpreted as the time elapsed between the previous execution and the next execution. Polls occur at initialDelay, initialDelay+[ProcessingTime]+delay, and so on, where ProcessingTime is the time taken to process an exchange object in the current thread.
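To make the difference between the two useFixedDelay settings concrete, the following sketch computes the start times of the first few polls. PollSchedule and pollStartTimes() are hypothetical names used only for this illustration, and the sketch assumes a constant processing time that is shorter than the delay (otherwise fixed-rate polls would start late).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only (not part of Apache Camel).
class PollSchedule {

    // Returns the start times, in milliseconds, of the first `count` polls.
    static List<Long> pollStartTimes(long initialDelay, long delay, boolean useFixedDelay,
                                     long processingTime, int count) {
        List<Long> times = new ArrayList<>();
        long t = initialDelay;
        for (int i = 0; i < count; i++) {
            times.add(t);
            // Fixed delay: the next poll starts `delay` after the previous poll finishes.
            // Fixed rate:  the next poll starts `delay` after the previous poll started.
            t += useFixedDelay ? processingTime + delay : delay;
        }
        return times;
    }

    public static void main(String[] args) {
        // Default parameters from the table, with an assumed processing time of 200 ms.
        System.out.println(pollStartTimes(1000, 500, false, 200, 3)); // prints [1000, 1500, 2000]
        System.out.println(pollStartTimes(1000, 500, true, 200, 3));  // prints [1000, 1700, 2400]
    }
}
```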

Converting between event-driven and polling consumers

Apache Camel provides two special consumer implementations which can be used to convert back and forth between an event-driven consumer and a polling consumer. The following conversion classes are provided:

  • org.apache.camel.impl.EventDrivenPollingConsumer — Converts an event-driven consumer into a polling consumer instance.
  • org.apache.camel.impl.DefaultScheduledPollConsumer — Converts a polling consumer into an event-driven consumer instance.

In practice, these classes are used to simplify the task of implementing an Endpoint type. The Endpoint interface defines the following two methods for creating a consumer instance:

package org.apache.camel;

public interface Endpoint {
    ...
    Consumer createConsumer(Processor processor) throws Exception;
    PollingConsumer createPollingConsumer() throws Exception;
}

createConsumer() returns an event-driven consumer and createPollingConsumer() returns a polling consumer. You would normally implement only one of these methods. For example, if you are following the event-driven pattern for your consumer, you would implement the createConsumer() method and provide a method implementation for createPollingConsumer() that simply raises an exception. With the help of the conversion classes, however, Apache Camel is able to provide a more useful default implementation.

For example, if you want to implement your consumer according to the event-driven pattern, you implement the endpoint by extending DefaultEndpoint and implementing the createConsumer() method. The implementation of createPollingConsumer() is inherited from DefaultEndpoint, where it is defined as follows:

public PollingConsumer<E> createPollingConsumer() throws Exception {
    return new EventDrivenPollingConsumer<E>(this);
}

The EventDrivenPollingConsumer constructor takes a reference to the event-driven consumer, this, effectively wrapping it and converting it into a polling consumer. To implement the conversion, the EventDrivenPollingConsumer instance buffers incoming events and makes them available on demand through the receive(), the receive(long timeout), and the receiveNoWait() methods.
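The buffering idea described above can be sketched in plain Java as follows. BufferingPollingAdapter and its onEvent() method are hypothetical names, and the type parameter E stands in for an exchange type. This is a simplified illustration of the technique under those assumptions, not the code of EventDrivenPollingConsumer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical adapter: events pushed in on one side can be pulled out on the other.
class BufferingPollingAdapter<E> {
    private final BlockingQueue<E> buffer = new LinkedBlockingQueue<>();

    // Event-driven side: called by the event source whenever an event arrives.
    public void onEvent(E event) {
        buffer.add(event);
    }

    // Polling side: block until an event is available (null if interrupted).
    public E receive() {
        try {
            return buffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Polling side: wait up to the given timeout, then return null.
    public E receive(long timeoutMillis) {
        try {
            return buffer.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Polling side: return immediately, with null if nothing is buffered.
    public E receiveNoWait() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        BufferingPollingAdapter<String> adapter = new BufferingPollingAdapter<>();
        adapter.onEvent("first");
        System.out.println(adapter.receiveNoWait()); // prints "first"
        System.out.println(adapter.receiveNoWait()); // prints "null"
    }
}
```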

Analogously, if you are implementing your consumer according to the polling pattern, you implement the endpoint by extending DefaultPollingEndpoint and implementing the createPollingConsumer() method. In this case, the implementation of the createConsumer() method is inherited from DefaultPollingEndpoint, and the default implementation returns a DefaultScheduledPollConsumer instance (which converts the polling consumer into an event-driven consumer).
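The conversion in the opposite direction can be sketched in a similar way: a scheduler repeatedly asks the polling side for events and pushes whatever it finds to a processor. PollingToEventAdapter is a hypothetical name, the Supplier plays the role of receiveNoWait(), and the Consumer plays the role of the next processor. The sketch only illustrates the general technique and makes no claim about how DefaultScheduledPollConsumer is actually written.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical adapter that turns a non-blocking poll operation into pushed events.
class PollingToEventAdapter<E> {
    private final Supplier<E> receiveNoWait; // polling side; returns null when nothing is available
    private final Consumer<E> processor;     // event-driven side
    private ScheduledExecutorService scheduler;

    PollingToEventAdapter(Supplier<E> receiveNoWait, Consumer<E> processor) {
        this.receiveNoWait = receiveNoWait;
        this.processor = processor;
    }

    // One polling cycle: drain everything currently available and return the count.
    int pollOnce() {
        int handled = 0;
        E event;
        while ((event = receiveNoWait.get()) != null) {
            processor.accept(event);
            handled++;
        }
        return handled;
    }

    // Run pollOnce() periodically on a background thread.
    synchronized void start(long initialDelayMillis, long delayMillis) {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> pollOnce(),
                initialDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }

    public static void main(String[] args) {
        ArrayDeque<String> source = new ArrayDeque<>(Arrays.asList("a", "b"));
        List<String> seen = new ArrayList<>();
        PollingToEventAdapter<String> adapter =
                new PollingToEventAdapter<String>(source::poll, seen::add);
        System.out.println(adapter.pollOnce()); // prints 2
        System.out.println(seen);               // prints [a, b]
    }
}
```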

ShutdownPrepared interface

Consumer classes can optionally implement the org.apache.camel.spi.ShutdownPrepared interface, which enables your custom consumer endpoint to receive shutdown notifications.

Example 41.2, “ShutdownPrepared Interface” shows the definition of the ShutdownPrepared interface.

Example 41.2. ShutdownPrepared Interface

package org.apache.camel.spi;

public interface ShutdownPrepared {

    void prepareShutdown(boolean forced);

}

The ShutdownPrepared interface defines the following method:

prepareShutdown

Receives notifications to shut down the consumer endpoint in one or two phases, as follows:

  1. Graceful shutdown: the forced argument has the value false. Attempt to clean up resources gracefully, for example by stopping threads gracefully.
  2. Forced shutdown: the forced argument has the value true. This means that the shutdown has timed out, so you must clean up resources more aggressively. This is the last chance to clean up resources before the process exits.
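The two phases could be handled along the following lines. To keep the sketch compilable without Apache Camel on the classpath, it declares a local interface, ShutdownPreparedSketch, with the same method shape as Example 41.2. WorkerConsumerSketch and its submit() method are hypothetical, and in a real component the thread pool would be obtained from the CamelContext rather than from java.util.concurrent.Executors.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Local stand-in with the same method shape as org.apache.camel.spi.ShutdownPrepared.
interface ShutdownPreparedSketch {
    void prepareShutdown(boolean forced);
}

// Hypothetical consumer that owns a small thread pool.
class WorkerConsumerSketch implements ShutdownPreparedSketch {
    // Assumption: a plain JDK pool is used only to keep the sketch standalone.
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    private volatile boolean acceptingWork = true;

    // Returns false once shutdown has been requested.
    boolean submit(Runnable task) {
        if (!acceptingWork) {
            return false;
        }
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    @Override
    public void prepareShutdown(boolean forced) {
        acceptingWork = false;
        if (forced) {
            // Forced phase: interrupt running tasks and discard queued ones.
            executor.shutdownNow();
        } else {
            // Graceful phase: let already submitted tasks finish.
            executor.shutdown();
        }
    }

    boolean isShutdown() {
        return executor.isShutdown();
    }
}
```

A caller would typically invoke prepareShutdown(false) first and fall back to prepareShutdown(true) only if the graceful phase does not complete in time.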

ShutdownAware interface

Consumer classes can optionally implement the org.apache.camel.spi.ShutdownAware interface, which interacts with the graceful shutdown mechanism, enabling a consumer to ask for extra time to shut down. This is typically needed for components such as SEDA, which can have pending exchanges stored in an internal queue. Normally, you would want to process all of the exchanges in the queue before shutting down the SEDA consumer.

Example 41.3, “ShutdownAware Interface” shows the definition of the ShutdownAware interface.

Example 41.3. ShutdownAware Interface

// Java
package org.apache.camel.spi;

import org.apache.camel.ShutdownRunningTask;

public interface ShutdownAware extends ShutdownPrepared {

    boolean deferShutdown(ShutdownRunningTask shutdownRunningTask);

    int getPendingExchangesSize();
}

The ShutdownAware interface defines the following methods:

deferShutdown

Return true from this method if you want to delay shutdown of the consumer. The shutdownRunningTask argument is an enum, which can take either of the following values:

  • ShutdownRunningTask.CompleteCurrentTaskOnly — finish processing the exchanges that are currently being processed by the consumer’s thread pool, but do not attempt to process any more exchanges than that.
  • ShutdownRunningTask.CompleteAllTasks — process all of the pending exchanges. For example, in the case of the SEDA component, the consumer would process all of the exchanges from its incoming queue.

getPendingExchangesSize

Indicates how many exchanges remain to be processed by the consumer. A zero value indicates that processing is finished and the consumer can be shut down.

For an example of how to define the ShutdownAware methods, see Example 41.7, “Custom Threading Implementation”.
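As a rough illustration of how the two ShutdownAware methods can work together, the following standalone sketch models a consumer with an internal queue and a simplified shutdown driver. All of the type names (RunningTaskSketch, ShutdownAwareSketch, QueueConsumerSketch, ShutdownDriverSketch) are hypothetical stand-ins, and the driver logic is an assumption made for the sake of the example. It is not a description of the Apache Camel shutdown strategy.

```java
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Stand-in for the two values of org.apache.camel.ShutdownRunningTask.
enum RunningTaskSketch { CompleteCurrentTaskOnly, CompleteAllTasks }

// Stand-in with the same method shapes as Example 41.3.
interface ShutdownAwareSketch {
    boolean deferShutdown(RunningTaskSketch shutdownRunningTask);
    int getPendingExchangesSize();
}

// Hypothetical consumer with pending work stored in an internal queue.
class QueueConsumerSketch implements ShutdownAwareSketch {
    final Queue<String> pending = new ConcurrentLinkedQueue<>();

    @Override
    public boolean deferShutdown(RunningTaskSketch shutdownRunningTask) {
        // Ask for extra time only when all pending work should be completed.
        return shutdownRunningTask == RunningTaskSketch.CompleteAllTasks;
    }

    @Override
    public int getPendingExchangesSize() {
        return pending.size();
    }

    void processOne() {
        pending.poll();
    }
}

// Simplified driver: returns how many exchanges were left unprocessed at shutdown.
class ShutdownDriverSketch {
    static int shutdown(QueueConsumerSketch consumer, RunningTaskSketch task) {
        if (consumer.deferShutdown(task)) {
            // The consumer asked for more time: let it drain its queue first.
            while (consumer.getPendingExchangesSize() > 0) {
                consumer.processOne();
            }
        }
        return consumer.getPendingExchangesSize();
    }

    public static void main(String[] args) {
        QueueConsumerSketch consumer = new QueueConsumerSketch();
        consumer.pending.addAll(Arrays.asList("a", "b", "c"));
        System.out.println(shutdown(consumer, RunningTaskSketch.CompleteCurrentTaskOnly)); // prints 3
        System.out.println(shutdown(consumer, RunningTaskSketch.CompleteAllTasks));        // prints 0
    }
}
```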

41.2. Implementing the Consumer Interface

Alternative ways of implementing a consumer

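You can implement a consumer in one of the following ways:

  • Event-driven consumer implementation
  • Scheduled poll consumer implementation
  • Polling consumer implementation
  • Custom threading implementation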

Event-driven consumer implementation

In an event-driven consumer, processing is driven explicitly by external events. The events are received through an event-listener interface, where the listener interface is specific to the particular event source.

Example 41.4, “JMXConsumer Implementation” shows the implementation of the JMXConsumer class, which is taken from the Apache Camel JMX component implementation. The JMXConsumer class is an example of an event-driven consumer, which is implemented by inheriting from the org.apache.camel.impl.DefaultConsumer class. In the case of the JMXConsumer example, events are represented by calls on the NotificationListener.handleNotification() method, which is a standard way of receiving JMX events. In order to receive these JMX events, it is necessary to implement the NotificationListener interface and override the handleNotification() method, as shown in Example 41.4, “JMXConsumer Implementation”.

Example 41.4. JMXConsumer Implementation

package org.apache.camel.component.jmx;

import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;

public class JMXConsumer extends DefaultConsumer implements NotificationListener { // (1)

    JMXEndpoint jmxEndpoint;

    public JMXConsumer(JMXEndpoint endpoint, Processor processor) { // (2)
        super(endpoint, processor);
        this.jmxEndpoint = endpoint;
    }

    public void handleNotification(Notification notification, Object handback) { // (3)
        try {
            getProcessor().process(jmxEndpoint.createExchange(notification)); // (4)
        } catch (Throwable e) {
            handleException(e); // (5)
        }
    }
}
(1) The JMXConsumer class follows the usual pattern for event-driven consumers by extending the DefaultConsumer class. Additionally, because this consumer is designed to receive events from JMX (which are represented by JMX notifications), it is necessary to implement the NotificationListener interface.
(2) You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, and a reference to the next processor in the chain, processor, as arguments.
(3) The handleNotification() method (which is defined in NotificationListener) is automatically invoked by JMX whenever a JMX notification arrives. The body of this method should contain the code that performs the consumer’s event processing. Because the handleNotification() call originates from the JMX layer, the consumer’s threading model is implicitly controlled by the JMX layer, not by the JMXConsumer class.
(4) This line of code combines two steps. First, the JMX notification object is converted into an exchange object, which is the generic representation of an event in Apache Camel. Then the newly created exchange object is passed to the next processor in the route (invoked synchronously).
(5) The handleException() method is implemented by the DefaultConsumer base class. By default, it handles exceptions using the org.apache.camel.impl.LoggingExceptionHandler class.

Note

The handleNotification() method is specific to the JMX example. When implementing your own event-driven consumer, you must identify an analogous event listener method to implement in your custom consumer.

Scheduled poll consumer implementation

In a scheduled poll consumer, polling events are automatically generated by a timer service, java.util.concurrent.ScheduledExecutorService. To receive the generated polling events, you must implement the ScheduledPollConsumer.poll() method (see Section 38.1.3, “Consumer Patterns and Threading”).

Example 41.5, “ScheduledPollConsumer Implementation” shows how to implement a consumer that follows the scheduled poll pattern, which is implemented by extending the ScheduledPollConsumer class.

Example 41.5. ScheduledPollConsumer Implementation

import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;

import org.apache.camel.impl.ScheduledPollConsumer;

public class CustomConsumer extends ScheduledPollConsumer { // (1)
    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint, Processor processor) { // (2)
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    protected void poll() throws Exception { // (3)
        Exchange exchange = /* Receive exchange object ... */;

        // Example of a synchronous processor.
        getProcessor().process(exchange); // (4)
    }

    @Override
    protected void doStart() throws Exception { // (5)
        // Pre-Start:
        // Place code here to execute just before start of processing.
        super.doStart();
        // Post-Start:
        // Place code here to execute just after start of processing.
    }

    @Override
    protected void doStop() throws Exception { // (6)
        // Pre-Stop:
        // Place code here to execute just before processing stops.
        super.doStop();
        // Post-Stop:
        // Place code here to execute just after processing stops.
    }
}
(1) Implement a scheduled poll consumer class, CustomConsumer, by extending the org.apache.camel.impl.ScheduledPollConsumer class.
(2) You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, and a reference to the next processor in the chain, processor, as arguments.
(3) Override the poll() method to receive the scheduled polling events. This is where you should put the code that retrieves and processes incoming events (represented by exchange objects).
(4) In this example, the event is processed synchronously. If you want to process events asynchronously, you should use a reference to an asynchronous processor instead, by calling getAsyncProcessor(). For details of how to process events asynchronously, see Section 38.1.4, “Asynchronous Processing”.
(5) (Optional) If you want some lines of code to execute as the consumer is starting up, override the doStart() method as shown.
(6) (Optional) If you want some lines of code to execute as the consumer is stopping, override the doStop() method as shown.

Polling consumer implementation

Example 41.6, “PollingConsumerSupport Implementation” outlines how to implement a consumer that follows the polling pattern, which is implemented by extending the PollingConsumerSupport class.

Example 41.6. PollingConsumerSupport Implementation

import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.PollingConsumerSupport;

public class CustomConsumer extends PollingConsumerSupport { // (1)
    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint) { // (2)
        super(endpoint);
        this.endpoint = endpoint;
    }

    public Exchange receiveNoWait() { // (3)
        Exchange exchange = /* Obtain an exchange object. */;
        // Further processing ...
        return exchange;
    }

    public Exchange receive() { // (4)
        // Blocking poll ...
    }

    public Exchange receive(long timeout) { // (5)
        // Poll with timeout ...
    }

    protected void doStart() throws Exception { // (6)
        // Code to execute whilst starting up.
    }

    protected void doStop() throws Exception {
        // Code to execute whilst shutting down.
    }
}
(1) Implement your polling consumer class, CustomConsumer, by extending the org.apache.camel.impl.PollingConsumerSupport class.
(2) You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, as an argument. A polling consumer does not need a reference to a processor instance.
(3) The receiveNoWait() method should implement a non-blocking algorithm for retrieving an event (exchange object). If no event is available, it should return null.
(4) The receive() method should implement a blocking algorithm for retrieving an event. This method can block indefinitely, if events remain unavailable.
(5) The receive(long timeout) method implements an algorithm that can block for as long as the specified timeout (typically specified in units of milliseconds).
(6) If you want to insert code that executes while a consumer is starting up or shutting down, implement the doStart() method and the doStop() method, respectively.

Custom threading implementation

If the standard consumer patterns are not suitable for your consumer implementation, you can implement the Consumer interface directly and write the threading code yourself. When writing the threading code, however, it is important that you comply with the standard Apache Camel threading model, as described in Section 2.8, “Threading Model”.

For example, the SEDA component from camel-core implements its own consumer threading, which is consistent with the Apache Camel threading model. Example 41.7, “Custom Threading Implementation” shows an outline of how the SedaConsumer class implements its threading.

Example 41.7. Custom Threading Implementation

package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.util.ServiceHelper;
...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Consumer for the SEDA component.
 *
 * @version $Revision: 922485 $
 */
public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware { // (1)
    private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);

    private SedaEndpoint endpoint;
    private Processor processor;
    private ExecutorService executor;
    ...
    public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
        this.endpoint = endpoint;
        this.processor = processor;
    }
    ...

    public void run() { // (2)
        BlockingQueue<Exchange> queue = endpoint.getQueue();
        // Poll the queue and process exchanges
        ...
    }

    ...
    protected void doStart() throws Exception { // (3)
        int poolSize = endpoint.getConcurrentConsumers();
        executor = endpoint.getCamelContext().getExecutorServiceStrategy()
            .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); // (4)
        for (int i = 0; i < poolSize; i++) { // (5)
            executor.execute(this);
        }
        endpoint.onStarted(this);
    }

    protected void doStop() throws Exception { // (6)
        endpoint.onStopped(this);
        // must shutdown executor on stop to avoid overhead of having them running
        endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor); // (7)

        if (multicast != null) {
            ServiceHelper.stopServices(multicast);
        }
    }
    ...
    //----------
    // Implementation of ShutdownAware interface

    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
        // deny stopping on shutdown as we want seda consumers to run in case some other queues
        // depend on this consumer to run, so it can complete its exchanges
        return true;
    }

    public int getPendingExchangesSize() {
        // number of pending messages on the queue
        return endpoint.getQueue().size();
    }

}
(1) The SedaConsumer class is implemented by extending the org.apache.camel.impl.ServiceSupport class and implementing the Consumer, Runnable, and ShutdownAware interfaces.
(2) Implement the Runnable.run() method to define what the consumer does while it is running in a thread. In this case, the consumer runs in a loop, polling the queue for new exchanges and then processing each exchange that it takes from the queue.
(3) The doStart() method is inherited from ServiceSupport. You override this method in order to define what the consumer does when it starts up.
(4) Instead of creating threads directly, you should create a thread pool using the ExecutorServiceStrategy object that is registered with the CamelContext. This is important, because it enables Apache Camel to implement centralized management of threads and support such features as graceful shutdown. For details, see Section 2.8, “Threading Model”.
(5) Kick off the threads by calling the ExecutorService.execute() method poolSize times.
(6) The doStop() method is inherited from ServiceSupport. You override this method in order to define what the consumer does when it shuts down.
(7) Shut down the thread pool, which is represented by the executor instance.
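
The polling loop that is elided from the run() method in the example can be pictured with the following standalone sketch. QueueWorkerSketch and its processAll() helper are hypothetical, and String items stand in for exchange objects. The sketch deliberately uses java.util.concurrent.Executors so that it runs without Apache Camel. In a real component you would obtain the thread pool from the CamelContext, as the example does.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical queue-draining consumer; not the SedaConsumer source code.
class QueueWorkerSketch implements Runnable {
    private final BlockingQueue<String> queue;
    private final Consumer<String> processor;
    private final int poolSize;
    private volatile boolean running;
    private ExecutorService executor;

    QueueWorkerSketch(BlockingQueue<String> queue, Consumer<String> processor, int poolSize) {
        this.queue = queue;
        this.processor = processor;
        this.poolSize = poolSize;
    }

    // Worker loop: poll with a timeout so that the running flag is checked regularly.
    @Override
    public void run() {
        while (running) {
            try {
                String item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item != null) {
                    processor.accept(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void start() {
        running = true;
        // Assumption: a plain JDK pool, used only to keep the sketch standalone.
        executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < poolSize; i++) {
            executor.execute(this);
        }
    }

    void stop() {
        running = false;
        executor.shutdownNow();
    }

    // Demo helper: processes the given items and returns how many were handled.
    static int processAll(List<String> items, int poolSize) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(items);
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(items.size());
        QueueWorkerSketch worker = new QueueWorkerSketch(queue, item -> {
            handled.incrementAndGet();
            done.countDown();
        }, poolSize);
        worker.start();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.stop();
        }
        return handled.get();
    }
}
```

Polling with a timeout, rather than blocking indefinitely, is a design choice that lets each worker thread notice a stop request even when the queue is empty.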