Version 4.4.1
Copyright © 2011-2013 Red Hat, Inc. and/or its affiliates.
Updated: 06 Jun 2013
This chapter defines some basic transaction concepts and explains how to generate and build a simple transactional JMS example in Fuse Mediation Router.
The prototype of a transaction is an operation that conceptually consists of a single step (for example, transfer money from account A to account B), but must be implemented as a series of steps. Such operations are acutely vulnerable to system failures, because a crash can leave some of the steps unfinished and the system in an inconsistent state. For example, if the system crashes after debiting account A, but before crediting account B, the money debited from account A disappears into thin air.
In order to make such an operation reliable, it must be implemented as a transaction. On close examination, it turns out that there are four key properties a transaction must have in order to guarantee reliable execution: these are the so-called ACID properties of a transaction.
The ACID properties of a transaction are defined as follows:
Atomic—a transaction is an all or nothing procedure; individual updates are assembled and either committed or aborted (rolled back) simultaneously when the transaction completes.
Consistent—a transaction is a unit of work that takes a system from one consistent state to another.
Isolated—while a transaction is executing, its partial results are hidden from other entities accessing the transaction.
Durable—the results of a transaction are persistent.
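The atomicity property can be illustrated with a minimal sketch of the account transfer described earlier. The `AtomicTransferSketch` class, its in-memory map of balances, and the `crashAfterDebit` flag are all hypothetical names invented for this illustration; a real resource would rely on its own transaction log rather than an in-memory snapshot.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory ledger, used only to illustrate all-or-nothing updates. */
final class AtomicTransferSketch {
    private final Map<String, Long> balances = new HashMap<String, Long>();

    void open(String account, long amount) {
        balances.put(account, amount);
    }

    long balance(String account) {
        return balances.get(account);
    }

    /**
     * Moves money as a single unit of work. If any step fails, the snapshot
     * taken at the start is restored, so no partial update survives.
     */
    void transfer(String from, String to, long amount, boolean crashAfterDebit) {
        Map<String, Long> snapshot = new HashMap<String, Long>(balances); // begin
        try {
            balances.put(from, balance(from) - amount);   // step 1: debit
            if (crashAfterDebit) {
                throw new IllegalStateException("simulated failure between steps");
            }
            balances.put(to, balance(to) + amount);       // step 2: credit
            // commit: both updates simply stay in place
        } catch (RuntimeException e) {
            balances.clear();
            balances.putAll(snapshot);                    // rollback
            throw e;
        }
    }

    public static void main(String[] args) {
        AtomicTransferSketch ledger = new AtomicTransferSketch();
        ledger.open("A", 100);
        ledger.open("B", 0);
        try {
            ledger.transfer("A", "B", 30, true);
        } catch (IllegalStateException expected) {
            // the failed transfer must leave both balances untouched
        }
        System.out.println("A=" + ledger.balance("A") + ", B=" + ledger.balance("B"));
    }
}
```

The sketch covers atomicity only. Isolation and durability would additionally require locking and persistent storage, which a real resource provides.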
A transaction client is an API or object that enables you
to initiate and end transactions. Typically, a transaction client exposes operations
that enable you to begin, commit, or
roll back a transaction. In the context of the Spring
framework, the PlatformTransactionManager exposes a transaction client
API.
Transaction demarcation refers to the initiating and ending of transactions (where transactions can be ended either by being committed or rolled back). Demarcation can be effected either explicitly (for example, by calling a transaction client API) or implicitly (for example, whenever a message is polled from a transactional endpoint).
A resource is any component of a computer system that can undergo a persistent or permanent change. In practice, a resource is almost always a database or a service layered over a database (for example, a message service with persistence). Other kinds of resource are conceivable, however. For example, an Automated Teller Machine (ATM) is a kind of resource: once a customer has physically accepted cash from the machine, the transaction cannot be reversed.
A transaction manager is responsible for coordinating transactions across one or more resources. In many cases, a transaction manager is built into a resource. For example, enterprise-level databases generally include a transaction manager that is capable of managing transactions involving that database. But for transactions involving more than one resource, it is normally necessary to employ an external transaction manager implementation.
For transactions involving a single resource, the transaction manager built into the resource can generally be used. For transactions involving multiple resources, however, it is necessary to use an external transaction manager or a transaction processing (TP) monitor. In this case, the resources must be integrated with the transaction manager by registering their XA switches. There is also an important difference between the types of algorithm that are used for committing single-resource systems and multiple-resource systems, as follows:
1-phase commit—suitable for single-resource systems, this protocol commits a transaction in a single step.
2-phase commit—suitable for multiple-resource systems, this protocol commits a transaction in two steps. Including multiple resources in a transaction introduces an extra element of risk: there is the danger that a system failure might occur after some, but not all, of the resources have been committed. This would leave the system in an inconsistent state. The 2-phase commit protocol is designed to eliminate this risk, ensuring that the system can always be restored to a consistent state after it is restarted.
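The control flow of a 2-phase commit can be sketched as follows. The `Participant` interface, `RecordingResource`, and `TwoPhaseCommitSketch` are hypothetical names invented for this illustration; they are only loosely modeled on the prepare, commit, and rollback calls that a transaction manager makes on XA resources. A real coordinator would also write its decision to a persistent log between the two phases, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical resource-side interface for this sketch. */
interface Participant {
    boolean prepare();   // phase 1: vote yes (true) or no (false)
    void commit();       // phase 2, when every participant voted yes
    void rollback();     // phase 2, when some participant voted no
}

/** Participant that records each call, so the protocol order can be inspected. */
final class RecordingResource implements Participant {
    private final String name;
    private final boolean vote;
    private final List<String> log;

    RecordingResource(String name, boolean vote, List<String> log) {
        this.name = name;
        this.vote = vote;
        this.log = log;
    }

    public boolean prepare() { log.add(name + ":prepare"); return vote; }
    public void commit()     { log.add(name + ":commit"); }
    public void rollback()   { log.add(name + ":rollback"); }
}

final class TwoPhaseCommitSketch {
    /** Returns true if the transaction was committed on every participant. */
    static boolean run(List<Participant> participants) {
        List<Participant> prepared = new ArrayList<Participant>();
        // Phase 1: ask every participant whether it is able to commit.
        for (Participant p : participants) {
            if (p.prepare()) {
                prepared.add(p);
            } else {
                // A "no" vote aborts the transaction. The sketch assumes the
                // vetoing participant has already undone its own work.
                for (Participant q : prepared) {
                    q.rollback();
                }
                return false;
            }
        }
        // Phase 2: everyone voted yes, so the commit goes ahead everywhere.
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<String>();
        List<Participant> resources = Arrays.<Participant>asList(
                new RecordingResource("db", true, log),
                new RecordingResource("jms", false, log));
        System.out.println("committed=" + run(resources) + " calls=" + log);
    }
}
```

Because no participant commits until all of them have voted yes, a failure during phase 1 can always be resolved by rolling back.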
To understand transaction processing, it is crucial to appreciate the basic relationship between transactions and threads: transactions are thread-specific. That is, when a transaction is started, it is attached to a specific thread (technically, a transaction context object is created and associated with the current thread). From this point on (until the transaction ends), all of the activity in the thread occurs within this transaction scope. Conversely, activity in any other thread does not fall within this transaction's scope (although it might fall within the scope of some other transaction).
From this, we can draw a few simple conclusions:
An application can process multiple transactions simultaneously, as long as each transaction is created in a separate thread.
Beware of creating subthreads within a
transaction—if you are in the middle of a transaction and
you create a new pool of threads (for example, by calling the
threads() DSL command), the new threads are
not in the scope of the original
transaction.
Beware of processing steps that implicitly create new threads—for the same reason given in the preceding point.
Transaction scopes do not usually extend across route
segments: if one route segment ends with
to(JoinEndpoint) and another route segment starts with
from(JoinEndpoint), these route
segments typically do not belong to the same
transaction. There are exceptions, however (see Breaking a route into fragments).
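The thread-specific nature of a transaction scope can be demonstrated with a small sketch. It assumes that the transaction context is held in a `ThreadLocal`, which is a common implementation technique but is not confirmed here for any particular transaction manager. The `ThreadBoundTxSketch` class and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: a transaction identifier bound to the current thread. */
final class ThreadBoundTxSketch {
    private static final AtomicLong NEXT_ID = new AtomicLong(1);
    // Each thread sees its own value; null means "no transaction in this thread".
    private static final ThreadLocal<Long> CURRENT = new ThreadLocal<Long>();

    static long begin() {
        if (CURRENT.get() != null) {
            throw new IllegalStateException("thread is already in a transaction");
        }
        long id = NEXT_ID.getAndIncrement();
        CURRENT.set(id);
        return id;
    }

    static Long current() {
        return CURRENT.get();
    }

    static void end() {
        CURRENT.remove();
    }

    /** Starts a new thread and reports the transaction id visible there (null if none). */
    static Long seenByNewThread() {
        final Long[] seen = new Long[1];
        Thread probe = new Thread(new Runnable() {
            public void run() {
                seen[0] = current();
            }
        });
        probe.start();
        try {
            probe.join(); // join() makes the probe's write visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for probe", e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        long id = begin();
        System.out.println("calling thread is in transaction " + id);
        System.out.println("new thread sees transaction: " + seenByNewThread());
        end();
    }
}
```

The new thread sees no transaction, which is why work handed to a thread pool in the middle of a route falls outside the original transaction scope.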
Some advanced transaction manager implementations give you the freedom to detach and attach transaction contexts to and from threads at will. For example, this makes it possible to move a transaction context from one thread to another thread. In some cases it is also possible to attach a single transaction context to multiple threads.
A transaction context is an object that encapsulates the information needed to keep track of a transaction. The format of a transaction context depends entirely on the relevant transaction manager implementation. At a minimum, the transaction context contains a unique transaction identifier.
A distributed transaction refers to a transaction in a distributed system, where the transaction scope spans multiple network nodes. A basic prerequisite for supporting distributed transactions is a network protocol that supports transmission of transaction contexts in a canonical format (see also, Distributed transaction managers). Distributed transactions lie outside the scope of Fuse Mediation Router transactions.
The X/Open XA standard describes a standardized interface for integrating resources with a transaction manager. If you want to manage a transaction that includes more than one resource, it is essential that the participating resources support the XA standard. Resources that support the XA standard expose a special object, the XA switch, which enables transaction managers (or TP monitors) to take control of their transactions. The XA standard supports both the 1-phase commit protocol and the 2-phase commit protocol.
When it comes to choosing the products that implement your transaction system, there is a great variety of database products and transaction managers available, some free of charge and some commercial. All of them have nominal support for transaction processing, but there are considerable variations in the qualities of service supported by these products. This section provides a brief guide to the kind of features that you need to consider when comparing the reliability and sophistication of different transaction products.
The following features determine the quality of service of a resource:
ANSI SQL defines four transaction isolation levels, as follows:
SERIALIZABLE: Transactions are perfectly isolated from each other. That is, nothing that one transaction does can affect any other transaction until the transaction is committed. This isolation level is described as serializable, because the effect is as if all transactions were executed one after the other (although in practice, the resource can often optimize the algorithm, so that some transactions are allowed to proceed simultaneously).
REPEATABLE_READ: Every time a transaction reads or updates the database, a read or write lock is obtained and held until the end of the transaction. This provides almost perfect isolation. But there is one case where isolation is not perfect. Consider a SQL SELECT statement that reads a range of rows using a WHERE clause. If another transaction adds a row to this range while the first transaction is running, the first transaction can see this new row, if it repeats the SELECT call (a phantom read).
READ_COMMITTED: Read locks are not held until the end of a transaction. So, repeated reads can give different answers (updates committed by other transactions are visible to an ongoing transaction).
READ_UNCOMMITTED: Neither read locks nor write locks are held until the end of a transaction. Hence, dirty reads are possible (that is, a transaction can see uncommitted updates made by other transactions).
Databases generally do not support all of the different transaction isolation
levels. For example, some free databases support only READ_UNCOMMITTED.
Also, some databases implement transaction isolation levels in ways that are subtly
different from the ANSI standard. Isolation is a complicated issue, which involves
trade-offs with database performance (for example, see Isolation in
Wikipedia).
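The four levels can be summarized in code alongside the corresponding constants from the standard `java.sql.Connection` interface. The `IsolationLevelSketch` enum and its field names are hypothetical. The flags restate the descriptions above and make no claim about how any particular database behaves.

```java
import java.sql.Connection;

/** Hypothetical summary of the ANSI levels and the read anomalies each one still permits. */
enum IsolationLevelSketch {
    SERIALIZABLE(Connection.TRANSACTION_SERIALIZABLE, false, false, false),
    REPEATABLE_READ(Connection.TRANSACTION_REPEATABLE_READ, false, false, true),
    READ_COMMITTED(Connection.TRANSACTION_READ_COMMITTED, false, true, true),
    READ_UNCOMMITTED(Connection.TRANSACTION_READ_UNCOMMITTED, true, true, true);

    final int jdbcConstant;            // value accepted by Connection.setTransactionIsolation
    final boolean dirtyReads;          // can see uncommitted updates of other transactions
    final boolean nonRepeatableReads;  // repeated reads of a row can give different answers
    final boolean phantomReads;        // a repeated range query can return new rows

    IsolationLevelSketch(int jdbcConstant, boolean dirtyReads,
                         boolean nonRepeatableReads, boolean phantomReads) {
        this.jdbcConstant = jdbcConstant;
        this.dirtyReads = dirtyReads;
        this.nonRepeatableReads = nonRepeatableReads;
        this.phantomReads = phantomReads;
    }

    /** Looks up the level that corresponds to a java.sql.Connection constant. */
    static IsolationLevelSketch fromJdbc(int jdbcConstant) {
        for (IsolationLevelSketch level : values()) {
            if (level.jdbcConstant == jdbcConstant) {
                return level;
            }
        }
        throw new IllegalArgumentException("not an isolation level: " + jdbcConstant);
    }

    public static void main(String[] args) {
        for (IsolationLevelSketch level : values()) {
            System.out.println(level + ": dirty=" + level.dirtyReads
                    + " nonRepeatable=" + level.nonRepeatableReads
                    + " phantom=" + level.phantomReads);
        }
    }
}
```

With a live JDBC connection, a level would typically be requested by calling `Connection.setTransactionIsolation(level.jdbcConstant)`, after checking `DatabaseMetaData.supportsTransactionIsolationLevel` to see whether the database supports it.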
In order for a resource to participate in a transaction involving multiple resources, it needs to support the X/Open XA standard. You also need to check whether the resource's implementation of the XA standard is subject to any special restrictions. For example, some implementations of the XA standard are restricted to a single database connection (which implies that only one thread at a time can process a transaction involving that resource).
The following features determine the quality of service of a transaction manager:
A key differentiator for transaction managers is the ability to support multiple resources. This normally entails support for the XA standard, where the transaction manager provides a way for resources to register their XA switches.
Strictly speaking, the XA standard is not the only approach you can use to support multiple resources, but it is the most practical one. The alternative typically involves writing tedious (and critical) custom code to implement the algorithms normally provided by an XA switch.
Some transaction managers support advanced capabilities for manipulating the associations between a transaction context and application threads, as follows:
Suspend/resume current transaction—enables you to suspend temporarily the current transaction context, while the application does some non-transactional work in the current thread.
Attach/detach transaction context—enables you to move a transaction context from one thread to another or to extend a transaction scope to include multiple threads.
Some transaction managers have the capability to manage transactions whose scope includes multiple nodes in a distributed system (where the transaction context is propagated from node to node using special protocols such as WS-AtomicTransactions or CORBA OTS).
Advanced transaction managers typically provide visual tools to monitor the status of pending transactions. This kind of tool is particularly useful after a system failure, where it can help to identify and resolve transactions that were left in an uncertain state (heuristic exceptions).
There are significant variations amongst transaction managers with respect to their robustness in the event of a system failure (crash). The key strategy that transaction managers use is to write data to a persistent log before performing each step of a transaction. In the event of a failure, the data in the log can be used to recover the transaction. Some transaction managers implement this strategy more carefully than others. For example, a high-end transaction manager would typically duplicate the persistent transaction log and allow each of the logs to be stored on separate host machines.
The following are required to complete this example:
Internet connection (required by Maven)
Fuse Mediation Router requires a Java 6 development kit (JDK 1.6.0). After installing the
JDK, set your JAVA_HOME environment variable to point to the root
directory of your JDK, and set your PATH environment variable to
include the Java bin directory.
The Fuse Mediation Router Maven tooling requires Apache Maven version 2.2.1 or later. To download Apache Maven, go to http://maven.apache.org/download.html.
After installing Apache Maven do the following:
Set your M2_HOME environment variable to point to the
Maven root directory.
Set your MAVEN_OPTS environment variable to
-Xmx512M to increase the memory available for Maven
builds.
Set your PATH environment variable to include the Maven
bin directory:
| Platform | Path |
|---|---|
| Windows | %M2_HOME%\bin |
| UNIX | $M2_HOME/bin |
Use the Maven archetype, camel-archetype-java, to generate a
sample Java application which you can then use as a starting point for your
application.
To generate the new project, perform the following steps:
Open a new command window and change to the directory where you want to store the new Maven project.
Enter the following command to generate the new Maven project:
mvn archetype:create -DremoteRepositories=https://repository.apache.org/content/groups/public -DarchetypeGroupId=org.apache.camel.archetypes -DarchetypeArtifactId=camel-archetype-java -DarchetypeVersion=2.8.0-fuse-00-08 -DgroupId=tutorial -DartifactId=tx-jms-router
This command generates a basic router application under the
tx-jms-router directory. You will customize this basic
application to demonstrate transactions in Fuse Mediation Router.
Maven accesses the Internet to download JARs and stores them in its local repository.
Customize the project POM file, tx-jms-router/pom.xml, by
adding some new project dependencies. First of all, define some
properties for the dependency versions. Using your favorite text editor,
open the POM file and add the spring-version, activemq-version, and
xbean-version properties as follows:
<project ...>
  ...
  <properties>
    ...
    <spring-version>3.0.5.RELEASE</spring-version>
    <activemq-version>5.5.1-fuse-00-08</activemq-version>
    <xbean-version>3.7</xbean-version>
  </properties>
  ...
</project>
Add dependencies on the artifacts that implement Spring transactions.
Look for the dependencies element in the POM file and add
the following dependency elements:
<project ...>
  ...
  <dependencies>
    ...
    <!-- Spring transaction dependencies -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${spring-version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring-version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
      <version>${spring-version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>${spring-version}</version>
    </dependency>
  </dependencies>
  ...
</project>
Add the JMS and ActiveMQ dependencies. Look for the
dependencies element in the POM file and add the
following dependency elements:
<project ...>
  ...
  <dependencies>
    ...
    <!-- Persistence artifacts -->
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jms</artifactId>
      <version>${camel-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>${activemq-version}</version>
    </dependency>
    <!-- This is needed by the camel-jms component -->
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>${xbean-version}</version>
    </dependency>
  </dependencies>
  ...
</project>
The basic requirements for writing a transactional application in Spring are a
transaction manager bean and a resource
bean (or, in some cases, multiple resource beans). You can then
use the transaction manager bean either to create a transactional Fuse Mediation Router
component (see Demarcation by Transactional Endpoints) or to mark a
route as transactional, using the transacted() Java DSL command
(see Demarcation by Marking the Route).
To configure the JMS transaction manager and the JMS resource, perform the following steps:
Customize the Spring XML configuration. Using your favorite text
editor, open the
tx-jms-router/src/main/resources/META-INF/spring/camel-context.xml
file and add the following content to the beans
element:
<beans ... >
  ...
  <bean id="jmstx" class="org.apache.camel.component.jms.JmsComponent">
    <property name="configuration" ref="jmsConfig" />
  </bean>
  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="transacted" value="true"/>
  </bean>
  <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
  </bean>
  <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://broker1?brokerConfig=xbean:tutorial/activemq.xml"/>
  </bean>
</beans>
This configuration creates a custom JMS component, with bean ID equal
to jmstx, that you can use to define transactional JMS
endpoints in your routes. The underlying JMS system is an embedded
Fuse Message Broker (Apache ActiveMQ) broker, which takes its configuration from
the file, tutorial/activemq.xml.
Create the ActiveMQ configuration file, activemq.xml.
First create the new directory,
tx-jms-router/src/main/resources/tutorial. Next, using
your favorite text editor, create the file, activemq.xml,
under the tutorial directory, and add the following
text:
<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
  <broker useJmx="true" xmlns="http://activemq.org/config/1.0" persistent="false" brokerName="broker1">
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://localhost:61616"/>
    </transportConnectors>
  </broker>
</beans>
With the transactional JMS component, jmstx, you can define a
transactional route written in Java DSL.
It is also possible to define the route using Spring XML syntax.
To define a route using the transactional JMS component, jmstx,
perform the following steps:
Define a route in the Java domain specific language (DSL). Using your
favorite text editor, open the file,
tx-jms-router/src/main/java/tutorial/MyRouteBuilder.java.
Edit the body of the MyRouteBuilder.configure() method,
discarding the existing route and replacing it with the following
code:
public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .convertBodyTo(String.class)
            .to("jmstx:queue:giro");

        from("jmstx:queue:giro")
            .to("jmstx:queue:credits")
            .to("jmstx:queue:debits")
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    // should be printed n times due to redeliveries
                    System.out.println("exchange = " + exchange);
                    // force rollback
                    throw new Exception("test");
                }
            });
    }
    ...
}
Near the top of the MyRouteBuilder.java file, add the
requisite import statements, as follows:
// Java
...
import org.apache.camel.Processor;
import org.apache.camel.Exchange;
...
After building and running the example using Maven, you can use JMX to examine what has happened to the JMS queues involved in the application.
To build and run the transactional JMS example, perform the following steps:
To build the example, open a command prompt, change directory to
tx-jms-router, and enter the following Maven
command:
mvn install
If the build is successful, you should see the file,
tx-jms-router-1.0-SNAPSHOT.jar, appear under the
tx-jms-router/target directory.
To run the example using the camel-maven-plugin, enter
the following Maven command:
mvn camel:run
If all goes well, you should see about a dozen occurrences of
java.lang.Exception: test scrolling past, before
activity in the console window comes to a halt. Do not kill
the running application at this point!
But make sure that the exceptions you are seeing in the console do
not indicate a failure to download and install the
camel-maven-plugin. Normally, the plug-in should
download and install without any problems, because the generated POM
file, tx-jms-router/pom.xml, contains all of the
requisite settings.
What happened? The series of runtime exceptions thrown by the
application is exactly what we expect to happen,
because the route is programmed to throw an exception every time an
exchange is processed by the route. The purpose of throwing the
exception is to trigger a transaction rollback, causing the current
exchange to be un-enqueued from the queue:credits and
queue:debits queues.
To gain a better insight into what occurred, open a JMX console and point it at the ActiveMQ broker. Open a new command prompt and enter the following command:
jconsole
The jconsole utility is a standard tool provided with
Sun's J2SE distribution (JDK).
To open a JMX connection to the ActiveMQ broker (which is embedded in the running example application), click on the Remote tab of the JConsole: Connect to Agent dialog and enter the Port number, 1099 (the default JMX port for ActiveMQ). Click Connect.

It is possible to customize the JMX port used by ActiveMQ. See http://activemq.apache.org/jmx.html for details.
If the connection succeeds, the JConsole window shows you a summary of
the Virtual Machine (VM) instance that you are connected to. Click the
MBeans tab and drill down to the
giro queue, in
Tree/org.apache.activemq/broker1/Queue.
Notice that the EnqueueCount, DispatchCount,
and DequeueCount for giro are all equal to 2,
which indicates that two messages entered the queue and two messages
were pulled off the queue.
Click on the debits queue. Notice that the
EnqueueCount, DispatchCount, and
DequeueCount for debits are all equal to
0. This is because the test exception caused the enqueued
message to be rolled back each time an exchange passed through the
route. The same thing happened to the credits queue.
Click on the ActiveMQ.DLQ queue. The DLQ
part of this name stands for Dead Letter Queue
and it is an integral part of the way ActiveMQ deals with failed message
dispatches. In summary, the default behavior of ActiveMQ when it fails
to dispatch a message (that is, when an exception reaches the JMS
consumer endpoint, jmstx:queue:giro), is as follows:
The consumer endpoint attempts to redeliver the message. Redelivery attempts can be repeated up to a configurable maximum number of times.
If the redeliveries limit is exceeded, the consumer endpoint
gives up trying to deliver the message and enqueues it on the
dead letter queue instead (by default,
ActiveMQ.DLQ).
You can see from the status of the ActiveMQ.DLQ queue
that the number of enqueued messages, EnqueueCount, is
equal to 2. This is where the failed messages have ended up.
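The redelivery behavior described above can be simulated with a short sketch. The `RedeliverySketch` class, its nested `Consumer` interface, and the `maximumRedeliveries` parameter are hypothetical names for this illustration. The value 6 in the usage example is an assumption about the broker's default redelivery limit, so check the redelivery policy documentation for your ActiveMQ version.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical simulation of "redeliver up to a limit, then dead-letter". */
final class RedeliverySketch {
    /** Consumer callback; throwing an exception signals a failed (rolled back) delivery. */
    interface Consumer {
        void onMessage(String body) throws Exception;
    }

    final List<String> deadLetters = new ArrayList<String>();
    int deliveryAttempts = 0;

    void dispatch(Deque<String> queue, Consumer consumer, int maximumRedeliveries) {
        while (!queue.isEmpty()) {
            String message = queue.poll();
            boolean delivered = false;
            // One initial delivery, followed by at most maximumRedeliveries retries.
            for (int attempt = 0; attempt <= maximumRedeliveries && !delivered; attempt++) {
                deliveryAttempts++;
                try {
                    consumer.onMessage(message);
                    delivered = true;
                } catch (Exception e) {
                    // The delivery is rolled back, so the message is offered again.
                }
            }
            if (!delivered) {
                deadLetters.add(message); // give up and move it to the dead letter queue
            }
        }
    }

    public static void main(String[] args) {
        Deque<String> giro = new ArrayDeque<String>();
        giro.add("message1");
        giro.add("message2");
        RedeliverySketch broker = new RedeliverySketch();
        broker.dispatch(giro, new Consumer() {
            public void onMessage(String body) throws Exception {
                throw new Exception("test"); // always fail, like the tutorial route
            }
        }, 6);
        System.out.println("attempts=" + broker.deliveryAttempts
                + " deadLetters=" + broker.deadLetters);
    }
}
```

Under the assumed limit, each of the two messages is attempted seven times before being dead-lettered, which is consistent with the roughly one dozen exceptions seen in the console.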
You can now kill the example application by typing Ctrl-C in its command window.
This chapter describes how to select and configure a transaction manager instance in Spring. Most of the difficult work of configuring transactions consists of setting up the transaction manager correctly. Once you have completed this step, it is relatively easy to use transactions in your Fuse Mediation Router routes.
A transaction manager is the part of an application that is responsible for coordinating transactions across one or more resources. In the Spring framework, the transaction manager is effectively the root of the transaction system. Hence, if you want to enable transactions on a component in Spring, you typically create a transaction manager bean and pass it to the component.
The responsibilities of the transaction manager are as follows:
Demarcation—starting and ending transactions using begin, commit, and rollback methods.
Managing the transaction context—a transaction context contains the information that a transaction manager needs to keep track of a transaction. The transaction manager is responsible for creating transaction contexts and attaching them to the current thread.
Coordinating the transaction across multiple resources—enterprise-level transaction managers typically have the capability to coordinate a transaction across multiple resources. This feature requires the 2-phase commit protocol and resources must be registered and managed using the XA protocol (see X/Open XA standard).
This is an advanced feature, not supported by all transaction managers.
Recovery from failure—transaction managers are responsible for ensuring that resources are not left in an inconsistent state, if there is a system failure and the application crashes. In some cases, manual intervention might be required to restore the system to a consistent state.
A local transaction manager is a transaction manager that can coordinate transactions over a single resource only. In this case, the implementation of the transaction manager is typically embedded in the resource itself and the Spring transaction manager is just a thin wrapper around this built-in transaction manager.
For example, the Oracle database has a built-in transaction manager that supports
demarcation operations (using SQL operations, BEGIN,
COMMIT, ROLLBACK, or using a native Oracle API) and
various levels of transaction isolation. Control over the Oracle transaction manager
can be exported through JDBC, which is how Spring is able to wrap this transaction
manager.
It is important to understand what constitutes a resource, in this context. For example, if you are using a JMS product, the JMS resource is the single running instance of the JMS product, not the individual queues and topics. Moreover, sometimes, what appears to be multiple resources might actually be a single resource, if the same underlying resource is accessed in different ways. For example, your application might access a relational database both directly (through JDBC) and indirectly (through an object-relational mapping tool like Hibernate). In this case, the same underlying transaction manager is involved, so it should be possible to enrol both of these code fragments in the same transaction.
It cannot be guaranteed that this will work in every case. Although it is possible in principle, some detail in design of the Spring framework or other wrapper layers might prevent it from working in practice.
Of course, it is possible for an application to have many different local transaction managers working independently of each other. For example, you could have one route that manipulates JMS queues and topics, where the JMS endpoints reference a JMS transaction manager. Another route could access a relational database through JDBC. But you could not combine JDBC and JMS access in the same route and have them both participate in the same transaction.
A global transaction manager is a transaction manager that can coordinate transactions over multiple resources. In this case, you cannot rely on the transaction manager built into the resource itself. Instead, you require an external system, sometimes called a transaction processing monitor (TP monitor), that is capable of coordinating transactions across different resources.
The following are the prerequisites for global transactions:
Global transaction manager or TP monitor—an external transaction system that implements the 2-phase commit protocol for coordinating multiple XA resources.
Resources that support the XA standard—in order to participate in a 2-phase commit, resources must support the X/Open XA standard. In practice, this means that the resource is capable of exporting an XA switch object, which gives complete control of transactions to the external TP monitor.
The Spring framework does not by itself provide a TP
monitor to manage global transactions. It does, however, provide support for
integrating with an OSGi-provided TP monitor or with a J2EE-provided TP monitor
(where the integration is implemented by the JtaTransactionManager class). Hence, if you deploy
your application into an OSGi container with full transaction support, you can
use multiple transactional resources in Spring.
Usually, a server connects directly to the resources involved in a transaction. In a distributed system, however, it is occasionally necessary to connect to resources that are exposed only indirectly, through a Web service or through a CORBA IDL interface. In this case, you require a TP monitor that is capable of supporting distributed transactions. Several standards are available that describe how to support transactions for various distributed protocols—for example, the WS-AtomicTransactions specification for Web services and the CORBA Object Transaction Service (OTS) specification for CORBA applications.
Figure 2.1 shows an overview of the Spring transaction architecture.
In the standalone deployment model, the Spring container provides access to persistent data sources and is responsible for managing the transactions associated with those data sources. A notable limitation of the standalone model is that the Spring container can support only local transaction managers, which means that only one data source (resource) at a time can participate in a transaction.
Spring supports a variety of different wrapper APIs for accessing persistent
storage. For example, to access a database through JDBC, Spring provides the
SimpleDriverDataSource class to represent the database instance and
the JdbcTemplate class to provide access to the database using SQL.
Wrappers are also provided for other kinds of persistent resource, such as JMS,
Hibernate, and so on. The Spring data sources are designed to be compatible with the
local transaction manager classes.
In Spring, a local transaction manager is a wrapper class that is responsible for managing the transactions of a single resource. The local transaction manager is responsible for starting, committing, and rolling back transactions. Typically, the way that you use a transaction manager in Fuse Mediation Router is that you pass the transaction manager reference to a transactional Fuse Mediation Router component bean.
Spring provides different types of local transaction manager for different types
of data source. For example, Spring provides a
DataSourceTransactionManager for JDBC, a
JmsTransactionManager for JMS, a
HibernateTransactionManager for Hibernate, and so on.
Figure 2.2 shows an overview of the OSGi transaction architecture in Fuse ESB. The core of the architecture is a JTA transaction manager based on Apache Geronimo, which exposes various transaction interfaces as OSGi services.
The JTA Transaction Services Specification section of the OSGi Service Platform Enterprise Specification describes the kind of transaction support that can (optionally) be provided by an OSGi container. Essentially, OSGi mandates that the transaction service is accessed through the Java Transaction API (JTA).
The transaction service exports the following JTA interfaces as OSGi services (the JTA services):
javax.transaction.UserTransaction
javax.transaction.TransactionManager
javax.transaction.TransactionSynchronizationRegistry
Only one JTA provider should be made available in an OSGi container. In other words, the JTA services are registered only once and the objects obtained by importing references to the JTA services must be unique.
The Fuse ESB transaction service exports the following additional interfaces as OSGi services:
org.springframework.transaction.PlatformTransactionManager
org.apache.geronimo.transaction.manager.RecoverableTransactionManager
By obtaining a reference to the PlatformTransactionManager OSGi
service, it is possible to integrate application bundles written using the Spring
transaction API into the Fuse ESB transaction architecture.
The PlatformTransactionManager interface is the key
abstraction in the Spring transaction API, providing the classic transaction client
operations: begin, commit and
rollback. This interface thus provides the essential
methods for controlling transactions at run time.
The other key aspect of any transaction system is the API for implementing transactional resources. But transactional resources are generally implemented by the underlying database, so this aspect of transactional programming is rarely a concern for the application programmer.
Example 2.1 shows the definition of the
org.springframework.transaction.PlatformTransactionManager
interface.
Example 2.1. The PlatformTransactionManager Interface
package org.springframework.transaction;
public interface PlatformTransactionManager {
TransactionStatus getTransaction(TransactionDefinition definition)
throws TransactionException;
void commit(TransactionStatus status) throws TransactionException;
void rollback(TransactionStatus status) throws TransactionException;
}
The TransactionDefinition interface is used to specify the
characteristics of a newly created transaction. It is used to specify the
isolation level and the propagation
policy of the new transaction. For more details, see Propagation Policies.
The TransactionStatus interface can be used to check the status of
the current transaction (that is, the transaction associated with the current
thread) and to mark the current transaction for rollback. It is defined as
follows:
public interface TransactionStatus extends SavepointManager {
boolean isNewTransaction();
boolean hasSavepoint();
void setRollbackOnly();
boolean isRollbackOnly();
void flush();
boolean isCompleted();
}
The PlatformTransactionManager interface defines the following
methods:
getTransaction(): Create a new transaction and associate it with the current
thread, passing in a TransactionDefinition
object to define the characteristics of the new transaction. This is
analogous to the begin() method of many other
transaction client APIs.
commit(): Commit the current transaction, making permanent all of the pending changes to the registered resources.
rollback(): Roll back the current transaction, undoing all of the pending changes to the registered resources.
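The way these three operations fit together can be sketched without any Spring dependency. The following is a minimal illustration under stated assumptions, not the Spring API: SketchTransactionManager, its nested Status class, the write() method, and TransferDemo are all hypothetical names invented for this sketch, and the "resource" is only an in-memory list. It shows the usual pattern of obtaining a transaction, committing on success, and rolling back on failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that mirrors the shape of getTransaction()/commit()/rollback().
// It is NOT org.springframework.transaction.PlatformTransactionManager.
final class SketchTransactionManager {

    // Hypothetical stand-in for a transaction status object.
    static final class Status {
        private boolean rollbackOnly;
        private boolean completed;

        void setRollbackOnly() { rollbackOnly = true; }
        boolean isRollbackOnly() { return rollbackOnly; }
        boolean isCompleted() { return completed; }
    }

    private final List<String> committed = new ArrayList<>(); // permanent changes
    private final List<String> pending = new ArrayList<>();   // uncommitted changes

    // Analogous to begin(): start with an empty set of pending changes.
    Status getTransaction() {
        pending.clear();
        return new Status();
    }

    // Records a pending change (stands in for a write to a transactional resource).
    void write(String update) {
        pending.add(update);
    }

    // Makes the pending changes permanent, unless the transaction is marked rollback-only.
    void commit(Status status) {
        if (status.isRollbackOnly()) {
            rollback(status);
            return;
        }
        committed.addAll(pending);
        pending.clear();
        status.completed = true;
    }

    // Discards all pending changes.
    void rollback(Status status) {
        pending.clear();
        status.completed = true;
    }

    List<String> committedUpdates() {
        return new ArrayList<>(committed);
    }
}

// Hypothetical client code: either both steps of the transfer become permanent, or neither does.
final class TransferDemo {
    static boolean transfer(SketchTransactionManager tm, boolean failMidway) {
        SketchTransactionManager.Status status = tm.getTransaction();
        try {
            tm.write("debit A");
            if (failMidway) {
                throw new IllegalStateException("simulated failure after debit");
            }
            tm.write("credit B");
            tm.commit(status);
            return true;
        } catch (RuntimeException e) {
            tm.rollback(status);
            return false;
        }
    }
}
```

Calling TransferDemo.transfer(tm, true) leaves the committed list untouched, which is the all-or-nothing behavior that the real transaction manager provides for the registered resources.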
Generally, you do not use the
PlatformTransactionManager interface directly. In Fuse Mediation Router, you
typically use a transaction manager as follows:
Create an instance of a transaction manager (there are several different implementations available in Spring—see Transaction Manager Implementations).
Pass the transaction manager instance either to a Fuse Mediation Router component or to
the transacted() DSL command in a route. The transactional
component or the transacted() command is then responsible for
demarcating transactions (see Transaction Demarcation).
This section provides a brief overview of all the transaction manager implementations provided by the Spring framework. In general, the implementations fall into two different categories: local transaction managers and global transaction managers.
Table 2.1 summarizes the local transaction manager implementations provided by the Spring framework. These transaction managers are distinguished by the fact that they support a single resource only.
Table 2.1. Local Transaction Managers
| Transaction Manager | Description |
|---|---|
| JmsTransactionManager | A transaction manager implementation that is capable of managing a single JMS resource. That is, you can connect to any number of queues or topics, but only if they belong to the same underlying JMS messaging product instance. Moreover, you cannot enlist any other types of resource in a transaction. For example, using this transaction manager, it would not be possible to enlist both a SonicMQ resource and an Apache ActiveMQ resource in the same transaction. But see Table 2.2. |
| DataSourceTransactionManager | A transaction manager implementation that is capable of managing a single JDBC database resource. That is, you can update any number of different database tables, but only if they belong to the same underlying database instance. |
| HibernateTransactionManager | A transaction manager implementation that is capable of managing a Hibernate resource. It is not possible, however, to simultaneously enlist any other kind of resource in a transaction. |
| JdoTransactionManager | A transaction manager implementation that is capable of managing a Java Data Objects (JDO) resource. It is not possible, however, to simultaneously enlist any other kind of resource in a transaction. |
| JpaTransactionManager | A transaction manager implementation that is capable of managing a Java Persistence API (JPA) resource. It is not possible, however, to simultaneously enlist any other kind of resource in a transaction. |
| CciLocalTransactionManager | A transaction manager implementation that is capable of managing a J2EE Connector Architecture (JCA) resource. It is not possible, however, to simultaneously enlist any other kind of resource in a transaction. |
Table 2.2 summarizes the global transaction manager implementations provided by the Spring framework. These transaction managers are distinguished by the fact that they can support multiple resources.
Table 2.2. Global Transaction Managers
| Transaction Manager | Description |
|---|---|
| JtaTransactionManager | If you require a transaction manager that is capable of enlisting more than one resource in a transaction, use the JTA transaction manager, which is capable of supporting the XA transaction API. You must deploy your application inside either an OSGi container or a J2EE server to use this transaction manager. |
| OC4JJtaTransactionManager | A specialization of the JtaTransactionManager to work with Oracle's OC4J. The advantage of this implementation is that it makes Spring-driven transactions visible in OC4J's transaction monitor. |
| WebLogicJtaTransactionManager | A specialization of the JtaTransactionManager to work with the BEA WebLogic container. Makes certain advanced transaction features available: transaction names, per-transaction isolation levels, and proper suspension/resumption of transactions. |
| WebSphereUowTransactionManager | A specialization of the JtaTransactionManager to work with the IBM WebSphere container. Enables proper suspension/resumption of transactions. |
If you need to access a database, the JDBC data source provides a convenient,
general-purpose mechanism for connecting to a database and making SQL based
queries and updates. To group multiple updates into a single transaction, you
can instantiate a Spring DataSourceTransactionManager and create a
transaction scope using the transacted() DSL command.
Example 2.2 shows how to instantiate a
JDBC transaction manager, of DataSourceTransactionManager type,
which is required if you want to integrate a JDBC connection with Spring
transactions. The JDBC transaction manager requires a reference to a data source
bean (created here with the ID, dataSource).
Example 2.2. Data Source Transaction Manager Configuration
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
...
<!-- spring transaction manager -->
<bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<!-- datasource to the database -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
<property name="driverClass" value="org.hsqldb.jdbcDriver"/>
<property name="url" value="jdbc:hsqldb:mem:camel"/>
<property name="username" value="sa"/>
<property name="password" value=""/>
</bean>
</beans>
In Example 2.2, the
txManager bean is a local JDBC transaction manager instance, of
DataSourceTransactionManager type. There is just one property
you need to provide to the JDBC transaction manager: a reference to a
JDBC data source.
In Example 2.2, the
dataSource bean is an instance of a JDBC data source, of
javax.sql.DataSource type. The JDBC data source is a standard
feature of the Java DataBase Connectivity (JDBC) specification and it represents
a single JDBC connection, encapsulating the information required to
connect to a specific database.
In Spring, the recommended way to create a data source is to instantiate a
SimpleDriverDataSource bean (which implements the
javax.sql.DataSource interface). The simple driver data source
bean creates a new data source using a JDBC driver class (which is effectively a
data source factory). The properties that you supply to the simple driver data
source bean are specific to the database you want to connect to. In general, you
need to supply the following properties:
driverClass: An instance of java.sql.Driver, which is the JDBC
driver implemented by the database you want to connect to. Consult
the third-party database documentation for the name of this driver
class (some examples are given in Table 2.6).
url: The JDBC URL that is used to open a connection to the database. Consult the third-party database documentation for details of the URL format (some examples are given in Table 2.6).
For example, the URL provided to the dataSource bean
in Example 2.2 is in a format
prescribed by the HSQLDB database. The URL,
jdbc:hsqldb:mem:camel, can be parsed as
follows:
The prefix, jdbc:hsqldb:, is common to all
HSQLDB JDBC connection URLs;
The next part, mem:, signifies an in-memory
(non-persistent) database;
The final identifier, camel, is an arbitrary
name that identifies the in-memory database instance.
username: The username that is used to log on to the database.
For example, when a new HSQLDB database instance is created, the
sa user is created by default (with administrator
privileges).
password: The password that matches the specified username.
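The breakdown of the HSQLDB URL described above can be reproduced with a few lines of plain Java. This is only an illustrative sketch: the HsqldbUrlParts class and its field names are hypothetical, and it handles only the simple jdbc:hsqldb:type:name form discussed here, not every URL format that HSQLDB accepts.

```java
// Hypothetical helper that splits a simple HSQLDB JDBC URL into the parts described above.
final class HsqldbUrlParts {
    // Prefix common to all HSQLDB JDBC connection URLs.
    static final String COMMON_PREFIX = "jdbc:hsqldb:";

    final String type; // for example "mem:" for an in-memory database
    final String name; // for example "camel", the database instance name

    private HsqldbUrlParts(String type, String name) {
        this.type = type;
        this.name = name;
    }

    static HsqldbUrlParts parse(String url) {
        if (!url.startsWith(COMMON_PREFIX)) {
            throw new IllegalArgumentException("Not an HSQLDB JDBC URL: " + url);
        }
        String rest = url.substring(COMMON_PREFIX.length()); // e.g. "mem:camel"
        int colon = rest.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Missing database type prefix: " + url);
        }
        // Keep the trailing colon on the type so that it matches the text ("mem:").
        return new HsqldbUrlParts(rest.substring(0, colon + 1), rest.substring(colon + 1));
    }
}
```

For the URL in Example 2.2, HsqldbUrlParts.parse("jdbc:hsqldb:mem:camel") yields the type mem: and the name camel.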
Spring provides a variety of data source implementations, which are suitable for standalone mode (that is, the application is not deployed inside an OSGi container). These data sources are described in Table 2.3.
Table 2.3. Standalone Data Source Classes
| Data Source Class | Description |
|---|---|
| SimpleDriverDataSource | This data source should always be used in standalone mode. You configure this data source by providing it with details of a third-party JDBC driver class. This implementation has the following features: |
| DriverManagerDataSource | (Deprecated) Incompatible with OSGi containers. This class is superseded by the SimpleDriverDataSource. |
| SingleConnectionDataSource | A data source that opens only one database connection (that is, every call to getConnection() returns a reference to the same connection instance). It follows that this data source is incompatible with multi-threading and is therefore not recommended for general use. |
If your application is deployed into a J2EE container, it does not make sense
to create a data source directly. Instead, you should let the J2EE container
take care of creating data sources and you can then access those data sources by
doing a JNDI lookup. For example, the following code fragment shows how you can
obtain a data source from the JNDI reference,
java:comp/env/jdbc/myds, and then wrap the data source with a
UserCredentialsDataSourceAdapter.
<bean id="myTargetDataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
  <property name="jndiName" value="java:comp/env/jdbc/myds"/>
</bean>
<bean id="myDataSource" class="org.springframework.jdbc.datasource.UserCredentialsDataSourceAdapter">
  <property name="targetDataSource" ref="myTargetDataSource"/>
  <property name="username" value="myusername"/>
  <property name="password" value="mypassword"/>
</bean>
The JndiObjectFactoryBean exploits the Spring bean factory
pattern to look up an object in JNDI. When this bean's ID,
myTargetDataSource, is referenced elsewhere in Spring using the
ref attribute, instead of getting a reference to the
JndiObjectFactoryBean bean, you actually get a reference to the
bean that was looked up in JNDI (a javax.sql.DataSource
instance).
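The factory bean indirection described above can be illustrated with a small, dependency-free sketch. None of the types below are Spring classes: SketchFactoryBean, SketchJndiLookupBean, and SketchBeanRegistry are hypothetical stand-ins, and a plain Map plays the role of the JNDI naming context. The point is only that resolving a reference to a factory bean returns the object it produces, not the factory itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker for beans whose job is to produce another object.
interface SketchFactoryBean {
    Object getObject();
}

// Hypothetical stand-in for a JNDI lookup bean; a Map stands in for the naming context.
final class SketchJndiLookupBean implements SketchFactoryBean {
    private final Map<String, Object> namingContext;
    private final String jndiName;

    SketchJndiLookupBean(Map<String, Object> namingContext, String jndiName) {
        this.namingContext = namingContext;
        this.jndiName = jndiName;
    }

    @Override
    public Object getObject() {
        Object found = namingContext.get(jndiName);
        if (found == null) {
            throw new IllegalStateException("Nothing bound at " + jndiName);
        }
        return found;
    }
}

// Hypothetical bean registry: a ref to a factory bean resolves to the factory's product.
final class SketchBeanRegistry {
    private final Map<String, Object> beans = new HashMap<>();

    void register(String id, Object bean) {
        beans.put(id, bean);
    }

    Object resolveRef(String id) {
        Object bean = beans.get(id);
        if (bean == null) {
            throw new IllegalArgumentException("No bean with id " + id);
        }
        return (bean instanceof SketchFactoryBean)
                ? ((SketchFactoryBean) bean).getObject()
                : bean;
    }
}
```

In this sketch, registering a SketchJndiLookupBean under the ID myTargetDataSource and then resolving that ID returns whatever object is bound at the JNDI name, which mirrors how the ref attribute behaves in the configuration above.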
The standard javax.sql.DataSource interface exposes two methods
for creating connections: getConnection() and
getConnection(String username, String password). If (as is
normally the case) the referenced database requires credentials in order to open
a connection, the UserCredentialsDataSourceAdapter class provides a
convenient way of ensuring that these user credentials are available. You can
use this adapter class for wrapping JNDI-provided data sources that do not have
their own credentials cache.
In addition to UserCredentialsDataSourceAdapter, there are a
number of other adapter classes that you can use to wrap data sources obtained
from JNDI lookups. These J2EE data source adapters are summarized in Table 2.4.
Table 2.4. J2EE Data Source Adapters
| Data Source Adapter | Description |
|---|---|
| UserCredentialsDataSourceAdapter | Data source wrapper class that caches username/password credentials, for cases where the wrapped data source does not have its own credentials cache. This class can be used to wrap a data source obtained by JNDI lookup (typically, in a J2EE container). The username/password credentials are bound to a specific thread. Hence, you can store different connection credentials for different threads. |
| IsolationLevelDataSourceAdapter | Subclass of UserCredentialsDataSourceAdapter which, in addition to caching user credentials, also applies the current Spring transaction's level of isolation to all of the connections it creates. |
| WebSphereDataSourceAdapter | Same functionality as IsolationLevelDataSourceAdapter, except that the implementation is customized to work with IBM-specific APIs. |
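The idea of thread-bound credentials mentioned in Table 2.4 can be sketched with a ThreadLocal. This is an illustration of the concept only, not the adapter's implementation: ThreadBoundCredentialsSketch and all of its method names are hypothetical, and the fallback to default credentials when a thread has none of its own is an assumption made for this sketch.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of per-thread credentials with a shared default.
final class ThreadBoundCredentialsSketch {

    static final class Credentials {
        final String username;
        final String password;

        Credentials(String username, String password) {
            this.username = username;
            this.password = password;
        }
    }

    private final Credentials defaults;
    private final ThreadLocal<Credentials> perThread = new ThreadLocal<>();

    ThreadBoundCredentialsSketch(String defaultUsername, String defaultPassword) {
        this.defaults = new Credentials(defaultUsername, defaultPassword);
    }

    // Binds credentials to the calling thread only.
    void setForCurrentThread(String username, String password) {
        perThread.set(new Credentials(username, password));
    }

    // Removes the calling thread's credentials, so that the defaults apply again.
    void clearForCurrentThread() {
        perThread.remove();
    }

    // Credentials that a connection request from the calling thread would use.
    Credentials current() {
        Credentials bound = perThread.get();
        return bound != null ? bound : defaults;
    }

    // Demonstration helper: reports the username seen by a freshly started thread.
    String usernameSeenByNewThread() {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> seen.set(current().username));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }
}
```

Credentials set on one thread are invisible to other threads, which is why different threads can open connections under different user names through the same wrapped data source.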
You can wrap a data source with a data source proxy in order to add special functionality to a data source. The data source proxies can be applied either to a standalone data source or a data source provided by the container. They are summarized in Table 2.5.
Table 2.5. Data Source Proxies
| Data Source Proxy | Description |
|---|---|
| LazyConnectionDataSourceProxy | This proxy uses lazy semantics to avoid unnecessary database operations. That is, a connection will not actually be opened until the application code attempts to read from or write to the database. For example, if some application code opens a connection, begins a transaction, and then commits the transaction, but never actually accesses the database, the lazy connection proxy would optimize these database operations away. |
| TransactionAwareDataSourceProxy | Provides support for legacy database code that is not implemented using the Spring persistence API. Do not use this proxy for normal transaction support. The other Spring data sources are already compatible with the Spring persistence and transaction APIs. For example, if your application code uses Spring's |
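The lazy semantics described for LazyConnectionDataSourceProxy can be illustrated with a short, self-contained sketch. LazyConnectionSketch is a hypothetical class, not the Spring proxy: it merely counts how often a physical connection would have been opened, under the assumption that begin and commit require no database work unless a statement was executed in between.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a connection that is opened only on first real use.
final class LazyConnectionSketch {
    private int physicalOpens;                      // times a real connection would be opened
    private boolean open;                           // whether a physical connection is held
    private final List<String> executed = new ArrayList<>();

    // Beginning a transaction needs no database round trip yet.
    void begin() {
        // intentionally empty: work is deferred until execute() is called
    }

    // The first statement forces the physical connection to be opened.
    void execute(String sql) {
        if (!open) {
            open = true;
            physicalOpens++;
        }
        executed.add(sql);
    }

    // Returns true only if there was a physical connection to commit on.
    boolean commit() {
        boolean hadWork = open;
        open = false; // this sketch releases the connection at the end of the transaction
        return hadWork;
    }

    int physicalOpens() {
        return physicalOpens;
    }
}
```

A begin() followed directly by commit() leaves physicalOpens() at zero, which corresponds to the case where the proxy optimizes the database operations away.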
Table 2.6 shows the JDBC connection details for a variety of different database products.
Table 2.6. Connection Details for Various Databases
| Database | JDBC Driver Manager Properties |
|---|---|
| HSQLDB | The JDBC driver class for HSQLDB is as follows: org.hsqldb.jdbcDriver To connect to an HSQLDB database, you can use one of the following JDBC URL formats: jdbc:hsqldb:hsql[s]://host[:port][/ Where the |
| MySQL | The JDBC driver class for MySQL is as follows: com.mysql.jdbc.Driver To connect to a MySQL database, use the following JDBC URL format: jdbc:mysql://[host][,failoverhost...][:port]/[ Where the |
| Oracle | Depending on which version of Oracle you are using, choose one of the following JDBC driver classes: oracle.jdbc.OracleDriver (Oracle 9i, 10) oracle.jdbc.driver.OracleDriver (Oracle 8i) To connect to an Oracle database, use the following JDBC URL format: jdbc:oracle:thin:[user/password]@[host][:port]: Where the Oracle System ID ( |
| DB2 | The JDBC driver class for DB2 is as follows: com.ibm.db2.jcc.DB2Driver To connect to a DB2 database, use the following JDBC URL format: jdbc:db2://host[:port]/ |
| SQL Server | The JDBC driver class for SQL Server is as follows: com.microsoft.jdbc.sqlserver.SQLServerDriver To connect to a SQL Server database, use the following JDBC URL format: jdbc:microsoft:sqlserver://host[:port];DatabaseName= |
| Sybase | The JDBC driver class for Sybase is as follows: com.sybase.jdbc3.jdbc.SybDriver To connect to a Sybase database, use the following JDBC URL format: jdbc:sybase:Tds:host:port/ |
| Informix | The JDBC driver class for Informix is as follows: com.informix.jdbc.IfxDriver To connect to an Informix database, use the following JDBC URL format: jdbc:informix-sqli://host:port/ |
| PostgreSQL | The JDBC driver class for PostgreSQL is as follows: org.postgresql.Driver To connect to a PostgreSQL database, use the following JDBC URL format: jdbc:postgresql://host[:port]/ |
| MaxDB | The JDBC driver class for the SAP database is as follows: com.sap.dbtech.jdbc.DriverSapDB To connect to a MaxDB database, use the following JDBC URL format: jdbc:sapdb://host[:port]/ |
| FrontBase | The JDBC driver class for FrontBase is as follows: com.frontbase.jdbc.FBJDriver To connect to a FrontBase database, use the following JDBC URL format: jdbc:FrontBase://host[:port]/ |
To enable transactions while accessing Hibernate objects, you need to provide
an instance of the Hibernate transaction manager, of
HibernateTransactionManager type, as described here. You can
then use the transacted() DSL command to create a transaction scope
in a route.
Example 2.3 shows how to
instantiate a Hibernate transaction manager, of
HibernateTransactionManager type, which is required if you want
to integrate Hibernate object-oriented persistence with Spring transactions. The
Hibernate transaction manager requires a reference to a Hibernate session
factory, and the Hibernate session factory takes a reference to a JDBC data
source.
Example 2.3. Hibernate Transaction Manager Configuration
<beans ... >
...
<bean id="myDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
<property name="url" value="jdbc:hsqldb:hsql://localhost:9001"/>
<property name="username" value="sa"/>
<property name="password" value=""/>
</bean>
<bean id="mySessionFactory" class="org.springframework.orm.hibernate3.LocalSessionFactoryBean">
<property name="dataSource" ref="myDataSource"/>
<property name="mappingResources">
<list>
<value>product.hbm.xml</value>
</list>
</property>
<property name="hibernateProperties">
<value>
hibernate.dialect=org.hibernate.dialect.HSQLDialect
</value>
</property>
</bean>
<bean id="hibernateTxManager" class="org.springframework.orm.hibernate3.HibernateTransactionManager">
<property name="sessionFactory" ref="mySessionFactory"/>
</bean>
</beans>
In Example 2.3, the
hibernateTxManager bean is a local Hibernate transaction
manager instance, of HibernateTransactionManager type. There is
just one property you need to provide to the Hibernate transaction manager: a
reference to a Hibernate session factory.
In Example 2.3, the
mySessionFactory bean is a Hibernate session factory of
org.springframework.orm.hibernate3.LocalSessionFactoryBean type.
This session factory bean is needed by the Hibernate transaction manager.
In general, you need to supply the following properties to a Hibernate
LocalSessionFactoryBean bean instance:
dataSource: An instance of javax.sql.DataSource, which is the
JDBC data source of the database that Hibernate is layered over. For
details of how to configure a JDBC data source, see JDBC Data Source.
mappingResources: Specifies a list of one or more mapping association files on the class path. A Hibernate mapping association defines how Java objects map to database tables.
hibernateProperties: Allows you to set any Hibernate property, by supplying a list of
property settings. The most commonly needed property is
hibernate.dialect, which indicates to Hibernate
what sort of database it is layered over, enabling Hibernate to
optimize its interaction with the underlying database. The dialect
is specified as a class name, which can have one of the following
values:
org.hibernate.dialect.Cache71Dialect
org.hibernate.dialect.DataDirectOracle9Dialect
org.hibernate.dialect.DB2390Dialect
org.hibernate.dialect.DB2400Dialect
org.hibernate.dialect.DB2Dialect
org.hibernate.dialect.DerbyDialect
org.hibernate.dialect.FirebirdDialect
org.hibernate.dialect.FrontBaseDialect
org.hibernate.dialect.H2Dialect
org.hibernate.dialect.HSQLDialect
org.hibernate.dialect.IngresDialect
org.hibernate.dialect.InterbaseDialect
org.hibernate.dialect.JDataStoreDialect
org.hibernate.dialect.MckoiDialect
org.hibernate.dialect.MimerSQLDialect
org.hibernate.dialect.MySQL5Dialect
org.hibernate.dialect.MySQL5InnoDBDialect
org.hibernate.dialect.MySQLDialect
org.hibernate.dialect.MySQLInnoDBDialect
org.hibernate.dialect.MySQLMyISAMDialect
org.hibernate.dialect.Oracle9Dialect
org.hibernate.dialect.OracleDialect
org.hibernate.dialect.PointbaseDialect
org.hibernate.dialect.PostgreSQLDialect
org.hibernate.dialect.ProgressDialect
org.hibernate.dialect.RDMSOS2200Dialect
org.hibernate.dialect.SAPDBDialect
org.hibernate.dialect.SQLServerDialect
org.hibernate.dialect.Sybase11Dialect
org.hibernate.dialect.SybaseAnywhereDialect
org.hibernate.dialect.SybaseDialect
org.hibernate.dialect.TimesTenDialect
To enable transactions in a JPA component, you need to provide the JPA
component with a reference to a transaction manager, of
JpaTransactionManager type. The Java Persistence API is a
generic wrapper API for object-relational persistence and it can be layered over
a variety of different object-relational mapping technologies.
Example 2.4 shows how to customize the configuration of a JPA component (creating a component with the bean ID, jpa), so that the JPA component supports Spring transactions. When used with transactions, the JPA component requires a reference to an entity manager factory and a reference to a transaction manager.
Example 2.4. JPA Transaction Manager Configuration
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="transactionManager" ref="jpaTxManager"/>
</bean>
<bean id="jpaTxManager" class="org.springframework.orm.jpa.JpaTransactionManager">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
</bean>
<bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean">
<property name="persistenceUnitName" value="camel"/>
</bean>
</beans>
In Example 2.4, the
jpaTxManager bean is a local JPA transaction manager instance,
of JpaTransactionManager type. The JPA transaction manager requires
a reference to an entity manager factory bean (in this example, the
entityManagerFactory bean).
If you deploy your application into an OSGi container, however, you might want
to consider using a JtaTransactionManager instead. See Table 2.2.
The entity manager factory bean encapsulates the JPA runtime functionality.
For example, the Spring LocalEntityManagerFactoryBean
class is just a wrapper around the standard
javax.persistence.EntityManagerFactory class. The entity
manager factory is used to create a javax.persistence.EntityManager
instance, where the entity manager is associated with a unique
persistence context. A persistence context represents
a consistent set of entity objects that are instantiated from the underlying
database (analogous to a Hibernate session).
The LocalEntityManagerFactoryBean class is a relatively simple
JPA wrapper class that is suitable for simple demonstrations and testing
purposes. This class reads its required configuration information from the
persistence.xml file, which is found at the standard location,
META-INF/persistence.xml, on the class path (see Sample persistence.xml file). The
persistenceUnitName property references a section of the
persistence.xml file.
As well as instantiating a LocalEntityManagerFactoryBean bean,
there are other ways of obtaining a JPA entity manager factory, as summarized in
Table 2.7.
Table 2.7. Obtaining JPA Entity Manager Factory
| Entity Manager Factory | Description |
|---|---|
| Obtain from JNDI | If your application is deployed in a J2EE container, the recommended approach is to let the container take care of instantiating the entity manager factory. You can then obtain a reference to the entity manager factory using JNDI. See Obtaining an EntityManagerFactory from JNDI in the Spring documentation. |
| LocalEntityManagerFactoryBean | For simple standalone applications and for testing, the simplest option is to create a bean of this type. The JPA runtime is configured using the standard META-INF/persistence.xml file. |
| LocalContainerEntityManagerFactoryBean | Use this class if you need to configure special bootstrap options for the JPA runtime. In spite of the name, this class is not restricted to containers; you can also use it in standalone mode. See LocalContainerEntityManagerFactoryBean in the Spring documentation. |
The JPA is a thin abstraction layer that allows you to write code that is independent of a particular object-relational mapping product—for example, it enables you to layer your application over products such as OpenJPA, Hibernate, or TopLink. To match the application code to a specific JPA implementation, JPA defines a bootstrap contract, which is a procedure to locate and configure JPA implementations, as follows:
To make a JPA implementation available to your application, put the
JAR file containing the relevant JPA provider class (of javax.persistence.spi.PersistenceProvider
type) on your class path. In fact, it is possible to add multiple JPA
providers to your class path: you can optionally specify which JPA
provider to use in the persistence.xml file.
The JPA persistence layer is configured by the standard
persistence.xml file, which is normally located in
META-INF/persistence.xml on the class path.
Example 2.5 shows a sample
persistence.xml file for configuring an OpenJPA JPA
provider layered over a Derby database.
Example 2.5. Sample persistence.xml File
<?xml version="1.0" encoding="UTF-8"?>
<persistence xmlns="http://java.sun.com/xml/ns/persistence"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
version="1.0"
xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd">
<persistence-unit name="camel" transaction-type="RESOURCE_LOCAL">
<!--
The default provider can be OpenJPA, or some other product.
This element is optional if OpenJPA is the only JPA provider
in the current classloading environment, but can be specified
in cases where there are multiple JPA implementations available.
-->
<provider>
org.apache.openjpa.persistence.PersistenceProviderImpl
</provider>
<class>org.apache.camel.examples.MultiSteps</class>
<class>org.apache.camel.examples.SendEmail</class>
<properties>
<property name="openjpa.ConnectionURL" value="jdbc:derby:target/derby;create=true"/>
<property name="openjpa.ConnectionDriverName" value="org.apache.derby.jdbc.EmbeddedDriver"/>
<property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema"/>
<property name="openjpa.Log" value="DefaultLevel=WARN, Tool=INFO, SQL=TRACE"/>
</properties>
</persistence-unit>
</persistence>
To make a JPA provider available to an application, simply add the provider's JAR file to the class path and the JPA layer will auto-detect the JPA provider.
The following code example shows how the
org.apache.camel.examples.SendEmail class referenced
in Example 2.5 should be annotated to turn
it into a persistent entity bean (so that it is persistible by JPA):
// Java
package org.apache.camel.examples;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

/**
 * Represents a task which is added to the database, then removed from the database when it is consumed
 *
 * @version $Revision$
 */
@Entity
public class SendEmail {
    private Long id;
    private String address;

    public SendEmail() {
    }

    public SendEmail(String address) {
        setAddress(address);
    }

    @Override
    public String toString() {
        return "SendEmail[id: " + getId() + " address: " + getAddress() + "]";
    }

    @Id
    @GeneratedValue
    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}
The preceding class has the following JPA annotations:
@javax.persistence.Entity: Specifies that the following class is persistible by the JPA.
@javax.persistence.Id: The following bean property must be used as the primary key (for locating objects of this type in the database).
@javax.persistence.GeneratedValue: Specifies that the primary key values should be automatically generated by the JPA runtime (you can optionally set attributes on this annotation to configure the ID generation algorithm as well).
For the complete list of JPA annotations, see the API for the javax.persistence package.
JMS endpoints create special problems when transactions are enabled. Their behavior is affected by the type of transaction manager in use, the caching level in use, and the message exchange pattern in use.
To enable transactions in a JMS component (thus enabling JMS endpoints to play the role either of a transactional resource or a transactional client), you need to:
set the transacted property
provide the JMS component with a reference to a suitable transaction manager
In addition, you may want to adjust the JMS component's cache level setting. External transaction managers can impact caching performance.
The easiest way to configure a JMS endpoint to participate in transactions is to create a new instance of a Camel JMS component that has the proper settings. To do so:
Create a bean element that has its
class attribute set to
org.apache.camel.component.jms.JmsComponent.
This bean creates an instance of the Fuse Mediation Router's JMS component.
Set the bean's id attribute to a unique, short,
string.
The id will be used to create route endpoint's that use this JMS component.
Add an empty property child to the bean.
Add a name attribute with the value of
configuration to the property
element.
Add a ref attribute whose value is the
id of a JmsConfiguration bean to the property
element.
The JmsConfiguration bean is used to configure the JMS component.
Create a bean element that has its
class attribute set to
org.apache.camel.component.jms.JmsConfiguration.
This bean creates an instance of the Fuse Mediation Router's JMS component configuration.
Set the bean's id attribute to the value supplied for
the ref attribute in
Step 5.
Add a property child to the bean to configure the
JMS connection factory.
Set the name attribute to
connectionFactory.
Set the ref attribute to the id of a
bean that configures a JMS connection factory.
Add an empty property child to the bean that specifies
the transaction manager the component will use.
Set the name attribute to
transactionManager.
Set the ref attribute to the id of a
bean that configures transaction manager the endpoint will use.
Add an empty property child to the bean that
configures the component to participate in transactions.
Set the name attribute to
transacted.
Set the value attribute to
true.
The transacted property determines if the endpoint can participate in transactions.
Optionally add an empty property child to the bean to
change the default cache level.
Set the name attribute to
cacheLevelName.
Set the value attribute to
a valid cache level.
The JmsComponent bean's id specifies
the URI prefix used by JMS endpoints that will use the transactional JMS
component. For example, in Example 3.1 the
JmsComponent bean's id equals
jmstx, so endpoints that use the configured JMS component use the
jmstx: prefix.
The JmsConfiguration class supports a large number of other properties,
which are essentially identical to the JMS URI options described in
JMS in EIP Component Reference.
The settings for JMS cache level can impact performance when you are using transactions.
The default cache level is CACHE_AUTO. With this setting, the component detects whether an external
transaction manager is in use and sets the cache level as follows:
CACHE_CONSUMER if only local JMS resources are in use
CACHE_NONE if an external transaction manager is in use
This behavior guarantees that there will not be any conflicts between caching and the transaction manager because some XA transaction managers require that caching is disabled. However, this behavior may not produce optimal performance.
If your transaction manager does not require that caching be disabled, you can raise the cache level to improve performance. Consult your transaction manager's documentation to determine what caching level it can support. Then override the default cache level by setting the JMS component's cacheLevelName property to the new cache level.
See JMS in EIP Component Reference for information on setting the cache level of the JMS component.
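The CACHE_AUTO rule described above can be modelled in a few lines of plain Java. This is only a sketch of the decision, not the JMS component's actual implementation; the class, enum, and method names (CacheLevelSketch, CacheLevel, resolve) are hypothetical and exist only for this illustration.

```java
public class CacheLevelSketch {

    /** Cache level names that appear in this section (illustrative subset). */
    enum CacheLevel { CACHE_NONE, CACHE_CONNECTION, CACHE_CONSUMER, CACHE_AUTO }

    /**
     * Models the CACHE_AUTO rule: use CACHE_NONE when an external transaction
     * manager is in use, otherwise CACHE_CONSUMER. A level that was configured
     * explicitly (anything other than CACHE_AUTO) is returned unchanged.
     */
    static CacheLevel resolve(CacheLevel configured, boolean externalTxManager) {
        if (configured != CacheLevel.CACHE_AUTO) {
            return configured;
        }
        return externalTxManager ? CacheLevel.CACHE_NONE : CacheLevel.CACHE_CONSUMER;
    }

    public static void main(String[] args) {
        System.out.println(resolve(CacheLevel.CACHE_AUTO, false));       // CACHE_CONSUMER
        System.out.println(resolve(CacheLevel.CACHE_AUTO, true));        // CACHE_NONE
        System.out.println(resolve(CacheLevel.CACHE_CONNECTION, true));  // CACHE_CONNECTION
    }
}
```

The last call corresponds to overriding the default through the cacheLevelName property: the explicit value wins, so it is up to you to confirm that the transaction manager tolerates that level.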
Example 3.1 shows the configuration of a JMS
component, jmstx, that supports Spring transactions. The JMS component is
layered over an embedded instance of Apache ActiveMQ and the transaction manager is an
instance of JmsTransactionManager.
Example 3.1. JMS Transaction Manager Configuration
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
  ...
  <bean id="jmstx" class="org.apache.camel.component.jms.JmsComponent">
    <property name="configuration" ref="jmsConfig" />
  </bean>
  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="transacted" value="true"/>
    <property name="cacheLevelName" value="CACHE_CONNECTION"/>
  </bean>
  <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
  </bean>
  <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://broker1?brokerConfig=xbean:tutorial/activemq.xml"/>
  </bean>
</beans>
To use this JMS component in a route you would use the URI prefix jmstx: as
shown in Example 3.2.
Example 3.2. URI for Using Transacted JMS Endpoint
from("jmstx:queue:rawStockQuotes")
    .process(myFormatter)
    .to("jmstx:queue:formattedStockQuotes");
The type of exchange created by a JMS consumer endpoint depends on the value of
the JMSReplyTo header in the incoming message. If the
JMSReplyTo header is absent from the incoming message, the consumer
endpoint produces exchanges with the InOnly message exchange
pattern (MEP). For example, consider the following route that receives a stream of
stock quotes from the queue, queue:rawStockQuotes, reformats the
incoming messages, and then forwards them to another queue,
queue:formattedStockQuotes.
from("jmstx:queue:rawStockQuotes")
    .process(myFormatter)
    .to("jmstx:queue:formattedStockQuotes");
Routes that process InOnly exchanges can easily be combined
with transactions. In the preceding example, the JMS queues are accessed through the
transactional JMS instance, jmstx (see Configuring the JMS Component). The transaction initiated by the
consumer endpoint, jmstx:queue:rawStockQuotes, ensures that each
incoming message is reliably transmitted to the producer endpoint,
jmstx:queue:formattedStockQuotes.
Typically, a route designed to work for InOnly exchanges does
not work properly for InOut exchanges.
Unfortunately, this leaves the route at the mercy of the external JMS client: if the
client should accidentally set a JMSReplyTo header, the JMS consumer
endpoint will create an InOut exchange, which could lead to
errors in a route that is designed for InOnly exchanges.
To avoid the risk of creating InOut exchanges when they are
not wanted, you can use the disableReplyTo option in the JMS consumer
to enforce the InOnly MEP. For example, the following route is
guaranteed to process all incoming messages as InOnly
exchanges:
from("jmstx:queue:rawStockQuotes?disableReplyTo=true")
    .process(myFormatter)
    .to("jmstx:queue:formattedStockQuotes");
Figure 3.1 shows an overview of a scenario consisting of a JMS consumer endpoint feeding into a route that ends with a JMS producer endpoint. This route is designed to process exclusively InOnly exchanges.
Messages coming into the route shown in Figure 3.1 are processed as follows:
When a oneway message (JMSReplyTo header is absent) is polled
by the JMS consumer endpoint, the endpoint starts a transaction,
provisionally takes the message off the incoming
queue, and creates an InOnly exchange object to hold
the message.
After propagating through the route, the InOnly exchange arrives at the JMS producer endpoint, which provisionally writes the exchange to the outgoing queue.
At this point, we have arrived at the end of the transaction scope. If there were no errors (and the transaction is not marked for rollback), the transaction is automatically committed. Upon committing, both of the JMS endpoints send acknowledgement messages to the queues, turning the provisional read and the provisional write into a committed read and a committed write.
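The provisional read and provisional write in the steps above can be pictured with a toy in-memory model. This is a sketch written for this explanation only: ToyQueue and ToyTransaction are hypothetical classes, not JMS or Fuse Mediation Router APIs, and a real broker implements these semantics quite differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ProvisionalQueueSketch {

    /** A toy queue that holds only committed (visible) messages. */
    static class ToyQueue {
        final Deque<String> committed = new ArrayDeque<>();
    }

    /** A toy transaction that records provisional reads and writes. */
    static class ToyTransaction {
        private final List<Runnable> onCommit = new ArrayList<>();
        private final List<Runnable> onRollback = new ArrayList<>();

        /** Provisional read: the message is taken now and restored on rollback. */
        String receive(ToyQueue queue) {
            String msg = queue.committed.pollFirst();
            if (msg != null) {
                onRollback.add(() -> queue.committed.addFirst(msg));
            }
            return msg;
        }

        /** Provisional write: the message becomes visible only on commit. */
        void send(ToyQueue queue, String msg) {
            onCommit.add(() -> queue.committed.addLast(msg));
        }

        void commit() {
            onCommit.forEach(Runnable::run);
        }

        void rollback() {
            // Undo the reads in reverse order so the original ordering is restored.
            for (int i = onRollback.size() - 1; i >= 0; i--) {
                onRollback.get(i).run();
            }
        }
    }

    public static void main(String[] args) {
        ToyQueue in = new ToyQueue();
        ToyQueue out = new ToyQueue();
        in.committed.add("quote-1");

        ToyTransaction tx = new ToyTransaction();
        String msg = tx.receive(in);          // step 1: provisional read
        tx.send(out, msg.toUpperCase());      // step 2: provisional write
        System.out.println("before commit: " + out.committed);
        tx.commit();                          // step 3: both become permanent
        System.out.println("after commit:  " + out.committed);
    }
}
```

Calling rollback() instead of commit() would put the message back on the incoming queue and leave the outgoing queue empty, which is the behavior that makes the route reliable.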
Combining InOut mode with transactional JMS endpoints is problematic. In most cases, this mode of operation is fundamentally inconsistent and it is recommended that you refactor your routes to avoid this combination.
In a JMS consumer endpoint, InOut mode is automatically
triggered by the presence of a JMSReplyTo header in an incoming JMS
message. In this case, the endpoint creates an InOut exchange
to hold the incoming message and it will use the JMSReplyTo queue to
send the reply message.
The InOut MEP is fundamentally incompatible with a route containing transactional JMS endpoints. In almost all cases, the route will hang and no reply will ever be sent. To understand why, consider the following route for processing payment requests:
from("jmstx:queue:rawPayments")
    .process(inputReformatter)
    .to("jmstx:queue:formattedPayments")
    .process(outputReformatter);
The JMS consumer endpoint, jmstx:queue:rawPayments, polls for
messages, which are expected to have a JMSReplyTo header (for
InOut mode). For each incoming message, a new transaction
is started and an InOut exchange is created. After reformatting
by the inputReformatter processor, the InOut
exchange proceeds to the JMS producer endpoint,
jmstx:queue:formattedPayments, which sends the message and expects
to receive a reply on a temporary queue. This scenario is illustrated by Figure 3.2.
The scope of the transaction includes the entire route, the request leg as well as the reply leg. The processing of the route proceeds as expected until the exchange arrives at the JMS producer endpoint, at which point the producer endpoint makes a provisional write to the outgoing request queue. At this point the route hangs: the JMS producer endpoint is waiting to receive a message from the reply queue, but the reply can never be received because the outgoing request message was only provisionally written to the request queue (and is thus invisible to the service at the other end of the queue).
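The hang can be reproduced in miniature with two in-memory queues and a stand-in service thread. The following is a simplified sketch under stated assumptions, not the behavior of the real JMS component: the class and method names are hypothetical, the provisional write is modelled simply by not placing the request on the visible queue, and a timeout stands in for what would otherwise be an indefinite wait.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class InOutDeadlockSketch {

    /**
     * Sends one request and waits for the reply.
     * Returns the reply text, or null if no reply arrives within the timeout.
     */
    static String requestReply(boolean transacted, long timeoutMillis) {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();

        // Stand-in for the remote service: it sees only committed requests.
        Thread service = new Thread(() -> {
            try {
                String request = requestQueue.take();
                replyQueue.add("reply to " + request);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        service.setDaemon(true);
        service.start();

        if (!transacted) {
            requestQueue.add("payment-1");   // visible to the service immediately
        }
        // When transacted, the request is written only provisionally: it would be
        // released on commit, but the commit cannot happen before the reply arrives.

        String reply;
        try {
            reply = replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            reply = null;
        }
        service.interrupt();                 // stop the stand-in service if it is still waiting
        return reply;
    }

    public static void main(String[] args) {
        System.out.println("non-transacted: " + requestReply(false, 2000));
        System.out.println("transacted:     " + requestReply(true, 200));
    }
}
```

In the non-transacted case the reply arrives at once; in the transacted case the wait always ends in a timeout, because the service never sees the request.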
It turns out that this problem is not trivial to solve. When you consider all of the ways that this scenario could fail and how to guarantee transactional integrity in all cases, it would require some substantial changes to the way that Fuse Mediation Router works. Fortunately, there is a simpler way of dealing with request/reply semantics that is already supported by Fuse Mediation Router.
If you want to implement a transactional JMS route that has request/reply semantics, the easiest solution is to refactor your route to avoid using InOut exchanges. The basic idea is that instead of defining a single route that combines a request leg and a reply leg, you should refactor it into two routes: one for the (outbound) request leg and another for the (inbound) reply leg. For example, the payments example could be refactored into two separate routes as follows:
from("jmstx:queue:rawPaymentsIn")
    .process(inputReformatter)
    .to("jmstx:queue:formattedPaymentsIn");
from("jmstx:queue:formattedPaymentsOut")
    .process(outputReformatter)
    .to("jmstx:queue:rawPaymentsOut");
Instead of a single incoming queue, queue:rawPayments, which uses the
queue from JMSReplyTo for replies, we now have a pair of queues:
queue:rawPaymentsIn, for receiving incoming requests, and
queue:rawPaymentsOut, for sending outgoing replies. Instead
of a single outgoing queue, queue:formattedPayments, which implicitly
uses a temporary queue for replies, we now have a pair of queues:
queue:formattedPaymentsIn, for forwarding outgoing requests, and
queue:formattedPaymentsOut, for receiving incoming replies. This
scenario is illustrated by Figure 3.3.
There is a special case of a transactional JMS route where you can process InOut exchanges. If you look at the preceding examples, it is clear that the essential cause of deadlock in the route is the presence of JMS producer endpoints that obey request/reply semantics. In contrast to this, if you define a route where the JMS producer endpoints obey oneway semantics (fire-and-forget), deadlock does not occur.
For example, if you want to have a route that records all of the processed
exchanges in a log queue, queue:log, you could define a route like the
following:
from("jmstx:queue:inOutSource")
    .to(ExchangePattern.InOnly, "jmstx:queue:log")
    .process(myProcessor);
The exchanges coming into this route are of InOut type and
both the consumer endpoint, jmstx:queue:inOutSource, and the producer
endpoint, jmstx:queue:log, are transactional. The key to avoiding
deadlock in this case is to force the producer endpoint to operate in oneway mode,
by passing the ExchangePattern.InOnly parameter to the
to() command.
If you are using transactions in your application, you will inevitably also be accessing some persistent resources. Spring provides a variety of APIs to support programmatic access to persistent resources and you might find it helpful to familiarize yourself with these data access APIs. In particular, this chapter describes Spring's JDBC API in some detail.
To provide access to various kinds of persistent storage, Spring encapsulates the relevant API in a template class. The purpose of the template class is to provide a simplifying wrapper layer around each type of storage and to ensure that any required Spring features are integrated cleanly with the persistence layer.
Spring provides the following template classes for data access:
The org.springframework.jms.core.JmsTemplate class is a general-purpose class for managing Java Messaging Service (JMS) connections. One of the main advantages of this class is that it simplifies the code needed for synchronous JMS access.
To create an instance of a JmsTemplate, you need to supply a
reference to a javax.jms.ConnectionFactory object.
The org.springframework.jdbc.core.JdbcTemplate class is a wrapper around a JDBC data source, enabling you to access a JDBC database using SQL operations.
To create an instance of a JdbcTemplate, you need to supply a
reference to a javax.sql.DataSource object (for example, see JDBC Data Source).
For a detailed discussion of the JdbcTemplate class, see Spring JDBC Template.
The org.springframework.jdbc.core.simple.SimpleJdbcTemplate class is a
convenience wrapper around the JdbcTemplate class. This class has been
pared down so that it includes only the most commonly used template methods and it
has been optimized to exploit Java 5 features.
The org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
class is a convenience wrapper around the JdbcTemplate class,
which enables you to use named parameters instead of the usual ?
placeholders embedded in a SQL statement.
The org.springframework.orm.ibatis.SqlMapClientTemplate class is a
simplifying wrapper around the iBATIS SqlMapClient class.
iBATIS is an Object Relational
Mapper (ORM) that is capable of automatically instantiating Java objects based on a
given SQL database schema.
The org.springframework.orm.hibernate3.HibernateTemplate class provides an
alternative to working with the raw Hibernate 3 session API (based on sessions returned from
SessionFactory.getCurrentSession()).
For Hibernate versions 3.0.1 or later, the Spring documentation recommends
that you use the native Hibernate 3 API instead of the
HibernateTemplate class, because transactional Hibernate access
code can now be coded using the native Hibernate API.
The org.springframework.orm.jdo.JdoTemplate class provides an alternative to working with the raw JDO PersistenceManager API. The main difference between the APIs relates to their exception handling. See the Spring JavaDoc for details.
The org.springframework.orm.jpa.JpaTemplate class provides an alternative to
working with the raw JPA
EntityManager API.
The Spring documentation now recommends that you use the native JPA
programming interface instead of the JpaTemplate class. Considering
that the JPA programming interface is itself a thin wrapper layer, there is
little advantage to be had by adding another wrapper layer on top of it.
This section describes how to access a database through the Spring
JdbcTemplate class and provides a code example that shows how to
use the JdbcTemplate class in practice.
The org.springframework.jdbc.core.JdbcTemplate class is the key class for
accessing databases through JDBC in Spring. It provides a complete API for executing
SQL statements on the database at run time. The following kinds of SQL operations
are supported by JdbcTemplate:
Querying (SELECT operations).
Updating (INSERT, UPDATE, and DELETE
operations).
Other SQL operations (all other SQL operations).
The JdbcTemplate query methods are used to send SELECT queries
to the database. A variety of different query methods are supported, depending on
how complicated the return values are.
The simplest case is where you expect the query to return a single value from a
single row. In this case, you can use a type-specific query method to retrieve the
single value. For example, if you want to retrieve the balance of a particular
customer's account from the accounts table, you could use the following
code:
// Java
int origAmount = jdbc.queryForInt(
    "select amount from accounts where name = ?",
    new Object[]{name}
);
The arguments to the SQL query are provided as an array of objects,
Object[]{name}. In this example, the name string is
bound to the question mark, ?, in the SQL query string. If there are
multiple arguments to the query string (where each argument in the SQL string is
represented by a question mark, ?), you would provide an object array
with multiple arguments—for example,
Object[]{arg1,arg2,arg3,...}.
The next most complicated case is where you expect the query to return multiple
values from a single row. In this case, you can use one of the
queryForMap() methods to retrieve the contents of a single row. For
example, to retrieve the complete account details from a single customer:
// Java
Map<String,Object> rowMap = jdbc.queryForMap(
    "select * from accounts where name = ?",
    new Object[]{name}
);
Where the returned map object, rowMap, contains one entry for each
column, using the column name as the key.
The most general case is where you expect the query to return multiple values from
multiple rows. In this case, you can use one of the queryForList()
methods to return the contents of multiple rows. For example, to return all of the
rows from the accounts table:
// Java
List<Map<String,Object>> rows = jdbc.queryForList(
    "select * from accounts"
);
In some cases, a more convenient way of returning the table rows is to provide a
RowMapper, which automatically converts each row to a Java object.
The return value of a query call would then be a list of Java objects. For example,
the contents of the accounts table could be returned as follows:
// Java
List<Account> accountList = jdbc.query(
    "select * from accounts",
    new Object[]{},
    new RowMapper() {
        public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
            Account acc = new Account();
            acc.setName(rs.getString("name"));
            acc.setAmount(rs.getLong("amount"));
            return acc;
        }
    }
);
Where each Account object in the returned list encapsulates the
contents of a single row.
The JdbcTemplate update methods are used to perform INSERT,
UPDATE, or DELETE operations on the database. The
update methods modify the database contents, but do not return any data from the
database (apart from an integer return value, which counts the number of rows
affected by the operation).
For example, the following update operation shows how to set the
amount field in a customer's account:
// Java
jdbc.update(
    "update accounts set amount = ? where name = ?",
    new Object[] {newAmount, name}
);
For all other SQL operations, there is a general purpose execute()
method. For example, you would use this method to execute a create
table statement, as follows:
// Java
jdbc.execute("create table accounts (name varchar(50), amount int)");
To illustrate the database operations you can perform through the
JdbcTemplate class, consider the account
service, which provides access to bank account data stored in a
database. It is assumed that the database is accessible through a JDBC data source
and the account service is implemented by an AccountService class that
exposes the following methods:
credit()—add a specific amount of money to a named
account.
debit()—subtract a specific amount of money from a
named account.
By combining credit and debit operations, it is possible to model money transfers, which can also be used to demonstrate key properties of transaction processing.
For the account service example, the money transfer orders have a simple XML format, as follows:
<transaction>
  <transfer>
    <sender>Major Clanger</sender>
    <receiver>Tiny Clanger</receiver>
    <amount>90</amount>
  </transfer>
</transaction>
When this money transfer order is executed, the amount of money specified in the
amount element is debited from the sender account and
credited to the receiver account.
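To see how the individual fields can be pulled out of such an order, the following sketch evaluates XPath expressions against the sample document using only the XPath classes that ship with the JDK. The class name TransferOrderSketch and the select() helper are hypothetical names introduced for this illustration; the account service itself receives these values through annotations rather than through code like this.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

public class TransferOrderSketch {

    /** The sample money transfer order shown above. */
    static final String ORDER =
        "<transaction>"
      + "<transfer>"
      + "<sender>Major Clanger</sender>"
      + "<receiver>Tiny Clanger</receiver>"
      + "<amount>90</amount>"
      + "</transfer>"
      + "</transaction>";

    /** Evaluates an XPath expression against the XML text and returns the string result. */
    static String select(String xml, String expression) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            return xpath.evaluate(expression, new InputSource(new StringReader(xml)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate " + expression, e);
        }
    }

    public static void main(String[] args) {
        String sender = select(ORDER, "/transaction/transfer/sender/text()");
        String receiver = select(ORDER, "/transaction/transfer/receiver/text()");
        int amount = Integer.parseInt(select(ORDER, "/transaction/transfer/amount/text()"));
        System.out.println("debit " + amount + " from " + sender + ", credit to " + receiver);
    }
}
```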
Before we can start performing any queries on the database, the first thing we
need to do is to create an accounts table and populate it with some
initial values. Example 4.1 shows the definition of the
CreateTable class, which is responsible for initializing
the accounts table.
Example 4.1. The CreateTable Class
// Java
package com.fusesource.demo.tx.jdbc.java;

import javax.sql.DataSource;
import org.apache.log4j.Logger;
import org.springframework.jdbc.core.JdbcTemplate;

public class CreateTable {
    private static Logger log = Logger.getLogger(CreateTable.class);
    protected DataSource dataSource;
    protected JdbcTemplate jdbc;

    public DataSource getDataSource() {
        return dataSource;
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public CreateTable(DataSource ds) {
        log.info("CreateTable constructor called");
        setDataSource(ds);
        setUpTable();
    }

    public void setUpTable() {
        log.info("About to set up table...");
        jdbc = new JdbcTemplate(dataSource);
        jdbc.execute("create table accounts (name varchar(50), amount int)");
        jdbc.update("insert into accounts (name,amount) values (?,?)",
            new Object[] {"Major Clanger", 2000}
        );
        jdbc.update("insert into accounts (name,amount) values (?,?)",
            new Object[] {"Tiny Clanger", 100}
        );
        log.info("Table created");
    }
}
Where the accounts table consists of two columns: name,
a string value that records the account holder's name, and amount, an
integer that records the amount of money in the account. Because this example
uses an ephemeral database, which exists only temporarily in memory, it is necessary
to re-initialize the database every time the example runs. A convenient way to
initialize the table is by instantiating a CreateTable bean in the
Spring XML configuration, as follows:
<beans ...>
  <!-- datasource to the database -->
  <bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
    <property name="driverClass" value="org.hsqldb.jdbcDriver"/>
    <property name="url" value="jdbc:hsqldb:mem:camel"/>
    <property name="username" value="sa"/>
    <property name="password" value=""/>
  </bean>
  <!-- Bean to initialize table in the DB -->
  <bean id="createTable" class="com.fusesource.demo.tx.jdbc.java.CreateTable">
    <constructor-arg ref="dataSource" />
  </bean>
  ...
</beans>
As soon as the createTable bean is instantiated, the
accounts table is ready for use. Note that a reference to the JDBC
data source, dataSource, is passed to the CreateTable()
constructor, because the data source is needed to create a JdbcTemplate
instance.
Example 4.2 shows an outline of the
AccountService class, not including
the service methods that access the database. The class expects to receive a data
source reference through dependency injection, which it then uses to create a
JdbcTemplate instance.
Example 4.2. The AccountService class
package com.fusesource.demo.tx.jdbc.java;

import java.util.List;
import javax.sql.DataSource;
import org.apache.camel.Exchange;
import org.apache.camel.language.XPath;
import org.apache.log4j.Logger;
import org.springframework.jdbc.core.JdbcTemplate;

public class AccountService {
    private static Logger log = Logger.getLogger(AccountService.class);
    private JdbcTemplate jdbc;

    public AccountService() {
    }

    public void setDataSource(DataSource ds) {
        jdbc = new JdbcTemplate(ds);
    }
    ...
    // Service methods (see below)
    ...
}
You can conveniently instantiate an AccountService bean in Spring
XML, using dependency injection to pass the data source reference, as
follows:
<beans ...>
  <!-- Bean for account service -->
  <bean id="accountService" class="com.fusesource.demo.tx.jdbc.java.AccountService">
    <property name="dataSource" ref="dataSource"/>
  </bean>
  ...
</beans>
The credit() method adds the specified amount of money,
amount, to the specified account, name, in the
accounts database table, as follows:
public void credit(
    @XPath("/transaction/transfer/receiver/text()") String name,
    @XPath("/transaction/transfer/amount/text()") String amount
)
{
    log.info("credit() called with args name = " + name + " and amount = " + amount);
    int origAmount = jdbc.queryForInt(
        "select amount from accounts where name = ?",
        new Object[]{name}
    );
    int newAmount = origAmount + Integer.parseInt(amount);
    jdbc.update(
        "update accounts set amount = ? where name = ?",
        new Object[] {newAmount, name}
    );
}
In this example, the method parameters are annotated with @XPath, so each parameter receives the result of evaluating the corresponding XPath expression on the incoming message. For example, the first XPath expression,
/transaction/transfer/receiver/text(), selects the contents of the receiver element, which is passed to the method as the name parameter.
The debit() method subtracts the specified amount of money,
amount, from the specified account, name, in the
accounts database table, as follows:
public void debit(
    @XPath("/transaction/transfer/sender/text()") String name,
    @XPath("/transaction/transfer/amount/text()") String amount
)
{
    log.info("debit() called with args name = " + name + " and amount = " + amount);
    int iamount = Integer.parseInt(amount);
    if (iamount > 100) {
        throw new IllegalArgumentException("Debit limit is 100");
    }
    int origAmount = jdbc.queryForInt(
        "select amount from accounts where name = ?",
        new Object[]{name}
    );
    int newAmount = origAmount - iamount;
    if (newAmount < 0) {
        throw new IllegalArgumentException("Not enough in account");
    }
    jdbc.update(
        "update accounts set amount = ? where name = ?",
        new Object[] {newAmount, name}
    );
}
The parameters of the debit() method are annotated with @XPath, in the same way as the parameters of the credit() method.
There is a fixed debit limit of 100. Amounts greater than this will
trigger an IllegalArgumentException.
If the balance of the account would go below zero after debiting, abort
the transaction by throwing an IllegalArgumentException.
The dumpTable() method is convenient for testing. It simply returns
the entire contents of the accounts table as a string. It is
implemented as follows:
public void dumpTable(Exchange ex) {
    log.info("dump() called");
    List<?> dump = jdbc.queryForList("select * from accounts");
    ex.getIn().setBody(dump.toString());
}
Transaction demarcation refers to the procedures for starting, committing, and rolling back transactions. This chapter describes the mechanisms that are available for controlling transaction demarcation, both by programming and by configuration.
Fuse Mediation Router provides a simple mechanism for initiating a transaction in a route, by
inserting the transacted() command in the Java DSL or by inserting the
<transacted/> tag in the XML DSL.
Figure 5.1 shows an example of a route that
is made transactional by adding the transacted() DSL command to the
route. All of the route nodes following the transacted() node are
included in the transaction scope. In this example, the two following nodes access a
JDBC resource.
The transacted processor demarcates transactions as follows: when an
exchange enters the transacted processor, the transacted
processor invokes the default transaction manager to begin a
transaction (attaching it to the current thread); when the exchange reaches the end
of the remaining route, the transacted processor invokes the
transaction manager to commit the current transaction.
The following Java DSL example shows how to define a transactional route by
marking the route with the transacted() DSL command:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .transacted()
            .beanRef("accountService","credit")
            .beanRef("accountService","debit");
    }
}
In this example, the file endpoint reads some files in XML format
that describe a transfer of funds from one account to another. The first
beanRef() invocation credits the specified sum of money to the
beneficiary's account and then the second beanRef() invocation
subtracts the specified sum of money from the sender's account. Both of the
beanRef() invocations cause updates to be made to a database
resource, which we are assuming is bound to the transaction through the transaction
manager (for example, see JDBC Data Source). For a
sample implementation of the accountService bean, see Spring JDBC Template.
The beanRef() Java DSL command is available only
in the SpringRouteBuilder class. It enables you to reference a bean by
specifying the bean's Spring registry ID (for example, accountService).
If you do not use the beanRef() command, you could inherit from the
org.apache.camel.builder.RouteBuilder class instead.
The preceding route can equivalently be expressed in Spring XML, where the
<transacted/> tag is used to mark the route as transactional, as
follows:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ... >
  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="file:src/data?noop=true"/>
      <transacted/>
      <bean ref="accountService" method="credit"/>
      <bean ref="accountService" method="debit"/>
    </route>
  </camelContext>
</beans>
To demarcate transactions, the transacted processor must be
associated with a particular transaction manager instance. To save you having to
specify the transaction manager every time you invoke transacted(), the
transacted processor automatically picks a sensible default. For
example, if there is only one instance of a transaction manager in your Spring
configuration, the transacted processor implicitly picks this
transaction manager and uses it to demarcate transactions.
A transacted processor can also be configured with a transacted
policy, of TransactedPolicy type, which encapsulates a propagation
policy and a transaction manager (see Propagation Policies
for details). The following rules are used to pick the default transaction manager
or transaction policy:
If there is only one bean of
org.apache.camel.spi.TransactedPolicy type, use this
bean.
The TransactedPolicy type is a base type of the
SpringTransactionPolicy type that is described in Propagation Policies. Hence, the bean referred to
here could be a SpringTransactionPolicy bean.
If there is a bean of type,
org.apache.camel.spi.TransactedPolicy, which has the ID,
PROPAGATION_REQUIRED, use this bean.
If there is only one bean of
org.springframework.transaction.PlatformTransactionManager
type, use this bean.
You also have the option of specifying a bean explicitly by providing the bean ID
as an argument to transacted()—see Sample route with PROPAGATION_NEVER policy in Java DSL.
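The selection rules above can be summarized as a small decision procedure. The following plain-Java sketch models the bean registry as a map from bean ID to a simplified bean kind; it is an illustration of the rules as stated in this section, not the code that the transacted processor actually runs, and the names DefaultPolicySketch, Kind, and pickDefault are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class DefaultPolicySketch {

    /** Simplified stand-ins for the bean types named in the rules. */
    enum Kind { TRANSACTED_POLICY, TRANSACTION_MANAGER, OTHER }

    /** Collects the IDs of all beans of the given kind. */
    static List<String> idsOfKind(Map<String, Kind> registry, Kind kind) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, Kind> entry : registry.entrySet()) {
            if (entry.getValue() == kind) {
                ids.add(entry.getKey());
            }
        }
        return ids;
    }

    /** Applies the three rules in order and returns the ID of the chosen bean, if any. */
    static Optional<String> pickDefault(Map<String, Kind> registry) {
        List<String> policies = idsOfKind(registry, Kind.TRANSACTED_POLICY);
        if (policies.size() == 1) {                              // rule 1: a single policy bean
            return Optional.of(policies.get(0));
        }
        if (policies.contains("PROPAGATION_REQUIRED")) {         // rule 2: policy with this ID
            return Optional.of("PROPAGATION_REQUIRED");
        }
        List<String> managers = idsOfKind(registry, Kind.TRANSACTION_MANAGER);
        if (managers.size() == 1) {                              // rule 3: a single transaction manager
            return Optional.of(managers.get(0));
        }
        return Optional.empty();                                 // no default: name a bean explicitly
    }

    public static void main(String[] args) {
        Map<String, Kind> registry = new LinkedHashMap<>();
        registry.put("jmsTransactionManager", Kind.TRANSACTION_MANAGER);
        registry.put("accountService", Kind.OTHER);
        System.out.println(pickDefault(registry).orElse("none"));
    }
}
```

With the registry shown in main(), which resembles Example 3.1 in containing a single transaction manager and no policy beans, the third rule applies and the transaction manager is chosen.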
If you insert a transacted processor into a route, a new transaction
is created each time an exchange passes through this node and the transaction's
scope is defined as follows:
The transaction is associated with the current thread only.
The transaction scope encompasses all of the route nodes
following the transacted processor.
In particular, all of the route nodes preceding the
transacted processor are not included in the transaction (but the
situation is different, if the route begins with a transactional endpoint—see
Demarcation by Transactional Endpoints). For example, the following route
is incorrect, because the transacted() DSL command mistakenly appears
after the first beanRef() call (which accesses the database
resource):
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .beanRef("accountService","credit")
            .transacted() // <-- WARNING: Transaction started in the wrong place!
            .beanRef("accountService","debit");
    }
}
It is crucial to understand that a given transaction is associated with the current thread only. It follows that you must not create a thread pool in the middle of a transactional route, because the processing in the new threads will not participate in the current transaction. For example, the following route is bound to cause problems:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .transacted()
            .threads(3) // WARNING: Subthreads are not in transaction scope!
            .beanRef("accountService","credit")
            .beanRef("accountService","debit");
    }
}
A route like the preceding one is certain to corrupt your database, because the
threads() DSL command is incompatible with transacted routes. Even
if the threads() call precedes the transacted() call, the
route will not behave as expected.
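The reason new threads fall outside the transaction can be shown with a plain ThreadLocal, which is the general mechanism for binding state to the current thread. The sketch below is a toy model only: the CURRENT_TX variable and the class name are hypothetical, and a string stands in for the real transaction context.

```java
public class ThreadBoundTxSketch {

    /** Holds the "current transaction" separately for each thread. */
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Starts a worker thread and returns the transaction that the worker can see. */
    static String txSeenByWorker() {
        final String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = CURRENT_TX.get());
        worker.start();
        try {
            worker.join();   // wait for the worker, so its result is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        CURRENT_TX.set("tx-1");   // begin: bind the transaction to this thread
        System.out.println("route thread sees:  " + CURRENT_TX.get());
        System.out.println("worker thread sees: " + txSeenByWorker());
        CURRENT_TX.remove();      // end: unbind
    }
}
```

The worker thread sees no transaction at all, so any database update it performed would run outside the scope that the route thread commits or rolls back.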
If you want to break a route into fragments and have each route fragment
participate in the current transaction, you can use direct: endpoints.
For example, to send exchanges to separate route fragments, depending on whether the
transfer amount is big (greater than 100) or small (less than or equal to 100), you
can use the choice() DSL command and direct endpoints, as
follows:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .transacted()
            .beanRef("accountService","credit")
            .choice().when(xpath("/transaction/transfer[amount > 100]"))
                .to("direct:txbig")
            .otherwise()
                .to("direct:txsmall");

        from("direct:txbig")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages/big");

        from("direct:txsmall")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages/small");
    }
}
Both the fragment beginning with direct:txbig and the fragment
beginning with direct:txsmall participate in the current transaction,
because the direct endpoints are synchronous. This
means that the fragments execute in the same thread as the first route fragment and,
therefore, they are included in the same transaction scope.
You must not use seda endpoints to join the route fragments,
because seda consumer endpoints create a new thread (or threads) to
execute the route fragment (asynchronous processing). Hence, the fragments would
not participate in the original transaction.
The following Fuse Mediation Router components act as resource endpoints
when they appear as the destination of a route (for example, if they appear in the
to() DSL command). That is, these endpoints can access a
transactional resource, such as a database or a persistent queue. The resource
endpoints can participate in the current transaction, as long as they are associated
with the same transaction manager as the
transacted processor that initiated the current transaction. If you
need to access multiple resources, you must deploy your application in a J2EE
container, which gives you access to a global transaction manager.
JMS in EIP Component Reference
ActiveMQ in EIP Component Reference
AMQP in EIP Component Reference
JavaSpace in EIP Component Reference
JPA in EIP Component Reference
Hibernate in EIP Component Reference
iBatis in EIP Component Reference
JBI in EIP Component Reference
JCR in EIP Component Reference
JDBC in EIP Component Reference
LDAP in EIP Component Reference
For example, the following route sends the order for a money transfer to two
different JMS queues: the credits queue processes the order to credit
the receiver's account, and the debits queue processes the order to
debit the sender's account. Because a credit must occur only if
there is a corresponding debit, it makes sense to enclose the enqueueing operations
in a single transaction. If the transaction succeeds, both the credit order and the
debit order will be enqueued, but if an error occurs, neither
order will be enqueued.
from("file:src/data?noop=true")
    .transacted()
    .to("jmstx:queue:credits")
    .to("jmstx:queue:debits");
If a consumer endpoint at the start of a route accesses a resource, the
transacted() command is of no use, because it initiates the
transaction after an exchange is polled. In other words, the
transaction starts too late to include the consumer endpoint within the transaction
scope. The correct approach in this case is to make the endpoint itself responsible
for initiating the transaction. An endpoint that is capable of managing transactions
is known as a transactional endpoint.
Figure 5.2 shows an example of a route
that is made transactional by the presence of a transactional endpoint at the start
of the route (in the from() command). All of the route nodes are
included in the transaction scope. In this example, all of the endpoints in the
route access a JMS resource.
There are two different models of demarcation by transactional endpoint, as follows:
General case—normally, a transactional endpoint demarcates transactions as follows: when an exchange arrives at the endpoint (or when the endpoint successfully polls for an exchange), the endpoint invokes its associated transaction manager to begin a transaction (attaching it to the current thread); and when the exchange reaches the end of the route, the transactional endpoint invokes the transaction manager to commit the current transaction.
JMS endpoint with InOut exchange—when a JMS consumer endpoint receives an InOut exchange and this exchange is routed to another JMS endpoint, this must be treated as a special case. The problem is that the route can deadlock, if you try to enclose the entire request/reply exchange in a single transaction. For details of how to resolve this problem, see InOut Message Exchange Pattern.
The following Java DSL example shows how to define a transactional route by starting the route with a transactional endpoint:
from("jmstx:queue:giro")
    .to("jmstx:queue:credits")
    .to("jmstx:queue:debits");
Here, the transaction scope encompasses the endpoints
jmstx:queue:giro, jmstx:queue:credits, and
jmstx:queue:debits. If the transaction succeeds, the exchange is
permanently removed from the giro queue and pushed on to the
credits queue and the debits queue. If the transaction
fails, the exchange is not put on to the
credits and debits queues and is pushed
back on to the giro queue (by default, JMS will automatically attempt
to redeliver the message).
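The commit and rollback outcomes just described can be modelled with a minimal sketch that uses only the JDK. It makes no claim about how a JMS provider is implemented: the class QueueTransferDemo, its in-memory queues, and the failMidway flag are hypothetical names introduced for illustration. Sends are buffered until commit, and a failure returns the received message to the head of the giro queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model (assumption): three in-memory queues stand in for the
// giro, credits, and debits JMS queues.
class QueueTransferDemo {
    final Deque<String> giro = new ArrayDeque<>();
    final Deque<String> credits = new ArrayDeque<>();
    final Deque<String> debits = new ArrayDeque<>();

    /**
     * Moves one message from giro to credits and debits as a single unit.
     * Returns true if the move was committed, false if it was rolled back
     * or if there was nothing to receive.
     */
    boolean transfer(boolean failMidway) {
        String msg = giro.pollFirst();
        if (msg == null) {
            return false;
        }
        List<Runnable> pendingSends = new ArrayList<>();
        try {
            pendingSends.add(() -> credits.addLast(msg));
            if (failMidway) {
                throw new IllegalStateException("simulated failure");
            }
            pendingSends.add(() -> debits.addLast(msg));
            pendingSends.forEach(Runnable::run); // commit: apply buffered sends
            return true;
        } catch (RuntimeException e) {
            giro.addFirst(msg); // rollback: message becomes available again
            return false;
        }
    }

    public static void main(String[] args) {
        QueueTransferDemo q = new QueueTransferDemo();
        q.giro.add("order-1");
        System.out.println(q.transfer(true) + " giro=" + q.giro + " credits=" + q.credits);
        System.out.println(q.transfer(false) + " credits=" + q.credits + " debits=" + q.debits);
    }
}
```

After the failed attempt the message is still on the giro queue and neither target queue has received it; the second attempt delivers it to both.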
The JMS component bean, jmstx, must be explicitly configured to use
transactions, as follows:
<beans ...>
    <bean id="jmstx" class="org.apache.camel.component.jms.JmsComponent">
        <property name="configuration" ref="jmsConfig" />
    </bean>
    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
        <property name="transactionManager" ref="jmsTransactionManager"/>
        <property name="transacted" value="true"/>
    </bean>
    ...
</beans>
Here, the transaction manager instance, jmsTransactionManager, is
associated with the JMS component and the transacted property is set to
true to enable transaction demarcation for
InOnly exchanges. For the complete Spring XML configuration
of this component, see Example 3.1.
The preceding route can equivalently be expressed in Spring XML, as follows:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ... >
    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="jmstx:queue:giro"/>
            <to uri="jmstx:queue:credits"/>
            <to uri="jmstx:queue:debits"/>
        </route>
    </camelContext>
</beans>
The transacted() DSL command is not required in
a route that starts with a transactional endpoint. Nevertheless, assuming that the
default transaction policy is PROPAGATION_REQUIRED (see Propagation Policies), it is usually harmless to include the
transacted() command, as in this example:
from("jmstx:queue:giro")
    .transacted()
    .to("jmstx:queue:credits")
    .to("jmstx:queue:debits");
However, it is possible for this route to behave in unexpected ways. For
example, this can happen if a single TransactedPolicy bean having a non-default
propagation policy is created in Spring XML (see Default transaction manager and transacted policy). It is therefore generally
better not to include the transacted() DSL command
in routes that start with a transactional endpoint.
The following Fuse Mediation Router components act as transactional
endpoints when they appear at the start of a route (for example, if
they appear in the from() DSL command). That is, these endpoints can be
configured to behave as a transactional client and they can also access a
transactional resource.
If you want to influence the way a transactional client creates new transactions, you can do so by specifying a transaction policy for it. In particular, Spring transaction policies enable you to specify a propagation behavior for your transaction. For example, if a transactional client is about to create a new transaction and it detects that a transaction is already associated with the current thread, should it go ahead and create a new transaction, suspending the old one? Or should it simply let the existing transaction take over? These kinds of behavior are regulated by specifying the propagation behavior on a transaction policy.
Transaction policies are instantiated as beans in Spring XML. You can then
reference a transaction policy by providing its bean ID as an argument to the
transacted() DSL command. For example, if you want to initiate
transactions subject to the behavior, PROPAGATION_REQUIRES_NEW, you
could use the following route:
from("file:src/data?noop=true")
    .transacted("PROPAGATION_REQUIRES_NEW")
    .beanRef("accountService","credit")
    .beanRef("accountService","debit")
    .to("file:target/messages");
Here, the PROPAGATION_REQUIRES_NEW argument specifies the bean ID of
a transaction policy bean that is configured with the
PROPAGATION_REQUIRES_NEW behavior (see Example 5.1).
Fuse Mediation Router lets you define Spring transaction policies using the
org.apache.camel.spring.spi.SpringTransactionPolicy class (which is
essentially a wrapper around a native Spring class). The
SpringTransactionPolicy class encapsulates two pieces of
data:
A reference to a transaction manager (of
PlatformTransactionManager type).
A propagation behavior.
For example, you could instantiate a Spring transaction policy bean with
PROPAGATION_MANDATORY behavior, as follows:
<beans ...>
    <bean id="PROPAGATION_MANDATORY" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="txManager"/>
        <property name="propagationBehaviorName" value="PROPAGATION_MANDATORY"/>
    </bean>
    ...
</beans>
The following propagation behaviors are supported by Spring (these values were originally modelled on the propagation behaviors supported by J2EE):
PROPAGATION_MANDATORY: Support a current transaction; throw an exception if no current transaction exists.
PROPAGATION_NESTED: Execute within a nested transaction if a current transaction exists, else behave like PROPAGATION_REQUIRED. Nested transactions are not supported by all transaction managers.
PROPAGATION_NEVER: Do not support a current transaction; throw an exception if a current transaction exists.
PROPAGATION_NOT_SUPPORTED: Do not support a current transaction; rather, always execute non-transactionally. This policy requires the current transaction to be suspended, a feature which is not supported by all transaction managers.
PROPAGATION_REQUIRED: (Default) Support a current transaction; create a new one if none exists.
PROPAGATION_REQUIRES_NEW: Create a new transaction, suspending the current transaction if one exists. Suspending transactions is not supported by all transaction managers.
PROPAGATION_SUPPORTS: Support a current transaction; execute non-transactionally if none exists.
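The behaviors listed above can be read as a decision table keyed on two inputs: the propagation behavior and whether a transaction is already associated with the current thread. The following sketch encodes that table in plain Java. It is an illustration of the definitions only, not Spring's implementation, and the names PropagationDemo, Propagation, Action, and decide are hypothetical.

```java
// Simplified model (assumption): each propagation behavior maps to an action,
// depending on whether a transaction already exists on the current thread.
class PropagationDemo {
    enum Propagation { MANDATORY, NESTED, NEVER, NOT_SUPPORTED, REQUIRED, REQUIRES_NEW, SUPPORTS }

    enum Action {
        JOIN_EXISTING, CREATE_NEW, CREATE_NESTED,
        SUSPEND_AND_CREATE_NEW, SUSPEND_AND_RUN_WITHOUT, RUN_WITHOUT
    }

    static Action decide(Propagation behavior, boolean txExists) {
        switch (behavior) {
            case MANDATORY:
                if (!txExists) {
                    throw new IllegalStateException("MANDATORY: no current transaction");
                }
                return Action.JOIN_EXISTING;
            case NESTED:
                return txExists ? Action.CREATE_NESTED : Action.CREATE_NEW;
            case NEVER:
                if (txExists) {
                    throw new IllegalStateException("NEVER: a current transaction exists");
                }
                return Action.RUN_WITHOUT;
            case NOT_SUPPORTED:
                return txExists ? Action.SUSPEND_AND_RUN_WITHOUT : Action.RUN_WITHOUT;
            case REQUIRED:
                return txExists ? Action.JOIN_EXISTING : Action.CREATE_NEW;
            case REQUIRES_NEW:
                return txExists ? Action.SUSPEND_AND_CREATE_NEW : Action.CREATE_NEW;
            case SUPPORTS:
                return txExists ? Action.JOIN_EXISTING : Action.RUN_WITHOUT;
            default:
                throw new AssertionError(behavior);
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            for (boolean txExists : new boolean[] {false, true}) {
                String result;
                try {
                    result = decide(p, txExists).toString();
                } catch (IllegalStateException e) {
                    result = "exception (" + e.getMessage() + ")";
                }
                System.out.println(p + ", txExists=" + txExists + " -> " + result);
            }
        }
    }
}
```

Only two combinations raise an exception in this model: MANDATORY without a current transaction, and NEVER with one.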
Example 5.1 shows how to define transaction policy beans for all of the supported propagation behaviors. For convenience, each of the bean IDs matches the specified value of the propagation behavior value, but in practice you can use whatever value you like for the bean IDs.
Example 5.1. Transaction Policy Beans
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
    ...
    <bean id="PROPAGATION_MANDATORY" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="txManager"/>
        <property name="propagationBehaviorName" value="PROPAGATION_MANDATORY"/>
    </bean>
    <bean id="PROPAGATION_NESTED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="txManager"/>
        <property name="propagationBehaviorName" value="PROPAGATION_NESTED"/>
    </bean>
    <bean id="PROPAGATION_NEVER" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="txManager"/>
        <property name="propagationBehaviorName" value="PROPAGATION_NEVER"/>
    </bean>
    <bean id="PROPAGATION_NOT_SUPPORTED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="txManager"/>
        <property name="propagationBehaviorName" value="PROPAGATION_NOT_SUPPORTED"/>
    </bean>
    <!-- This is the default behavior. -->
    <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="txManager"/>
    </bean>
    <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="txManager"/>
        <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
    </bean>
    <bean id="PROPAGATION_SUPPORTS" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="txManager"/>
        <property name="propagationBehaviorName" value="PROPAGATION_SUPPORTS"/>
    </bean>
</beans>
If you want to paste any of these bean definitions into your own Spring XML
configuration, remember to customize the references to the transaction manager.
That is, replace references to txManager with the actual ID of your
transaction manager bean.
A simple way of demonstrating that transaction policies have some effect on a
transaction is to insert a PROPAGATION_NEVER policy into the middle of
an existing transaction, as shown in the following route:
from("file:src/data?noop=true")
    .transacted()
    .beanRef("accountService","credit")
    .transacted("PROPAGATION_NEVER")
    .beanRef("accountService","debit");
Used in this way, the PROPAGATION_NEVER policy inevitably aborts
every transaction, leading to a transaction rollback. You should easily be able to
see the effect of this on your application.
Remember that the string value passed to transacted() is a bean
ID, not a propagation behavior name. In this example, the
bean ID is chosen to be the same as a propagation behavior name, but this need
not always be the case. For example, if your application uses more than one
transaction manager, you might end up with more than one policy bean having a
particular propagation behavior. In this case, you could not simply name the
beans after the propagation behavior.
The preceding route can also be defined in Spring XML, as follows:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ... >
    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="file:src/data?noop=true"/>
            <transacted/>
            <bean ref="accountService" method="credit"/>
            <transacted ref="PROPAGATION_NEVER"/>
            <bean ref="accountService" method="debit"/>
        </route>
    </camelContext>
</beans>
While you can use standard Fuse Mediation Router error handling techniques in a transactional route, it is important to understand the interaction between exceptions and transaction demarcation. In particular, you need to bear in mind that thrown exceptions usually cause transaction rollback.
You can use one of the following approaches to roll back a transaction:
The most common way to roll back a Spring transaction is to throw a
runtime (unchecked) exception—that is, where the exception is
an instance or subclass of java.lang.RuntimeException. Java errors, of
java.lang.Error type, also trigger transaction rollback. Checked
exceptions, on the other hand, do not trigger rollback. Figure 5.3 summarises how Java errors and
exceptions affect transactions, where the classes that trigger rollback are shaded
gray.
The Spring framework also provides declarative settings, available in XML configuration and as annotations, that enable you to specify which exceptions should or should not trigger rollbacks. For details, see Rolling back in the Spring Reference Guide.
If a runtime exception is handled within the transaction (that is, before the exception has the chance to percolate up to the code that does the transaction demarcation), the transaction will not be rolled back. See How to define a dead letter queue for details.
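The default rule described above (unchecked exceptions and errors trigger rollback, checked exceptions do not, and an exception handled inside the transaction never reaches the demarcation code) can be sketched in plain Java. This is a simplified model rather than Spring code: RollbackRuleDemo, Outcome, rollbackOn, and runInTransaction are hypothetical names, and unlike a real transaction manager the model swallows the exception so that the outcome can be returned.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Simplified model (assumption): the demarcation code decides between commit
// and rollback from the type of exception that escapes the unit of work.
class RollbackRuleDemo {
    enum Outcome { COMMITTED, ROLLED_BACK }

    /** Default rule: only RuntimeException and Error trigger a rollback. */
    static boolean rollbackOn(Throwable t) {
        return t instanceof RuntimeException || t instanceof Error;
    }

    static Outcome runInTransaction(Callable<?> work) {
        try {
            work.call();
            return Outcome.COMMITTED;
        } catch (Throwable t) {
            // A real transaction manager would rethrow here; the model does not.
            return rollbackOn(t) ? Outcome.ROLLED_BACK : Outcome.COMMITTED;
        }
    }

    public static void main(String[] args) {
        // Unchecked exception escapes the work: rollback.
        System.out.println(runInTransaction(() -> {
            throw new IllegalArgumentException("bad amount");
        }));
        // Checked exception escapes the work: commit under the default rule.
        System.out.println(runInTransaction(() -> {
            throw new IOException("disk problem");
        }));
        // Unchecked exception handled inside the work: nothing escapes, so commit.
        System.out.println(runInTransaction(() -> {
            try {
                throw new IllegalArgumentException("bad amount");
            } catch (IllegalArgumentException handled) {
                return null;
            }
        }));
    }
}
```

The third case is the one to watch for in routes: handling the exception before it reaches the demarcation point leaves the transaction free to commit.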
If you want to trigger a rollback in the middle of a transacted route, you can do
this by calling the rollback() DSL command, which throws an
org.apache.camel.RollbackExchangeException exception. In other
words, the rollback() command uses the standard approach of throwing a
runtime exception to trigger the rollback.
For example, if you decide that there should be an absolute limit on the size of money transfers in the account services application, you could trigger a rollback when the amount exceeds 100, using the following code:
Example 5.2. Rolling Back an Exception with rollback()
from("file:src/data?noop=true")
    .transacted()
    .beanRef("accountService","credit")
    .choice().when(xpath("/transaction/transfer[amount > 100]"))
        .rollback()
    .otherwise()
        .to("direct:txsmall");
from("direct:txsmall")
    .beanRef("accountService","debit")
    .beanRef("accountService","dumpTable")
    .to("file:target/messages/small");
If you trigger a rollback in the preceding route, it will get trapped in an
infinite loop. The reason for this is that the
RollbackExchangeException exception thrown by
rollback() propagates back to the file endpoint at the start of
the route. The File component has a built-in reliability feature that causes it
to resend any exchange for which an exception has been thrown. Upon resending,
of course, the exchange just triggers another rollback, leading to an infinite
loop.
The markRollbackOnly() DSL command enables you to force the current
transaction to roll back, without throwing an exception. This
can be useful in cases where (as in Example 5.2) throwing an exception has
unwanted side effects.
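The idea of rolling back without throwing can be modelled as a flag on the transaction status that the demarcation code checks at completion time. The following plain Java sketch is an illustration only and does not use Camel or Spring classes: RollbackOnlyDemo, TxStatus, TxWork, execute, and transfer are hypothetical names.

```java
// Simplified model (assumption): the unit of work receives a status object
// and can flag it; the outcome is decided when the work returns normally.
class RollbackOnlyDemo {
    static final class TxStatus {
        private boolean rollbackOnly;

        void setRollbackOnly() { rollbackOnly = true; }

        boolean isRollbackOnly() { return rollbackOnly; }
    }

    interface TxWork {
        void run(TxStatus status);
    }

    /** Runs the work in a model transaction; returns true if it committed. */
    static boolean execute(TxWork work) {
        TxStatus status = new TxStatus();
        work.run(status);                 // no exception is needed to roll back
        return !status.isRollbackOnly();  // commit only if the flag is clear
    }

    /** Mirrors the route's rule: transfers above 100 are rolled back. */
    static boolean transfer(int amount) {
        return execute(status -> {
            if (amount > 100) {
                status.setRollbackOnly();
            }
        });
    }

    public static void main(String[] args) {
        System.out.println("transfer(50) committed: " + transfer(50));   // true
        System.out.println("transfer(150) committed: " + transfer(150)); // false
    }
}
```

Because the caller never sees an exception in the second case, nothing upstream is prompted to retry.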
Example 5.3 shows how to
modify Example 5.2 by replacing
rollback() with markRollbackOnly(). This version of
the route solves the problem of the infinite loop. In this case, when the amount of
the money transfer exceeds 100, the current transaction is rolled back, but no
exception is thrown. Because the file endpoint does not receive an exception, it
does not retry the exchange, and the failed transaction is quietly
discarded.
Example 5.3. Rolling Back an Exception with markRollbackOnly()
from("file:src/data?noop=true")
    .transacted()
    .beanRef("accountService","credit")
    .choice().when(xpath("/transaction/transfer[amount > 100]"))
        .markRollbackOnly()
    .otherwise()
        .to("direct:txsmall");
...
The preceding route implementation is not ideal, however. Although the route cleanly rolls back the transaction (leaving the database in a consistent state) and avoids the pitfall of infinite looping, it does not keep any record of the failed transaction. In a real-world application, you would typically want to keep track of any failed transaction. For example, you might want to write a letter to the relevant customer in order to explain why the transaction did not succeed. A convenient way of tracking failed transactions is to add a dead-letter queue to the route.
In order to keep track of failed transactions, you can define an
onException() clause, which enables you to divert the relevant
exchange object to a dead-letter queue. When used in the context of transactions,
however, you need to be careful about how you define the onException()
clause, because of potential interactions between exception handling and transaction
handling. Example 5.4 shows the correct
way to define an onException() clause, assuming that you need to
suppress the rethrown exception.
Example 5.4. How to Define a Dead Letter Queue
// Java
import org.apache.camel.spring.SpringRouteBuilder;
public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        onException(IllegalArgumentException.class)
            .maximumRedeliveries(1)
            .handled(true)
            .to("file:target/messages?fileName=deadLetters.xml&fileExist=Append")
            .markRollbackOnly();  // NB: Must come *after* the dead letter endpoint.
        from("file:src/data?noop=true")
            .transacted()
            .beanRef("accountService","credit")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages");
    }
}
In the preceding example, onException() is configured to catch the
IllegalArgumentException exception and send the offending exchange
to a dead letter file, deadLetters.xml (of course, you can change this
definition to catch whatever kind of exception arises in your application). The
exception rethrow behavior and the transaction rollback behavior are controlled by
the following special settings in the onException() clause:
handled(true)—suppress the rethrown exception. In this
particular example, the rethrown exception is undesirable because it
triggers an infinite loop when it propagates back to the file endpoint (see
The markRollbackOnly() DSL command). In some
cases, however, it might be acceptable to rethrow the exception (for
example, if the endpoint at the start of the route does
not implement a retry feature).
markRollbackOnly()—marks the current transaction for
rollback without throwing an exception. Note that it is
essential to insert this DSL command after the
to() command that routes the exchange to the dead letter
queue. Otherwise, the exchange would never reach the dead letter queue,
because markRollbackOnly() interrupts the chain of
processing.
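The ordering constraint in the second setting can be illustrated with a minimal model of a processing chain in which one step stops everything after it. This is a sketch under that assumption only and says nothing about Camel's internals: StepOrderDemo, Step, DEAD_LETTER, MARK_ROLLBACK_ONLY, and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (assumption): a chain of steps runs in order, and a step
// that returns false interrupts the chain, as markRollbackOnly() is described
// as doing in the text above.
class StepOrderDemo {
    /** A processing step; returning false stops the rest of the chain. */
    interface Step {
        boolean apply(List<String> log);
    }

    static final Step DEAD_LETTER = log -> {
        log.add("dead-letter");
        return true;
    };

    static final Step MARK_ROLLBACK_ONLY = log -> {
        log.add("rollback-only");
        return false;
    };

    /** Runs the steps in order and returns the names of the steps that ran. */
    static List<String> run(Step... steps) {
        List<String> log = new ArrayList<>();
        for (Step step : steps) {
            if (!step.apply(log)) {
                break;
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(DEAD_LETTER, MARK_ROLLBACK_ONLY)); // [dead-letter, rollback-only]
        System.out.println(run(MARK_ROLLBACK_ONLY, DEAD_LETTER)); // [rollback-only]
    }
}
```

With the steps in the wrong order, the dead letter step never runs, which is the situation the warning above is guarding against.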
Instead of using onException(), a simple approach to handling
exceptions in a transactional route is to use the doTry() and
doCatch() clauses around the route. For example, Example 5.5 shows how you can catch and
handle the IllegalArgumentException in a transactional route, without
the risk of getting trapped in an infinite loop.
Example 5.5. Catching Exceptions with doTry() and doCatch()
// Java
import org.apache.camel.spring.SpringRouteBuilder;
public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .doTry()
                .to("direct:split")
            .doCatch(IllegalArgumentException.class)
                .to("file:target/messages?fileName=deadLetters.xml&fileExist=Append")
            .end();
        from("direct:split")
            .transacted()
            .beanRef("accountService","credit")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages");
    }
}
In this example, the route is split into two segments. The first segment (from the
file:src/data endpoint) receives the incoming exchanges and
performs the exception handling using doTry() and
doCatch(). The second segment (from the direct:split
endpoint) does all of the transactional work. If an exception occurs within this
transactional segment, it propagates first to the transacted()
command, causing the current transaction to be rolled back, and it is then caught by
the doCatch() clause in the first route segment. The
doCatch() clause does not rethrow the
exception, so the file endpoint does not do any retries and infinite looping is
avoided.