Red Hat Training

A Red Hat training course is available for Red Hat Fuse

49.2. Implementing the Endpoint Interface

Alternative ways of implementing an endpoint

The following alternative endpoint implementation patterns are supported:

Event-driven endpoint implementation

If your custom endpoint conforms to the event-driven pattern (see Section 47.1.3, “Consumer Patterns and Threading”), it is implemented by extending the abstract class, org.apache.camel.impl.DefaultEndpoint, as shown in Example 49.2, “Implementing DefaultEndpoint”.

Example 49.2. Implementing DefaultEndpoint

import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;

public class CustomEndpoint extends DefaultEndpoint { 1

    public CustomEndpoint(String endpointUri, Component component) { 2
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception { 3
        return new CustomProducer(this);
    }

    public Consumer createConsumer(Processor processor) throws Exception { 4
        return new CustomConsumer(this, processor);
    }

    public boolean isSingleton() { 
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() { 5
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}
1
Implement an event-driven custom endpoint, CustomEndpoint, by extending the DefaultEndpoint class.
2
You must have at least one constructor that takes the endpoint URI, endpointUri, and the parent component reference, component, as arguments.
3
Implement the createProducer() factory method to create producer endpoints.
4
Implement the createConsumer() factory method to create event-driven consumer instances.
Important
Do not override the createPollingConsumer() method.
5
In general, it is not necessary to override the createExchange() methods. The implementations inherited from DefaultEndpoint create a DefaultExchange object by default, which can be used in any Apache Camel component. If you need to initialize some exchange properties in the DefaultExchange object, however, it is appropriate to override the createExchange() methods here in order to add the exchange property settings.
The DefaultEndpoint class provides default implementations of the following methods, which you might find useful when writing your custom endpoint code:
  • getEndpointUri()—Returns the endpoint URI.
  • getCamelContext()—Returns a reference to the CamelContext.
  • getComponent()—Returns a reference to the parent component.
  • createPollingConsumer()—Creates a polling consumer. The created polling consumer's functionality is based on the event-driven consumer. If you override the event-driven consumer method, createConsumer(), you get a polling consumer implementation for free.
  • createExchange(Exchange e)—Converts the given exchange object, e, to the type required for this endpoint. This method creates a new endpoint using the overridden createExchange() endpoints. This ensures that the method also works for custom exchange types.

Scheduled poll endpoint implementation

If your custom endpoint conforms to the scheduled poll pattern (see Section 47.1.3, “Consumer Patterns and Threading”) it is implemented by inheriting from the abstract class, org.apache.camel.impl.ScheduledPollEndpoint, as shown in Example 49.3, “ScheduledPollEndpoint Implementation”.

Example 49.3. ScheduledPollEndpoint Implementation

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.ScheduledPollEndpoint;

public class CustomEndpoint extends ScheduledPollEndpoint { 1

    protected CustomEndpoint(String endpointUri, CustomComponent component) { 2
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception { 3
        Producer result = new CustomProducer(this);
        return result;
    }

    public Consumer createConsumer(Processor processor) throws Exception { 4
        Consumer result = new CustomConsumer(this, processor);
        configureConsumer(result); 5
        return result;
    }

    public boolean isSingleton() {
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() { 6
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}
1
Implement a scheduled poll custom endpoint, CustomEndpoint, by extending the ScheduledPollEndpoint class.
2
You must to have at least one constructor that takes the endpoint URI, endpointUri, and the parent component reference, component, as arguments.
3
Implement the createProducer() factory method to create a producer endpoint.
4
Implement the createConsumer() factory method to create a scheduled poll consumer instance.
Important
Do not override the createPollingConsumer() method.
5
The configureConsumer() method, defined in the ScheduledPollEndpoint base class, is responsible for injecting consumer query options into the consumer. See the section called “Consumer parameter injection”.
6
In general, it is not necessary to override the createExchange() methods. The implementations inherited from DefaultEndpoint create a DefaultExchange object by default, which can be used in any Apache Camel component. If you need to initialize some exchange properties in the DefaultExchange object, however, it is appropriate to override the createExchange() methods here in order to add the exchange property settings.

Polling endpoint implementation

If your custom endpoint conforms to the polling consumer pattern (see Section 47.1.3, “Consumer Patterns and Threading”), it is implemented by inheriting from the abstract class, org.apache.camel.impl.DefaultPollingEndpoint, as shown in Example 49.4, “DefaultPollingEndpoint Implementation”.

Example 49.4. DefaultPollingEndpoint Implementation

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultPollingEndpoint;

public class CustomEndpoint extends DefaultPollingEndpoint {
    ...
    public PollingConsumer createPollingConsumer() throws Exception {
        PollingConsumer result = new CustomConsumer(this);
        configureConsumer(result);
        return result;
    }

    // Do NOT implement createConsumer(). It is already implemented in DefaultPollingEndpoint.
    ...
}
Because this CustomEndpoint class is a polling endpoint, you must implement the createPollingConsumer() method instead of the createConsumer() method. The consumer instance returned from createPollingConsumer() must inherit from the PollingConsumer interface. For details of how to implement a polling consumer, see the section called “Polling consumer implementation”.
Apart from the implementation of the createPollingConsumer() method, the steps for implementing a DefaultPollingEndpoint are similar to the steps for implementing a ScheduledPollEndpoint. See Example 49.3, “ScheduledPollEndpoint Implementation” for details.

Implementing the BrowsableEndpoint interface

If you want to expose the list of exchange instances that are pending in the current endpoint, you can implement the org.apache.camel.spi.BrowsableEndpoint interface, as shown in Example 49.5, “BrowsableEndpoint Interface”. It makes sense to implement this interface if the endpoint performs some sort of buffering of incoming events. For example, the Apache Camel SEDA endpoint implements the BrowsableEndpoint interface—see Example 49.6, “SedaEndpoint Implementation”.

Example 49.5. BrowsableEndpoint Interface

package org.apache.camel.spi;

import java.util.List;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;

public interface BrowsableEndpoint extends Endpoint {
    List<Exchange> getExchanges();
}

Example

Example 49.6, “SedaEndpoint Implementation” shows a sample implementation of SedaEndpoint. The SEDA endpoint is an example of an event-driven endpoint. Incoming events are stored in a FIFO queue (an instance of java.util.concurrent.BlockingQueue) and a SEDA consumer starts up a thread to read and process the events. The events themselves are represented by org.apache.camel.Exchange objects.

Example 49.6. SedaEndpoint Implementation

package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.BrowsableEndpoint;

public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint { 1
    private BlockingQueue<Exchange> queue;

    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) { 2
        super(endpointUri, component);
        this.queue = queue;
    }

    public SedaEndpoint(String uri, SedaComponent component, Map parameters) { 3
        this(uri, component, component.createQueue(uri, parameters));
    }

    public Producer createProducer() throws Exception { 4
        return new CollectionProducer(this, getQueue());
    }

    public Consumer createConsumer(Processor processor) throws Exception { 5
        return new SedaConsumer(this, processor);
    }

    public BlockingQueue<Exchange> getQueue() { 6
        return queue;
    }

    public boolean isSingleton() { 7
        return true;
    }

    public List<Exchange> getExchanges() { 8
        return new ArrayList<Exchange>(getQueue());
    }
}
1
The SedaEndpoint class follows the pattern for implementing an event-driven endpoint by extending the DefaultEndpoint class. The SedaEndpoint class also implements the BrowsableEndpoint interface, which provides access to the list of exchange objects in the queue.
2
Following the usual pattern for an event-driven consumer, SedaEndpoint defines a constructor that takes an endpoint argument, endpointUri, and a component reference argument, component.
3
Another constructor is provided, which delegates queue creation to the parent component instance.
4
The createProducer() factory method creates an instance of CollectionProducer, which is a producer implementation that adds events to the queue.
5
The createConsumer() factory method creates an instance of SedaConsumer, which is responsible for pulling events off the queue and processing them.
6
The getQueue() method returns a reference to the queue.
7
The isSingleton() method returns true, indicating that a single endpoint instance should be created for each unique URI string.
8
The getExchanges() method implements the corresponding abstract method from BrowsableEndpoint.