Chapter 76. JPA

JPA Component

The jpa component enables you to store and retrieve Java objects from persistent storage using the Java Persistence API (JPA) introduced with EJB 3, which is a standard interface layer that wraps Object/Relational Mapping (ORM) products such as OpenJPA, Hibernate, TopLink, and so on.

Sending to the endpoint

You can store a Java entity bean in a database by sending it to a JPA producer endpoint. The body of the In message is assumed to be an entity bean (that is, a POJO with an @Entity annotation on it) or a collection or an array of entity beans.
If the body does not contain one of the preceding types, put a Message Translator in front of the endpoint to perform the necessary conversion first.
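As a rough illustration of the paragraph above, the following Spring XML sketch converts the incoming body with a translator bean and then stores the result through a JPA producer endpoint. The entity class com.example.PurchaseOrder, the orderTranslator bean with its toEntity method, and the direct:storeOrder endpoint are hypothetical names chosen for this example only.

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="storeOrder">
        <from uri="direct:storeOrder"/>
        <!-- hypothetical translator bean: turns the incoming body into a PurchaseOrder entity -->
        <bean ref="orderTranslator" method="toEntity"/>
        <!-- JPA producer endpoint: stores the entity carried in the message body -->
        <to uri="jpa:com.example.PurchaseOrder"/>
    </route>
</camelContext>
```

If the body already is an entity bean (or a collection or array of them), the translator step can be left out.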

Consuming from the endpoint

Consuming messages from a JPA consumer endpoint removes (or updates) entity beans in the database. This allows you to use a database table as a logical queue: consumers take messages from the queue and then delete/update them to logically remove them from the queue.
If you do not wish to delete the entity bean when it has been processed (and when routing is done), you can specify consumeDelete=false on the URI. This will result in the entity being processed each poll.
If you would rather perform some update on the entity to mark it as processed (such as to exclude it from a future query), you can annotate a method with @Consumed. This method is invoked on your entity bean when it has been processed (and when routing is done).
From Camel 2.13 onwards you can use @PreConsumed which will be invoked on your entity bean before it has been processed (before routing).
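The following is a minimal sketch of a consumer route that keeps the entity in the database after routing, assuming a hypothetical entity class com.example.PurchaseOrder and a hypothetical orderProcessor bean.

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="readOrders">
        <!-- consumeDelete=false: the entity stays in the table after routing completes -->
        <from uri="jpa:com.example.PurchaseOrder?consumeDelete=false"/>
        <!-- hypothetical bean that handles each polled entity -->
        <to uri="bean:orderProcessor"/>
    </route>
</camelContext>
```

With this configuration the same entity is picked up again on every poll, unless a @Consumed method marks it as processed and the consumer query excludes marked entities.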

URI format

jpa:entityClassName[?options]
For sending to the endpoint, the entityClassName is optional. If specified, it helps the Type Converter to ensure the body is of the correct type.
For consuming, the entityClassName is mandatory.
You can append query options to the URI in the following format, ?option=value&option=value&...
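When the URI is written in a Spring XML file, the & separator between options has to be escaped as &amp; because it sits inside an XML attribute. A small sketch, using two options from the Options table and the hypothetical names com.example.PurchaseOrder and orderProcessor:

```xml
<route id="pollOrders">
    <!-- two options joined with an escaped ampersand -->
    <from uri="jpa:com.example.PurchaseOrder?maximumResults=100&amp;consumer.delay=5000"/>
    <!-- hypothetical bean that handles each polled entity -->
    <to uri="bean:orderProcessor"/>
</route>
```

In the Java DSL the same URI is written with a plain & character.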

Options

Name Default Value Description
entityType entityClassName Overrides the entityClassName from the URI.
persistenceUnit camel The JPA persistence unit used by default.
consumeDelete true JPA consumer only: If true, the entity is deleted after it is consumed; if false, the entity is not deleted.
consumeLockEntity true JPA consumer only: Specifies whether or not to set an exclusive lock on each entity bean while processing the results from polling.
flushOnSend true JPA producer only: Flushes the EntityManager after the entity bean has been persisted.
maximumResults -1 JPA consumer only: Set the maximum number of results to retrieve on the Query.
transactionManager null Specifies the transaction manager to use. This option is looked up in the Registry, so the # notation is required, for example transactionManager=#myTransactionManager. If none is provided, Apache Camel uses a JpaTransactionManager by default. Can be used to set a JTA transaction manager (for integration with an EJB container).
consumer.delay 500 JPA consumer only: Delay in milliseconds between each poll.
consumer.initialDelay 1000 JPA consumer only: Milliseconds before polling starts.
consumer.useFixedDelay false JPA consumer only: Set to true to use fixed delay between polls, otherwise fixed rate is used. See ScheduledExecutorService in JDK for details.
maxMessagesPerPoll 0 Apache Camel 2.0: JPA consumer only: An integer value that defines the maximum number of messages to gather per poll. By default, no maximum is set. Can be used to avoid polling many thousands of messages when starting up the server. Set a value of 0 or negative to disable.
consumer.query JPA consumer only: To use a custom query when consuming data.
consumer.namedQuery JPA consumer only: To use a named query when consuming data.
consumer.nativeQuery JPA consumer only: To use a custom native query when consuming data.
consumer.parameters Camel 2.12: JPA consumer only: The parameters map used for building the query. It is expected to be of the generic type java.util.Map<String, Object>, where the keys are the named parameters of a given JPA query and the values are the corresponding values you want to select for.
consumer.resultClass Camel 2.7: JPA consumer only: Defines the type of the returned payload (we will call entityManager.createNativeQuery(nativeQuery, resultClass) instead of entityManager.createNativeQuery(nativeQuery)). Without this option, we will return an object array. Only has an effect when used in conjunction with a native query when consuming data.
consumer.transacted false Camel 2.7.5/2.8.3/2.9: JPA consumer only: Whether to run the consumer in transacted mode, in which all messages are either committed or rolled back once the entire batch has been processed. The default behavior (false) is to commit all the previously successfully processed messages, and only roll back the last failed message.
consumer.lockModeType WRITE Camel 2.11.2/2.12: To configure the lock mode on the consumer. The possible values are defined in the enum javax.persistence.LockModeType. Since Camel 2.13, the default value is PESSIMISTIC_WRITE.
consumer.skipLockedEntity false Camel 2.13: To configure whether to use NOWAIT on lock and silently skip the entity.
usePersist false Camel 2.5: JPA producer only: Indicates to use entityManager.persist(entity) instead of entityManager.merge(entity). Note: entityManager.persist(entity) doesn't work for detached entities (where the EntityManager has to execute an UPDATE instead of an INSERT query)!
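To make the # notation of the transactionManager option more concrete, here is a sketch under stated assumptions: the bean IDs myTransactionManager and myEMFactory, the entity class com.example.PurchaseOrder, and the orderProcessor bean are hypothetical, and myEMFactory is assumed to be defined elsewhere in the same Spring file.

```xml
<!-- hypothetical bean IDs; JpaTransactionManager is the Spring JPA transaction manager class -->
<bean id="myTransactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
    <property name="entityManagerFactory" ref="myEMFactory"/>
</bean>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="pollOrdersWithTx">
        <!-- the # prefix makes Camel look up myTransactionManager in the Registry -->
        <from uri="jpa:com.example.PurchaseOrder?transactionManager=#myTransactionManager"/>
        <to uri="bean:orderProcessor"/>
    </route>
</camelContext>
```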

Message Headers

Apache Camel adds the following message headers to the exchange:
Header Type Description
CamelEntityManager EntityManager Camel 2.12: JPA consumer / Camel 2.12.2: JPA producer: The JPA EntityManager object being used by JpaConsumer or JpaProducer.

Configuring EntityManagerFactory

You are strongly advised to configure the JPA component to use a specific EntityManagerFactory instance. If you do not do so, each JpaEndpoint will auto-create its own EntityManagerFactory instance. For example, you can instantiate a JPA component that references the myEMFactory entity manager factory, as follows:
<bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
   <property name="entityManagerFactory" ref="myEMFactory"/>
</bean>
From Camel 2.3 onwards, the JpaComponent automatically looks up the EntityManagerFactory in the Registry, which means you do not need to configure it on the JpaComponent as shown above. You only need to do so if there is ambiguity, in which case Camel logs a WARN message.
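The component definition above refers to a myEMFactory bean without showing it. One possible definition, sketched here on the assumption that persistence.xml declares a persistence unit named camel (the component default listed in the Options table), uses Spring's LocalEntityManagerFactoryBean:

```xml
<!-- entity manager factory for the persistence unit named "camel" (assumed to exist in persistence.xml) -->
<bean id="myEMFactory" class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean">
    <property name="persistenceUnitName" value="camel"/>
</bean>
```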

Configuring TransactionManager

Since Camel 2.3, the JpaComponent automatically looks up the TransactionManager in the Registry. If Camel does not find a registered TransactionManager instance, it also looks for a TransactionTemplate and tries to extract the TransactionManager from it. If no TransactionTemplate is available in the Registry, the JpaEndpoint creates its own TransactionManager instance.
If more than a single instance of the TransactionManager is found, Camel logs a WARN message. In such cases, you might want to instantiate and explicitly configure a JPA component that references the myTransactionManager transaction manager, as follows:
<bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
   <property name="entityManagerFactory" ref="myEMFactory"/>
   <property name="transactionManager" ref="myTransactionManager"/>
</bean>

Using a consumer with a named query

For consuming only selected entities, you can use the consumer.namedQuery URI query option. First, you have to define the named query in the JPA Entity class:
@Entity
@NamedQuery(name = "step1", query = "select x from MultiSteps x where x.step = 1")
public class MultiSteps {
   ...
}
After that, you can define a consumer URI like this one:
from("jpa://org.apache.camel.examples.MultiSteps?consumer.namedQuery=step1")
.to("bean:myBusinessLogic");
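For readers who configure routes in Spring XML rather than the Java DSL, an equivalent sketch of the same route might look as follows. The route ID is a hypothetical name; the URI and the bean endpoint are taken from the Java example above.

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="consumeStep1">
        <!-- only entities selected by the named query "step1" are consumed -->
        <from uri="jpa://org.apache.camel.examples.MultiSteps?consumer.namedQuery=step1"/>
        <to uri="bean:myBusinessLogic"/>
    </route>
</camelContext>
```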

Using a consumer with a query

For consuming only selected entities, you can use the consumer.query URI query option. You only have to define the query option:
from("jpa://org.apache.camel.examples.MultiSteps?consumer.query=select o from org.apache.camel.examples.MultiSteps o where o.step = 1")
.to("bean:myBusinessLogic");

Using a consumer with a native query

For consuming only selected entities, you can use the consumer.nativeQuery URI query option. You only have to define the native query option:
from("jpa://org.apache.camel.examples.MultiSteps?consumer.nativeQuery=select * from MultiSteps where step = 1")
.to("bean:myBusinessLogic");
If you use the native query option, you will receive an object array in the message body.
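If you would rather receive entity instances than object arrays, the consumer.resultClass option from the Options table can be combined with the native query. The following Spring XML sketch assumes that the columns returned by the query match the mapping of the MultiSteps entity; the route ID is a hypothetical name.

```xml
<route id="nativeQueryTyped">
    <!-- consumer.resultClass asks JPA to map each row to the given class instead of an object array -->
    <from uri="jpa://org.apache.camel.examples.MultiSteps?consumer.nativeQuery=select * from MultiSteps where step = 1&amp;consumer.resultClass=org.apache.camel.examples.MultiSteps"/>
    <to uri="bean:myBusinessLogic"/>
</route>
```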

Example

See the Tracer Example for an example using JPA to store traced messages into a database.

Using the JPA based idempotent repository

In this section we will use the JPA based idempotent repository.
First we need to set up a persistence-unit in the persistence.xml file:
<persistence-unit name="idempotentDb" transaction-type="RESOURCE_LOCAL">
   <class>org.apache.camel.processor.idempotent.jpa.MessageProcessed</class>
 
   <properties>
     <property name="openjpa.ConnectionURL" value="jdbc:derby:target/idempotentTest;create=true"/>
     <property name="openjpa.ConnectionDriverName" value="org.apache.derby.jdbc.EmbeddedDriver"/>
     <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema"/>
     <property name="openjpa.Log" value="DefaultLevel=WARN, Tool=INFO"/>
   </properties>
 </persistence-unit>
Second we have to set up an org.springframework.orm.jpa.JpaTemplate, which is used by the org.apache.camel.processor.idempotent.jpa.JpaMessageIdRepository:
<!-- this is standard spring JPA configuration -->
 <bean id="jpaTemplate" class="org.springframework.orm.jpa.JpaTemplate">
     <property name="entityManagerFactory" ref="entityManagerFactory"/>
 </bean>
 
 <bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean">
     <!-- we use idempotentDb as the persistence unit name defined in the persistence.xml file -->
     <property name="persistenceUnitName" value="idempotentDb"/>
 </bean>
Afterwards we can configure our org.apache.camel.processor.idempotent.jpa.JpaMessageIdRepository:
<!-- we define our jpa based idempotent repository we want to use in the file consumer -->
 <bean id="jpaStore" class="org.apache.camel.processor.idempotent.jpa.JpaMessageIdRepository">
     <!-- Here we refer to the spring jpaTemplate -->
     <constructor-arg index="0" ref="jpaTemplate"/>
     <!-- This 2nd parameter is the name (= a category name).
          You can have different repositories with different names -->
     <constructor-arg index="1" value="FileConsumer"/>
 </bean>
And finally we can use our JPA idempotent repository in a route in the Spring XML file as well:
<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="JpaMessageIdRepositoryTest">
        <from uri="direct:start" />
        <idempotentConsumer messageIdRepositoryRef="jpaStore">
            <header>messageId</header>
            <to uri="mock:result" />
        </idempotentConsumer>
    </route>
</camelContext>