Chapter 76. JCache

JCache Component

Available as of Camel 2.17
The jcache component enables you to perform caching operations using JCache (JSR-107) as the Cache Implementation. The cache itself is created on demand or if a cache of that name already exists then it is simply utilized with its original settings.
This component supports producer and event based consumer endpoints.
The Cache consumer is an event based consumer and can be used to listen and respond to specific cache activities. If you need to perform selections from a pre-existing cache, use the processors defined for the cache component.
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-jcache</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

URI format

cache://cacheName[?options]
You can append query options to the URI in the following format, ?option=value&option=#beanRef&...

Options

Name
Default Value
Description
cachingProvider
null
The fully qualified class name of the javax.cache.spi.CachingProvider. Mandatory in an OSGI environment.
cacheConfiguration
null
A reference to a javax.cache.configuration.Configuration instance
cacheConfigurationProperties
null
A reference to a java.util.Properties for the javax.cache.spi.CachingProvider to create the javax.cache.CacheManager
configurationUri
null
An implementation specific URI for the javax.cache.CacheManager
cacheLoaderFactory
null
A reference to a javax.cache.configuration.Factory for javax.cache.integration.CacheLoader
cacheWriterFactory
null
A reference to a javax.cache.configuration.Factory for javax.cache.integration.CacheWriter
expiryPolicyFactory
null
A reference to a javax.cache.configuration.Factory for javax.cache.expiry.ExpiryPolicy
readThrough
false
A flag indicating if "read-through" mode is required
writeThrough
false
A flag indicating if "write-through" mode is required
storeByValue true A flag indicating if the cache will be store-by-value or store-by-reference
statisticsEnabled
fasle
A flag indicating if statistics gathering is enabled
managementEnabled
false
A flag indicating if management is enabled
filteredEvents
null
A comma separated list of event types to filter. Overridden by eventFilters when they are specified together.
eventFilters
null
A comma separated list of javax.cache.event.CacheEntryEventFilter references. Overrides filteredEvents when they are specified together.
oldValueRequired false A flag indicating if the old value is required for events, supported values are CREATED, UPDATED, REMOVED, EXPIRED
synchronous false A flag indicating if the event listener should block the thread causing the event
action null The default action to apply, value in the header has the priority
createCacheIfNotExists true Configure if the cache identified by cacheName need to be created if it does not exists

Header variables

Name
Type
Description
CamelJCacheAction
java.lang.String
The action to perform, supported values are PUT, PUTALL, PUTIFABSENT, GET, GETALL, GETANDREMOVE, GETANDREPLACE, GETANDPUT, REPLACE, REMOVE, REMOVEALL, INVOKE, CLEAR
CamelJCacheResult
java.lang.Object
The result of an action, i.e. Boolean for PUT, REMOVE, REPLACE
CamelJCacheEventType
java.lang.String
The type of event javax.cache.event.EventType
CamelJCacheKey
java.lang.Object
A key to apply an action
CamelJCacheKeys java.util.Set<java-lang.Object> A set of keys to apply an action, used for GETALL, REMOVEALL, INVOKE
CamelJCacheOldValue java.lang.Object On consumer side, the header value contains the old value associated to a key. On producer side, the header must contains the expected old value to use CAS like operation
CamelJCacheEntryProcessor javax.cache.processor.EntryProcessor The entry processor to use for INVOKE action
CamelJCacheEntryArgs java.util.collection<java.lang.Object> Additional arguments to pass to the javax.cache.processor.EntryProcessor

JCache based idempotent repository example:

JCacheIdempotentRepository idempotentRepo = new JCacheIdempotentRepository();
idempotentRepo.setCacheName("idempotent-cache")
 
from("direct:in")
    .idempotentConsumer(header("messageId"), idempotentRepo)
    .to("mock:out");

JCache based aggregation repository example:

package org.apache.camel.component.jcache.processor.aggregate;

import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.junit.Test;

public class JCacheAggregationRepositoryRoutesTest extends JCacheAggregationRepositoryTestSupport {
    private static final String MOCK_GOTCHA = "mock:gotcha";
    private static final String DIRECT_ONE = "direct:one";
    private static final String DIRECT_TWO = "direct:two";
    @EndpointInject(uri = MOCK_GOTCHA)
    private MockEndpoint mock;
    @Produce(uri = DIRECT_ONE)
    private ProducerTemplate produceOne;
    @Produce(uri = DIRECT_TWO)
    private ProducerTemplate produceTwo;
    @Test
    public void checkAggregationFromTwoRoutes() throws Exception {
        final JCacheAggregationRepository repoOne = createRepository(false);
        final JCacheAggregationRepository repoTwo = createRepository(false);
        final int completionSize = 4;
        final String correlator = "CORRELATOR";
        RouteBuilder rbOne = new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(DIRECT_ONE).routeId("AggregatingRouteOne")
                    .aggregate(header(correlator))
                    .aggregationRepository(repoOne)
                    .aggregationStrategy(new MyAggregationStrategy())
                    .completionSize(completionSize)
                    .to(MOCK_GOTCHA);
            }
        };
        RouteBuilder rbTwo = new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(DIRECT_TWO).routeId("AggregatingRouteTwo")
                    .aggregate(header(correlator))
                    .aggregationRepository(repoTwo)
                    .aggregationStrategy(new MyAggregationStrategy())
                    .completionSize(completionSize)
                    .to(MOCK_GOTCHA);
            }
        };
        context().addRoutes(rbOne);
        context().addRoutes(rbTwo);
        context().start();
        mock.expectedMessageCount(1);
        mock.expectedBodiesReceived(1 + 2 + 3 + 4);
        produceOne.sendBodyAndHeader(1, correlator, correlator);
        produceTwo.sendBodyAndHeader(2, correlator, correlator);
        produceOne.sendBodyAndHeader(3, correlator, correlator);
        produceOne.sendBodyAndHeader(4, correlator, correlator);
        mock.assertIsSatisfied();
    }
    private class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            } else {
                Integer n = newExchange.getIn().getBody(Integer.class);
                Integer o = oldExchange.getIn().getBody(Integer.class);
                Integer v = (o == null ? 0 : o) + (n == null ? 0 : n);
                oldExchange.getIn().setBody(v, Integer.class);
                return oldExchange;
            }
        }
    }
    protected JCacheAggregationRepository createRepository(boolean optimistic) throws Exception {
        JCacheAggregationRepository repository = new JCacheAggregationRepository();
        repository.setConfiguration(new JCacheConfiguration());
        repository.setCacheName("aggregation-repository");
        repository.setOptimistic(optimistic);
        return repository;
    }
}