Chapter 121. Guava EventBus Component

Available as of Camel version 2.10

The Google Guava EventBus allows publish-subscribe-style communication between components without requiring the components to explicitly register with one another (and thus be aware of each other). The guava-eventbus: component provides an integration bridge between Camel and the Google Guava EventBus infrastructure. With this component, messages exchanged with the Guava EventBus can be transparently forwarded to Camel routes. The component also allows the body of Camel exchanges to be routed to the Guava EventBus.

Maven users will need to add the following dependency to their pom.xml for this component:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-guava-eventbus</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

121.1. URI format

guava-eventbus:busName[?options]

Where busName represents the name of the com.google.common.eventbus.EventBus instance located in the Camel registry.

121.2. Options

The Guava EventBus component supports 3 options which are listed below.

| Name | Description | Default | Type |
|------|-------------|---------|------|
| eventBus (common) | To use the given Guava EventBus instance. | | EventBus |
| listenerInterface (common) | The interface with method(s) marked with the @Subscribe annotation. A dynamic proxy is created over the interface so that it can be registered as the EventBus listener. Particularly useful when creating multi-event listeners and for handling DeadEvent properly. This option cannot be used together with the eventClass option. | | Class<?> |
| resolvePropertyPlaceholders (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties of String type can use property placeholders. | true | boolean |

The Guava EventBus endpoint is configured using URI syntax:

guava-eventbus:eventBusRef

with the following path and query parameters:

121.2.1. Path Parameters (1 parameter):

| Name | Description | Default | Type |
|------|-------------|---------|------|
| eventBusRef | To look up the Guava EventBus from the registry with the given name. | | String |

121.2.2. Query Parameters (6 parameters):

| Name | Description | Default | Type |
|------|-------------|---------|------|
| eventClass (common) | If used on the consumer side of the route, filters events received from the EventBus to instances of eventClass and its subclasses. A null value is equivalent to setting it to java.lang.Object, that is, the consumer captures all messages incoming to the event bus. This option cannot be used together with the listenerInterface option. | | Class<?> |
| listenerInterface (common) | The interface with method(s) marked with the @Subscribe annotation. A dynamic proxy is created over the interface so that it can be registered as the EventBus listener. Particularly useful when creating multi-event listeners and for handling DeadEvent properly. This option cannot be used together with the eventClass option. | | Class<?> |
| bridgeErrorHandler (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which means that any exception that occurs while the consumer is trying to pick up incoming messages, or the like, is processed as a message and handled by the routing Error Handler. By default the consumer uses the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which are logged at WARN or ERROR level and ignored. | false | boolean |
| exceptionHandler (consumer) | To let the consumer use a custom ExceptionHandler. Note that if the bridgeErrorHandler option is enabled, this option is not in use. By default the consumer deals with exceptions by logging them at WARN or ERROR level and ignoring them. | | ExceptionHandler |
| exchangePattern (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern |
| synchronous (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean |

121.3. Usage

Using the guava-eventbus component on the consumer side of a route captures messages sent to the Guava EventBus and forwards them to the Camel route. The Guava EventBus consumer processes incoming messages asynchronously.

SimpleRegistry registry = new SimpleRegistry();
EventBus eventBus = new EventBus();
registry.put("busName", eventBus);
CamelContext camel = new DefaultCamelContext(registry);

from("guava-eventbus:busName").to("seda:queue");

eventBus.post("Send me to the SEDA queue.");

Using the guava-eventbus component on the producer side of a route forwards the body of Camel exchanges to the Guava EventBus instance.

SimpleRegistry registry = new SimpleRegistry();
EventBus eventBus = new EventBus();
registry.put("busName", eventBus);
CamelContext camel = new DefaultCamelContext(registry);

from("direct:start").to("guava-eventbus:busName");

// Register the subscriber before sending, so that it receives the message.
eventBus.register(new Object() {
  @Subscribe
  public void messageHandler(String message) {
    System.out.println("Message received from the Camel: " + message);
  }
});

ProducerTemplate producerTemplate = camel.createProducerTemplate();
producerTemplate.sendBody("direct:start", "Send me to the Guava EventBus.");

121.4. DeadEvent considerations

Keep in mind that, due to limitations in the design of the Guava EventBus, you cannot specify the event class to be received by a listener without creating a class with a method annotated with @Subscribe. This limitation implies that an endpoint with the eventClass option specified actually listens to all possible events (java.lang.Object) and filters the appropriate messages programmatically at runtime. The snippet below shows the relevant excerpt from the Camel code base.

@Subscribe
public void eventReceived(Object event) {
  if (eventClass == null || eventClass.isAssignableFrom(event.getClass())) {
    doEventReceived(event);
...
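The class check in the excerpt can be tried in isolation. The following is a minimal sketch using only the JDK; EventClassFilter and its methods are hypothetical names chosen for illustration and are not part of Camel. It shows that isAssignableFrom accepts instances of eventClass and of its subclasses, and that a null eventClass accepts everything.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the consumer-side filter; not a Camel class.
class EventClassFilter {
    private final Class<?> eventClass;                 // null means "accept every event"
    private final List<Object> accepted = new ArrayList<>();

    EventClassFilter(Class<?> eventClass) {
        this.eventClass = eventClass;
    }

    // Same condition as in the excerpt: accept when no class is configured,
    // or when the event is an instance of eventClass or one of its subclasses.
    boolean offer(Object event) {
        if (eventClass == null || eventClass.isAssignableFrom(event.getClass())) {
            accepted.add(event);
            return true;
        }
        return false;
    }

    List<Object> accepted() {
        return accepted;
    }

    public static void main(String[] args) {
        EventClassFilter numbers = new EventClassFilter(Number.class);
        System.out.println(numbers.offer(42));       // Integer is a subclass of Number
        System.out.println(numbers.offer("text"));   // String is unrelated to Number
        System.out.println(new EventClassFilter(null).offer("text"));
    }
}
```

Because the real subscriber method takes java.lang.Object, the bus itself considers every event handled, even those the filter later rejects.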

The drawback of this runtime filtering is that the EventBus instance used by Camel never generates com.google.common.eventbus.DeadEvent notifications. If you want Camel to listen only to a precisely specified event (and therefore enable DeadEvent support), use the listenerInterface endpoint option. Camel creates a dynamic proxy over the interface you specify with this option and listens only to messages specified by the interface handler methods. An example of a listener interface with a single method handling only SpecificEvent instances is shown below.

package com.example;

public interface CustomListener {

  @Subscribe
  void eventReceived(SpecificEvent event);

}

The listener presented above could be used in the endpoint definition as follows.

from("guava-eventbus:busName?listenerInterface=com.example.CustomListener").to("seda:queue");
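To illustrate what "a dynamic proxy over the interface" means, here is a minimal sketch built on java.lang.reflect.Proxy from the JDK. It is not Camel's implementation: ListenerProxyDemo, proxyFor, and the sink list are hypothetical, and the @Subscribe annotation is left off the interface so that the sketch compiles without the Guava jar.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ListenerProxyDemo {
    // Hypothetical event type, used only in this sketch.
    static class SpecificEvent {
        final String payload;
        SpecificEvent(String payload) { this.payload = payload; }
    }

    // In real use this method would carry Guava's @Subscribe annotation.
    interface CustomListener {
        void eventReceived(SpecificEvent event);
    }

    // Creates an object implementing listenerInterface at runtime. Every call to a
    // single-argument listener method appends that argument to sink.
    static <T> T proxyFor(Class<T> listenerInterface, List<Object> sink) {
        InvocationHandler handler = (self, method, args) -> {
            // equals, hashCode and toString are also routed here; answer them directly.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals":   return self == args[0];
                    case "hashCode": return System.identityHashCode(self);
                    default:         return "proxy for " + listenerInterface.getSimpleName();
                }
            }
            if (args != null && args.length == 1) {
                sink.add(args[0]);
            }
            return null; // listener methods return void
        };
        Object instance = Proxy.newProxyInstance(
                listenerInterface.getClassLoader(),
                new Class<?>[] {listenerInterface},
                handler);
        return listenerInterface.cast(instance);
    }

    public static void main(String[] args) {
        List<Object> received = new ArrayList<>();
        CustomListener listener = proxyFor(CustomListener.class, received);
        listener.eventReceived(new SpecificEvent("hello"));
        System.out.println(received.size());
    }
}
```

In the sketch the handler collects events into a list; in a route, the corresponding step would hand the event to the Camel exchange instead.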

121.5. Consuming multiple types of events

To define multiple types of events to be consumed by the Guava EventBus consumer, use the listenerInterface endpoint option, since a listener interface can provide multiple methods marked with the @Subscribe annotation.

package com.example;

public interface MultipleEventsListener {

  @Subscribe
  void someEventReceived(SomeEvent event);

  @Subscribe
  void anotherEventReceived(AnotherEvent event);

}

The listener presented above could be used in the endpoint definition as follows.

from("guava-eventbus:busName?listenerInterface=com.example.MultipleEventsListener").to("seda:queue");
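The set of event types a multi-method listener interface subscribes to follows from the parameter types of its annotated methods. The sketch below shows one way to read them with reflection. It assumes a local stand-in annotation named Subscribe (so that no Guava jar is needed) and hypothetical event classes; it is an illustration of the idea, not the lookup code used by Guava or Camel.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

class SubscribedTypesDemo {
    // Local stand-in for com.google.common.eventbus.Subscribe (assumption for this sketch).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Subscribe {}

    // Hypothetical event types.
    static class SomeEvent {}
    static class AnotherEvent {}

    interface MultipleEventsListener {
        @Subscribe
        void someEventReceived(SomeEvent event);

        @Subscribe
        void anotherEventReceived(AnotherEvent event);

        // Not annotated, so it does not count as an event handler.
        void notAHandler(String ignored);
    }

    // Collects the simple names of the event types handled by annotated,
    // single-parameter methods of the given interface.
    static Set<String> subscribedTypes(Class<?> listenerInterface) {
        Set<String> types = new TreeSet<>();
        for (Method method : listenerInterface.getMethods()) {
            if (method.isAnnotationPresent(Subscribe.class) && method.getParameterCount() == 1) {
                types.add(method.getParameterTypes()[0].getSimpleName());
            }
        }
        return types;
    }

    public static void main(String[] args) {
        System.out.println(subscribedTypes(MultipleEventsListener.class));
    }
}
```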

121.6. HawtDB

Available as of Camel 2.3

HawtDB is a very lightweight and embeddable key-value database. Together with Camel, it provides persistent support for various Camel features such as the Aggregator.

Deprecated

The HawtDB project is being deprecated and replaced by leveldb as the lightweight and embeddable key-value database. The leveldbjni project makes leveldb easy to use. The Apache ActiveMQ project is planning to use leveldb as its primary file-based message store in the future, replacing kahadb.

We recommend using the camel-leveldb component instead of this one.

Issue with HawtDB 1.4 or older

There is a bug in HawtDB 1.4 or older which means the filestore will not free unused space. That means the file keeps growing. This has been fixed in HawtDB 1.5 which is shipped with Camel 2.5 onwards.

Current features it provides:

  • HawtDBAggregationRepository

121.6.1. Using HawtDBAggregationRepository

HawtDBAggregationRepository is an AggregationRepository that persists the aggregated messages on the fly. This ensures that you do not lose messages, because the default aggregator uses an in-memory-only AggregationRepository.

It has the following options:

| Option | Type | Description |
|--------|------|-------------|
| repositoryName | String | A mandatory repository name. Allows you to use a shared HawtDBFile for multiple repositories. |
| persistentFileName | String | Filename for the persistent storage. If no file exists on startup, a new file is created. |
| bufferSize | int | The size of the memory segment buffer which is mapped to the file store. The default is 8 MB. The value is in bytes. |
| sync | boolean | Whether the HawtDBFile should sync on write. Default is true. Syncing on write means always waiting for all writes to be spooled to disk, so updates are not lost. If you disable this option, HawtDB auto-syncs when it has batched up a number of writes. |
| pageSize | short | The size of memory pages. The default is 512 bytes. The value is in bytes. |
| hawtDBFile | HawtDBFile | Use an existing configured org.apache.camel.component.hawtdb.HawtDBFile instance. |
| returnOldExchange | boolean | Whether the get operation should return the old existing Exchange if any existed. Default is false, as an optimization, because the old exchange is not needed when aggregating. |
| useRecovery | boolean | Whether recovery is enabled. Default is true. When enabled, the Camel Aggregator automatically recovers failed aggregated exchanges and resubmits them. |
| recoveryInterval | long | If recovery is enabled, a background task runs at this interval to scan for failed exchanges to recover and resubmit. The default interval is 5000 milliseconds. |
| maximumRedeliveries | int | Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled, the Exchange is moved to the dead letter channel if all redelivery attempts fail. By default this option is disabled. If this option is used, the deadLetterUri option must also be provided. |
| deadLetterUri | String | An endpoint URI for a Dead Letter Channel to which exhausted recovered Exchanges are moved. If this option is used, the maximumRedeliveries option must also be provided. |
| optimisticLocking | boolean | Camel 2.12: Turns on optimistic locking, which is often needed in clustered environments where multiple Camel applications share the same HawtDB based aggregation repository. Default is false. |

The repositoryName option must be provided. Then either the persistentFileName or the hawtDBFile must be provided.

121.6.2. What is preserved when persisting

HawtDBAggregationRepository preserves only Serializable-compatible data types. If a data type is not Serializable, it is dropped and a WARN is logged. Only the Message body and the Message headers are persisted; the Exchange properties are not.
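The effect of this rule on message headers can be pictured with a small JDK-only sketch. SerializableHeaderFilter and keepSerializable are hypothetical names, and the code is an assumption about the observable behaviour described above, not the repository's actual marshalling code.

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that models which headers would survive persistence.
class SerializableHeaderFilter {

    // Returns a copy of headers containing only entries whose value is Serializable.
    // Dropped entries are reported on stderr, standing in for the WARN log line.
    static Map<String, Object> keepSerializable(Map<String, Object> headers) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (entry.getValue() instanceof Serializable) {
                kept.put(entry.getKey(), entry.getValue());
            } else {
                System.err.println("WARN: dropping non-serializable header " + entry.getKey());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("id", 7);                 // Integer is Serializable
        headers.put("name", "order");         // String is Serializable
        headers.put("lock", new Object());    // plain Object is not
        System.out.println(keepSerializable(headers).keySet());
    }
}
```

A practical consequence is that any header or body type you need after recovery should implement java.io.Serializable.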

121.6.3. Recovery

The HawtDBAggregationRepository by default recovers any failed Exchange. It does this with a background task that scans for failed Exchanges in the persistent store. You can use the recoveryInterval option to set how often this task runs. Recovery is transactional, which ensures that Camel will try to recover and redeliver the failed Exchange. Any Exchange found to need recovery is restored from the persistent store and resubmitted.

The following headers are set when an Exchange is being recovered/redelivered:

| Header | Type | Description |
|--------|------|-------------|
| Exchange.REDELIVERED | Boolean | Is set to true to indicate the Exchange is being redelivered. |
| Exchange.REDELIVERY_COUNTER | Integer | The redelivery attempt, starting from 1. |

An Exchange is marked as complete only when it has been processed successfully, which happens when the confirm method is invoked on the AggregationRepository. This means that if the same Exchange fails again, it keeps being retried until it succeeds.

You can use the maximumRedeliveries option to limit the maximum number of redelivery attempts for a given recovered Exchange. You must also set the deadLetterUri option so that Camel knows where to send the Exchange when maximumRedeliveries is reached.
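The interplay between the redelivery counter, confirmation, and the dead letter destination can be modelled in a few lines. The sketch below is a simplified, synchronous model with hypothetical names (RecoveryModel, recover); the real recovery runs as a background task at recoveryInterval and works on Exchanges, so treat this only as an illustration of the semantics described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical, simplified model of recovery with a redelivery limit.
class RecoveryModel {
    final List<String> confirmed = new ArrayList<>();    // successfully processed bodies
    final List<String> deadLetters = new ArrayList<>();  // bodies that exhausted all attempts
    private final int maximumRedeliveries;

    RecoveryModel(int maximumRedeliveries) {
        this.maximumRedeliveries = maximumRedeliveries;
    }

    // Resubmits body until processor succeeds or the limit is reached.
    // The value passed to processor plays the role of the redelivery counter
    // and starts from 1. Returns the number of attempts made.
    int recover(String body, IntPredicate processor) {
        for (int counter = 1; counter <= maximumRedeliveries; counter++) {
            if (processor.test(counter)) {
                confirmed.add(body);      // corresponds to confirm being invoked
                return counter;
            }
        }
        deadLetters.add(body);            // corresponds to moving to deadLetterUri
        return maximumRedeliveries;
    }

    public static void main(String[] args) {
        RecoveryModel model = new RecoveryModel(3);
        model.recover("succeeds on second attempt", attempt -> attempt == 2);
        model.recover("never succeeds", attempt -> false);
        System.out.println("confirmed=" + model.confirmed);
        System.out.println("deadLetters=" + model.deadLetters);
    }
}
```

Without a redelivery limit, the loop in this model would have no upper bound, which matches the statement that a failing Exchange keeps being retried until it succeeds.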

You can find examples in the unit tests of camel-hawtdb.

121.6.3.1. Using HawtDBAggregationRepository in Java DSL

In this example we want to persist aggregated messages in the target/data/hawtdb.dat file.

121.6.3.2. Using HawtDBAggregationRepository in Spring XML

The same example but using Spring XML instead:

121.6.4. Dependencies

To use HawtDB in your Camel routes, you need to add a dependency on camel-hawtdb.

If you use Maven, add the following to your pom.xml, substituting the version number for the latest release (see the download page for the latest versions).

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-hawtdb</artifactId>
  <version>2.3.0</version>
</dependency>

121.6.5. See Also

  • Configuring Camel
  • Component
  • Endpoint
  • Getting Started
  • Aggregator
  • Components