Chapter 55. HawtDB

HawtDB

Available as of Apache Camel 2.3
HawtDB is a very lightweight, embeddable key-value database. Together with Apache Camel, it provides persistence support for various Apache Camel features, such as the Aggregator (see the section "Aggregator" in "Apache Camel Development Guide").
Deprecated
The HawtDB project is being deprecated and replaced by leveldb as the lightweight, embeddable key-value database. The leveldbjni project makes leveldb easy to use from Java. The Apache ActiveMQ project plans to use leveldb as its primary file-based message store in the future, replacing kahadb.
There is a camel-leveldb component, which we recommend using instead of this one.
Current features it provides:
  • HawtDBAggregationRepository

Using HawtDBAggregationRepository

HawtDBAggregationRepository is an AggregationRepository that persists the aggregated messages on the fly. This ensures that you will not lose messages, because the default aggregator uses an in-memory-only AggregationRepository.
It has the following options:
Option Type Description
repositoryName String A mandatory repository name. Allows you to use a shared HawtDBFile for multiple repositories.
persistentFileName String Filename for the persistent storage. If no file exists on startup a new file is created.
bufferSize int The size of the memory segment buffer that is mapped to the file store. By default it is 8 MB. The value is in bytes.
sync boolean Whether the HawtDBFile should sync on write. Default is true. Syncing on write means that HawtDB always waits for all writes to be spooled to disk and thus will not lose updates. If you disable this option, HawtDB syncs automatically when it has batched up a number of writes.
pageSize short The size of memory pages. By default it is 512 bytes. The value is in bytes.
hawtDBFile HawtDBFile Use an existing configured org.apache.camel.component.hawtdb.HawtDBFile instance.
returnOldExchange boolean Whether the get operation should return the old existing Exchange if any existed. By default this option is false to optimize as we do not need the old exchange when aggregating.
useRecovery boolean Whether recovery is enabled. This option is true by default. When enabled, the Apache Camel Aggregator (see the section "Aggregator" in "Apache Camel Development Guide") automatically recovers failed aggregated exchanges and resubmits them.
recoveryInterval long If recovery is enabled, a background task runs at this interval to scan for failed exchanges to recover and resubmit. By default this interval is 5000 milliseconds.
maximumRedeliveries int Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled then the Exchange will be moved to the dead letter channel if all redelivery attempts failed. By default this option is disabled. If this option is used then the deadLetterUri option must also be provided.
deadLetterUri String An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be moved. If this option is used then the maximumRedeliveries option must also be provided.
optimisticLocking boolean Camel 2.12: Turns on optimistic locking, which is often needed in clustered environments where multiple Camel applications share the same HawtDB-based aggregation repository. Default is false.
The repositoryName option must be provided. Then either the persistentFileName or the hawtDBFile must be provided.

What is preserved when persisting

HawtDBAggregationRepository preserves only Serializable-compatible data types. If a data type is not Serializable, it is dropped and a WARN is logged. Only the Message body and the Message headers are persisted; the Exchange properties are not.
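The filtering rule above can be illustrated with a minimal, self-contained sketch. This is not the camel-hawtdb implementation: the class SerializableHeaderFilter and the method keepSerializable are hypothetical names, and a plain Map stands in for the message headers. It shows only the idea that values implementing java.io.Serializable are kept and everything else is dropped with a warning.

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of camel-hawtdb.
class SerializableHeaderFilter {

    // Returns a copy of the headers that contains only Serializable values.
    static Map<String, Object> keepSerializable(Map<String, Object> headers) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (entry.getValue() instanceof Serializable) {
                kept.put(entry.getKey(), entry.getValue());
            } else {
                // The repository is described as logging a WARN; stderr stands in here.
                System.err.println("WARN: dropping non-serializable header " + entry.getKey());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("id", "123");           // String is Serializable
        headers.put("count", 5);            // Integer is Serializable
        headers.put("lock", new Object());  // plain Object is not Serializable
        System.out.println(keepSerializable(headers).keySet()); // prints [id, count]
    }
}
```

In practice this means that any header you need after a restart should carry a Serializable value, for example a String or a number rather than a live resource.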

Recovery

The HawtDBAggregationRepository recovers any failed Exchange by default. It does this with a background task that scans the persistent store for failed Exchanges. You can use the recoveryInterval option to set how often this task runs. Recovery is transactional, which ensures that Apache Camel will try to recover and redeliver the failed Exchange. Any Exchange found to need recovery is restored from the persistent store, resubmitted, and sent out again.
The following headers are set when an Exchange is being recovered/redelivered:
Header Type Description
Exchange.REDELIVERED Boolean Is set to true to indicate the Exchange is being redelivered.
Exchange.REDELIVERY_COUNTER Integer The redelivery attempt, starting from 1.
An Exchange is marked as complete only after it has been processed successfully, which happens when the confirm method is invoked on the AggregationRepository. This means that if the same Exchange fails again, it is retried until it succeeds.
You can use the maximumRedeliveries option to limit the number of redelivery attempts for a given recovered Exchange. You must also set the deadLetterUri option so that Apache Camel knows where to send the Exchange when maximumRedeliveries is reached.
You can see some examples in the unit tests of camel-hawtdb, for example this test.
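The redelivery semantics described in this section can be sketched without Camel. The following self-contained simulation is hypothetical: RecoverySketch, recover, and the IntPredicate standing in for the route are illustrative names, not Camel APIs, and the loop runs immediately instead of waiting for the recovery interval between scans. It models a recovered Exchange that is retried with a redelivery counter starting at 1, confirmed on success, and handed to a dead letter destination once the maximum number of redelivery attempts has failed.

```java
import java.util.function.IntPredicate;

// Hypothetical simulation of the recovery loop; not Camel code.
class RecoverySketch {

    enum Outcome { CONFIRMED, DEAD_LETTER }

    // processor receives the redelivery counter and returns true on success.
    // Assumption in this sketch: maximumRedeliveries <= 0 means "no limit".
    static Outcome recover(IntPredicate processor, int maximumRedeliveries) {
        int redeliveryCounter = 0;
        while (true) {
            redeliveryCounter++; // the first recovery attempt has counter 1
            if (processor.test(redeliveryCounter)) {
                return Outcome.CONFIRMED; // corresponds to confirm on the repository
            }
            if (maximumRedeliveries > 0 && redeliveryCounter >= maximumRedeliveries) {
                return Outcome.DEAD_LETTER; // exhausted: would go to deadLetterUri
            }
        }
    }

    public static void main(String[] args) {
        // Succeeds on the third redelivery attempt, within the limit of 5.
        System.out.println(recover(counter -> counter == 3, 5)); // prints CONFIRMED
        // Never succeeds, so the limit of 2 sends it to the dead letter destination.
        System.out.println(recover(counter -> false, 2));        // prints DEAD_LETTER
    }
}
```

Without a limit, an Exchange that always fails is retried indefinitely, which is why a limit is paired with a dead letter endpoint.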

Using HawtDBAggregationRepository in Java DSL

In this example we want to persist aggregated messages in the target/data/hawtdb.dat file.
public void configure() throws Exception {
    // create the hawtdb repo
    HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat");

    // here is the Camel route where we aggregate
    from("direct:start")
        .aggregate(header("id"), new MyAggregationStrategy())
            // use our created hawtdb repo as aggregation repository
            .completionSize(5).aggregationRepository(repo)
            .to("mock:aggregated");
}

Using HawtDBAggregationRepository in Spring XML

The same example but using Spring XML instead:
<!-- a persistent aggregation repository using camel-hawtdb -->
<bean id="repo" class="org.apache.camel.component.hawtdb.HawtDBAggregationRepository">
    <!-- store the repo in the hawtdb.dat file -->
    <property name="persistentFileName" value="target/data/hawtdb.dat"/>
    <!-- and use repo2 as the repository name -->
    <property name="repositoryName" value="repo2"/>
</bean>

<!-- aggregate the messages using this strategy -->
<bean id="myAggregatorStrategy" class="org.apache.camel.component.hawtdb.HawtDBSpringAggregateTest$MyAggregationStrategy"/>

<!-- this is the camel routes -->
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">

    <route>
        <from uri="direct:start"/>
        <!-- aggregate using our strategy and hawtdb repo, and complete when we have 5 messages aggregated -->
        <aggregate strategyRef="myAggregatorStrategy" aggregationRepositoryRef="repo" completionSize="5">
            <!-- correlate by header with the key id -->
            <correlationExpression><header>id</header></correlationExpression>
            <!-- send aggregated messages to the mock endpoint -->
            <to uri="mock:aggregated"/>
        </aggregate>
    </route>

</camelContext>

Dependencies

To use HawtDB in your Apache Camel routes, you need to add a dependency on camel-hawtdb.
If you use Maven, add the following to your pom.xml, substituting the version number with the latest release (see the download page for the latest versions).
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-hawtdb</artifactId>
  <version>2.3.0</version>
</dependency>