LibraryPrintFeedback

EIP Transaction Guide

Version 7.1

December 2012
Trademark Disclaimer
Third Party Acknowledgements

Updated: 08 Jan 2014

Table of Contents

1. Introduction to Transactions
Basic Transaction Concepts
Transaction Qualities of Service
Getting Started with Transactions
Prerequisites
Generate a New Project
Configure a Transaction Manager and a Resource
Define a Route
Build and Run the Example
2. Selecting a Transaction Manager
What is a Transaction Manager?
Spring Transaction Architecture
OSGi Transaction Architecture
PlatformTransactionManager Interface
Transaction Manager Implementations
Sample Configurations
JDBC Data Source
Hibernate
JPA
3. JMS Transactions
Configuring the JMS Component
InOnly Message Exchange Pattern
InOut Message Exchange Pattern
4. Data Access with Spring
Programming Data Access with Spring Templates
Spring JDBC Template
5. Transaction Demarcation
Demarcation by Marking the Route
Demarcation by Transactional Endpoints
Propagation Policies
Error Handling and Rollbacks
6. XA Transactions in Fuse ESB Enterprise
Transaction Architecture
Configuring the Transaction Manager
Accessing the Transaction Manager
Java Transaction API
The XA Enlistment Problem
7. JMS XA Transaction Integration
Enabling XA on the Camel JMS Component
JMS XA Resource
Sample JMS XA Configuration
8. JDBC XA Transaction Integration
Configuring an XA Data Source
Apache Aries Auto-Enlisting XA Wrapper
9. XA Transaction Demarcation
Demarcation by Transactional Endpoints
Demarcation by Marking the Route
Demarcation by UserTransaction
Demarcation by Declarative Transactions
10. XA Tutorial
Install Apache Derby
Integrate Derby with Fuse ESB Enterprise
Define a Derby Datasource
Define a Transactional Route
Deploy and Run the Transactional Route
A. AccountService Example
Index

List of Figures

2.1. Spring Transaction Architecture
2.2. OSGi Transaction Architecture
3.1. Transactional JMS Route that Processes InOnly Exchanges
3.2. Transactional JMS Route that Processes InOut Exchanges
3.3. Pair of Transactional JMS Routes that Support Request/Reply Semantics
5.1. Demarcation by Marking the Route
5.2. Demarcation by Transactional Endpoints
5.3. Errors and Exceptions that Trigger Rollback
6.1. OSGi Transaction Architecture
7.1. Camel JMS Component Integrated with XA Transactions
7.2. JMS XA Resource Connected to Remote Broker
7.3. JMS XA Resource Connected to Embedded Broker
8.1. Creating the Auto-Enlisting XA Wrapper
10.1. JMX Operations Supported by the statusLog Queue
10.2. JMX Browsing the statusLog Queue

List of Tables

2.1. Local Transaction Managers
2.2. Global Transaction Managers
2.3. Standalone Data Source Classes
2.4. J2EE Data Source Adapters
2.5. Data Source Proxies
2.6. Connection Details for Various Databases
2.7. Obtaining JPA Entity Manager Factory
8.1. Standard DataSource Properties
8.2. Derby DataSource Properties

List of Examples

2.1. The PlatformTransactionManager Interface
2.2. Data Source Transaction Manager Configuration
2.3. Hibernate Transaction Manager Configuration
2.4. JPA Transaction Manager Configuration
2.5. Sample persistence.xml File
3.1. JMS Transaction Manager Configuration
3.2. URI for Using Transacted JMS Endpoint
4.1. The CreateTable Class
4.2. The AccountService class
5.1. Transaction Policy Beans
5.2. Rolling Back an Exception with rollback()
5.3. Rolling Back an Exception with markRollbackOnly()
5.4. How to Define a Dead Letter Queue
5.5. Catching Exceptions with doTry() and doCatch()
7.1. Embedded Broker Configured in Spring XML
7.2. Camel JMS Component with XA Enabled
8.1. Exposing XA DataSource as an OSGi Service in Spring XML
8.2. Exposing XA DataSource as an OSGi Service in Blueprint XML
8.3. Importing XA DataSource as an OSGi Service Reference in Spring XML
8.4. Importing XA DataSource as an OSGi Service Reference in Blueprint XML
A.1. The AccountService Class

To understand transaction processing, it is crucial to appreciate the basic relationship between transactions and threads: transactions are thread-specific. That is, when a transaction is started, it is attached to a specific thread (technically, a transaction context object is created and associated with the current thread). From this point on (until the transaction ends), all of the activity in the thread occurs within this transaction scope. Conversely, activity in any other thread does not fall within this transaction's scope (although it might fall within the scope of some other transaction).

From this, we can draw a few simple conclusions:

[Note]Note

Some advanced transaction manager implementations give you the freedom to detach and attach transaction contexts to and from threads at will. For example, this makes it possible to move a transaction context from one thread to another thread. In some cases it is also possible to attach a single transaction context to multiple threads.

The X/Open XA standard describes a standardized interface for integrating resources with a transaction manager. If you want to manage a transaction that includes more than one resource, it is essential that the participating resources support the XA standard. Resources that support the XA standard expose a special object, the XA switch, which enables transaction managers (or TP monitors) to take control of their transactions. The XA standard supports both the 1-phase commit protocol and the 2-phase commit protocol.

ANSI SQL defines four transaction isolation levels, as follows:

Databases generally do not support all of the different transaction isolation levels. For example, some free databases support only READ_UNCOMMITTED. Also, some databases implement transaction isolation levels in ways that are subtly different from the ANSI standard. Isolation is a complicated issue, which involves trade offs with database performance (for example, see Isolation in Wikipedia).

In order for a resource to participate in a transaction involving multiple resources, it needs to support the X/Open XA standard. You also need to check whether the resource's implementation of the XA standard is subject to any special restrictions. For example, some implementations of the XA standard are restricted to a single database connection (which implies that only one thread at a time can process a transaction involving that resource).

A key differentiator for transaction managers is the ability to support multiple resources. This normally entails support for the XA standard, where the transaction manager provides a way for resources to register their XA switches.

[Note]Note

Strictly speaking, the XA standard is not the only approach you can use to support multiple resources, but it is the most practical one. The alternative typically involves writing tedious (and critical) custom code to implement the algorithms normally provided by an XA switch.

To generate the new project, perform the following steps:

  1. Open a new command window and change to the directory where you want to store the new Maven project.

  2. Enter the following command to generate the new Maven project:

    mvn archetype:generate
      -DarchetypeRepository=http://repo.fusesource.com/maven2
      -DarchetypeGroupId=org.apache.camel.archetypes
      -DarchetypeArtifactId=camel-archetype-java
      -DarchetypeVersion=2.10.0-fuse-00-05
      -DgroupId=tutorial
      -DartifactId=tx-jms-router

    This command generates a basic router application under the tx-jms-router directory. You will customize this basic application to demonstrate transactions in .Apache Camel

    [Note]Note

    Maven accesses the Internet to download JARs and stores them in its local repository.

  3. Customize the project POM file, tx-jms-router/pom.xml, by adding some new project dependencies. First of all, define some properties for the dependency versions. Using your favorite text editor, open the POM file and add a spring-version property and an activemq-version property as follows:

    <project ...>
      ...
      <properties>
        ...
        <spring-version>3.0.5.RELEASE</spring-version>
        <activemq-version>5.5.1-fuse-00-05</activemq-version>
        <xbean-version>3.7</xbean-version>
      </properties>
      ...
    </project>
  4. Add dependencies on the artifacts that implement Spring transactions. Look for the dependencies element in the POM file and add the following dependency elements:

    <project ...>
      ...
      <dependencies>
        ...
        <!-- Spring transaction dependencies -->
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-core</artifactId>
          <version>${spring-version}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context</artifactId>
          <version>${spring-version}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-tx</artifactId>
          <version>${spring-version}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-aop</artifactId>
          <version>${spring-version}</version>
        </dependency>
    
      </dependencies>
      ...
    </project>
  5. Add the JMS and ActiveMQ dependencies. Look for the dependencies element in the POM file and add the following dependency elements:

    <project ...>
      ...
      <dependencies>
        ...
        <!-- Persistence artifacts -->
        <dependency> 
          <groupId>org.apache.camel</groupId> 
          <artifactId>camel-jms</artifactId> 
          <version>${camel-version}</version>
        </dependency> 
        <dependency> 
          <groupId>org.apache.activemq</groupId> 
          <artifactId>activemq-core</artifactId> 
          <version>${activemq-version}</version> 
        </dependency>
        <!--  This is needed by the camel-jms component -->
        <dependency> 
          <groupId>org.apache.xbean</groupId> 
          <artifactId>xbean-spring</artifactId> 
          <version>${xbean-version}</version> 
        </dependency>
    
      </dependencies>
      ...
    </project>

To configure the JMS transaction manager and the JMS resource, perform the following steps:

  1. Customize the Spring XML configuration. Using your favorite text editor, open the tx-jms-router/src/main/resources/META-INF/spring/camel-context.xml file and add the following content to the beans element:

    <beans ... >
      ...
      <bean id="jmstx" class="org.apache.camel.component.jms.JmsComponent"> 
        <property name="configuration" ref="jmsConfig" /> 
      </bean> 
    
      <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
          <property name="connectionFactory" ref="jmsConnectionFactory"/> 
          <property name="transactionManager" ref="jmsTransactionManager"/> 
          <property name="transacted" value="true"/> 
      </bean> 
    
      <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
      </bean>
      
      <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://broker1?brokerConfig=xbean:tutorial/activemq.xml"/>
      </bean>
    
    </beans>

    This configuration creates a custom JMS component, with bean ID equal to jmstx, that you can use to define transactional JMS endpoints in your routes. The underlying JMS system is an embedded Apache ActiveMQ broker, which takes its configuration from the file, tutorial/activemq.xml.

  2. Create the ActiveMQ configuration file, activemq.xml. First create the new directory, tx-jms-router/src/main/resources/tutorial. Next, using your favorite text editor, create the file, activemq.xml, under the tutorial directory, and add the following text:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans>
      <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
    
      <broker useJmx="true"  xmlns="http://activemq.org/config/1.0" persistent="false" brokerName="broker1">
        <transportConnectors>
          <transportConnector name="openwire" uri="tcp://localhost:61616"/>
        </transportConnectors>
      </broker>
      
    </beans>

To build and run the transactional JMS example, perform the following steps:

  1. To build the example, open a command prompt, change directory to tx-jms-router, and enter the following Maven command:

    mvn install

    If the build is successful, you should see the file, tx-jms-router-1.0-SNAPSHOT.jar, appear under the tx-jms-router/target directory.

  2. To run the example using the camel-maven-plugin, enter the following Maven command:

    mvn camel:run

    If all goes well, you should see about a dozen occurrences of java.lang.Exception: test scrolling past, before activity in the console window comes to a halt. Do not kill the running application at this point!

    [Note]Note

    But make sure that the exceptions you are seeing in the console do not indicate a failure to download and install the camel-maven-plugin. Normally, the plug-in should download and install without any problems, because the generated POM file, tx-jms-router/pom.xml, contains all of the requisite settings.

  3. What happened? The series of runtime exceptions thrown by the application is exactly what we expect to happen, because the route is programmed to throw an exception every time an exchange is processed by the route. The purpose of throwing the exception is to trigger a transaction rollback, causing the current exchange to be un-enqueued from the queue:credit and queue:debit queues.

    To gain a better insight into what occurred, open a JMX console and point it at the ActiveMQ broker. Open a new command prompt and enter the following command:

    jconsole
    [Note]Note

    The jconsole utility is a standard tool provided with Sun's J2SE distribution (JDK).

  4. To open a JMX connection to the ActiveMQ broker (which is embedded in the running example application), click on the Remote tab of the JConsole: Connect to Agent dialog and enter the Port number, 1099 (the default JMX port for ActiveMQ). Click Connect.

    [Note]Note

    It is possible to customize the JMX port used by ActiveMQ. See http://activemq.apache.org/jmx.html for details.

  5. If the connection succeeds, the JConsole window shows you a summary of the Virtual Machine (VM) instance that you are connected to. Click the MBeans tab and drill down to the giro queue, in Tree/org.apache.activemq/broker1/Queue.

    Notice that the EnqueueCount, DispatchCount, and DequeueCount for giro are all equal to 2, which indicates that two messages entered the queue and two messages were pulled off the queue.

  6. Click on the debits queue. Notice that the EnqueueCount, DispatchCount, and DequeueCount for debits are all equal to 0. This is because the test exception caused the enqueued message to be rolled back each time an exchange passed through the route. The same thing happened to the credits queue.

  7. Click on the ActiveMQ.DLQ queue. The DLQ part of this name stands for Dead Letter Queue and it is an integral part of the way ActiveMQ deals with failed message dispatches. In summary, the default behavior of ActiveMQ when it fails to dispatch a message (that is, when an exception reaches the JMS consumer endpoint, jmstx:queue:giro), is as follows:

    You can see from the status of the ActiveMQ.DLQ queue that the number of enqueued messages, EnqueueCount, is equal to 2. This is where the failed messages have ended up.

  8. You can now kill the example application by typing Ctrl-C in its command window.

A local transaction manager is a transaction manager that can coordinate transactions over a single resource only. In this case, the implementation of the transaction manager is typically embedded in the resource itself and the Spring transaction manager is just a thin wrapper around this built-in transaction manager.

For example, the Oracle database has a built-in transaction manager that supports demarcation operations (using SQL operations, BEGIN, COMMIT, ROLLBACK, or using a native Oracle API) and various levels of transaction isolation. Control over the Oracle transaction manager can be exported through JDBC, which is how Spring is able to wrap this transaction manager.

It is important to understand what constitutes a resource, in this context. For example, if you are using a JMS product, the JMS resource is the single running instance of the JMS product, not the individual queues and topics. Moreover, sometimes, what appears to be multiple resources might actually be a single resource, if the same underlying resource is accessed in different ways. For example, your application might access a relational database both directly (through JDBC) and indirectly (through an object-relational mapping tool like Hibernate). In this case, the same underlying transaction manager is involved, so it should be possible to enrol both of these code fragments in the same transaction.

[Note]Note

It cannot be guaranteed that this will work in every case. Although it is possible in principle, some detail in design of the Spring framework or other wrapper layers might prevent it from working in practice.

Of course, it is possible for an application to have many different local transaction managers working independently of each other. For example, you could have one route that manipulates JMS queues and topics, where the JMS endpoints reference a JMS transaction manager. Another route could access a relational database through JDBC. But you could not combine JDBC and JMS access in the same route and have them both participate in the same transaction.

A global transaction manager is a transaction manager that can coordinate transactions over multiple resources. In this case, you cannot rely on the transaction manager built into the resource itself. Instead, you require an external system, sometimes called a transaction processing monitor (TP monitor), that is capable of coordinating transactions across different resources.

The following are the prerequisites for global transactions:

[Tip]Tip

The Spring framework does not by itself provide a TP monitor to manage global transactions. It does, however, provide support for integrating with an OSGi-provided TP monitor or with a J2EE-provided TP monitor (where the integration is implemented by the JtaTransactionManager class). Hence, if you deploy your application into an OSGi container with full transaction support, you can use multiple transactional resources in Spring.

Figure 2.1 shows an overview of the Spring transaction architecture.


Figure 2.2 shows an overview of the OSGi transaction architecture in Red Hat JBoss Fuse. The core of the architecture is a JTA transaction manager based on Apache Geronimo, which exposes various transaction interfaces as OSGi services.


The PlatformTransactionManager interface is the key abstraction in the Spring transaction API, providing the classic transaction client operations: begin, commit and rollback. This interface thus provides the essential methods for controlling transactions at run time.

[Note]Note

The other key aspect of any transaction system is the API for implementing transactional resources. But transactional resources are generally implemented by the underlying database, so this aspect of transactional programming is rarely a concern for the application programmer.

Table 2.1 summarizes the local transaction manager implementations provided by the Spring framework. These transaction managers are distinguished by the fact that they support a single resource only.

Table 2.1. Local Transaction Managers

Transaction ManagerDescription
JmsTransactionManager

A transaction manager implementation that is capable of managing a single JMS resource. That is, you can connect to any number of queues or topics, but only if they belong to the same underlying JMS messaging product instance. Moreover, you cannot enlist any other types of resource in a transaction.

For example, using this transaction manager, it would not be possible to enlist both a SonicMQ resource and an Apache ActiveMQ resource in the same transaction. But see Table 2.2.

DataSourceTransactionManager A transaction manager implementation that is capable of managing a single JDBC database resource. That is, you can update any number of different database tables, but only if they belong to the same underlying database instance.
HibernateTransactionManager

A transaction manager implementation that is capable of managing a Hibernate resource. It is not possible, however, to simultaneously enlist any other kind of resource in a transaction.

JdoTransactionManager A transaction manager implementation that is capable of managing a Java Data Objects (JDO) resource. It is not possible, however, to simultaneously enlist any other kind of resource in a transaction.
JpaTransactionManager A transaction manager implementation that is capable of managing a Java Persistence API (JPA) resource. It is not possible, however, to simultaneously enlist any other kind of resource in a transaction.
CciLocalTransactionManager

A transaction manager implementation that is capable of managing a Java Connection Architecture (JCA) resource. It is not possible, however, to simultaneously enlist any other kind of resource in a transaction.


In Example 2.2, the txManager bean is a local JDBC transaction manager instance, of DataSourceTransactionManager type. There is just one property you need to provide to the JDBC transaction manager: a reference to a JDBC data source.

In Example 2.2, the dataSource bean is an instance of a JDBC data source, of javax.sql.DataSource type. The JDBC data source is a standard feature of the Java DataBase Connectivity (JDBC) specification and it represents a single JDBC connection, which encapsulating the information required to connect to a specific database.

In Spring, the recommended way to create a data source is to instantiate a SimpleDriverDataSource bean (which implements the javax.sql.DataSource interface). The simple driver data source bean creates a new data source using a JDBC driver class (which is effectively a data source factory). The properties that you supply to the driver manager data source bean are specific to the database you want to connect to. In general, you need to supply the following properties:

driverClass

An instance of java.sql.Driver, which is the JDBC driver implemented by the database you want to connect to. Consult the third-party database documentation for the name of this driver class (some examples are given in Table 2.6).

url

The JDBC URL that is used to open a connection to the database. Consult the third-party database documentation for details of the URL format (some examples are given in Table 2.6).

For example, the URL provided to the dataSource bean in Example 2.2 is in a format prescribed by the HSQLDB database. The URL, jdbc:hsqldb:mem:camel, can be parsed as follows:

  • The prefix, jdbc:hsqldb:, is common to all HSQLDB JDBC connection URLs;

  • The prefix, mem:, signifies an in-memory (non-persistent) database;

  • The final identifier, camel, is an arbitrary name that identifies the in-memory database instance.

username

The username that is used to log on to the database.

For example, when a new HSQLDB database instance is created, the sa user is created by default (with administrator privileges).

password

The password that matches the specified username.

If your application is deployed into a J2EE container, it does not make sense to create a data source directly. Instead, you should let the J2EE container take care of creating data sources and you can then access those data sources by doing a JNDI lookup. For example, the following code fragment shows how you can obtain a data source from the JNDI reference, java:comp/env/jdbc/myds, and then wrap the data source with a UserCredentialsDataSourceAdapter.

 <bean id="myTargetDataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
   <property name="jndiName" value="java:comp/env/jdbc/myds"/>
 </bean>

 <bean id="myDataSource" class="org.springframework.jdbc.datasource.UserCredentialsDataSourceAdapter">
   <property name="targetDataSource" ref="myTargetDataSource"/>
   <property name="username" value="myusername"/>
   <property name="password" value="mypassword"/>
 </bean>

The JndiObjectFactoryBean exploits the Spring bean factory pattern to look up an object in JNDI. When this bean's ID, myTargetDataSource, is referenced elsewhere in Spring using the ref attribute, instead of getting a reference to the JndiObjectFactoryBean bean, you actually get a reference to the bean that was looked up in JNDI (a javax.sql.DataSource instance).

The standard javax.sql.DataSource interface exposes two methods for creating connections: getConnection() and getConnection(String username, String password). If (as is normally the case) the referenced database requires credentials in order to open a connection, the UserCredentialsDataSourceAdapter class provides a convenient way of ensuring that these user credentials are available. You can use this adapter class for wrapping JNDI-provided data sources that do not have their own credentials cache.

In addition to UserCredentialsDataSourceAdapter, there are a number of other adapter classes that you can use to wrap data sources obtained from JNDI lookups. These J2EE data source adapters are summarized in Table 2.4.


Table 2.6 shows the JDBC connection details for a variety of different database products.

Table 2.6. Connection Details for Various Databases

DatabaseJDBC Driver Manager Properties
HSQLDB

The JDBC driver class for HSQLDB is as follows:

org.hsqldb.jdbcDriver

To connect to a HSQLDB database, you can use one of the following JDBC URL formats:

jdbc:hsqldb:hsql[s]://host[:port][/DBName][KeyValuePairs]
jdbc:hsqldb:http[s]://host[:port][/DBName][KeyValuePairs]
jdbc:hsqldb:mem:DBName[KeyValuePairs]

Where the hsqls and https protocols use TLS security and the mem protocol references an in-process, transient database instance (useful for testing). For more details, see http://www.hsqldb.org/doc/src/org/hsqldb/jdbc/jdbcConnection.html.

MySQL

The JDBC driver class for MySQL is as follows:

com.mysql.jdbc.Driver

To connect to a MySQL database, use the following JDBC URL format:

jdbc:mysql://[host][,failoverhost...][:port]/[DBName][Options]

Where the Options coincidentally have the same format as Camel component options—for example, ?Option1=Value1&Option2=Value2. For more details, see http://dev.mysql.com/doc/refman/6.0/en/connector-j-reference-configuration-properties.html.

Oracle

Depending on which version of Oracle you are using choose one of the following JDBC driver classes:

oracle.jdbc.OracleDriver (Oracle 9i, 10)
oracle.jdbc.driver.OracleDriver (Oracle 8i)

To connect to an Oracle database, use the following JDBC URL format:

jdbc:oracle:thin:[user/password]@[host][:port]:SID

Where the Oracle System ID (SID) identifies an Oracle database instance. For more details, see http://download.oracle.com/docs/cd/B10501_01/java.920/a96654/basic.htm.

DB2

The JDBC driver class for DB2 is as follows:

com.ibm.db2.jcc.DB2Driver

To connect to a DB2 database, use the following JDBC URL format:

jdbc:db2://host[:port]/DBName
SQL Server

The JDBC driver class for SQL Server is as follows:

com.microsoft.jdbc.sqlserver.SQLServerDriver

To connect to a SQL Server database, use the following JDBC URL format:

jdbc:microsoft:sqlserver://host[:port];DatabaseName=DBName
Sybase

The JDBC driver class for Sybase is as follows:

com.sybase.jdbc3.jdbc.SybDriver

To connect to a Sybase database, use the following JDBC URL format:

jdbc:sybase:Tds:host:port/DBName
Informix

The JDBC driver class for Informix is as follows:

com.informix.jdbc.IfxDriver

To connect to an Informix database, use the following JDBC URL format:

jdbc:informix-sqli://host:port/DBName:informixserver=DBServerName
PostgreSQL

The JDBC driver class for PostgreSQL is as follows:

org.postgresql.Driver

To connect to a PostgreSQL database, use the following JDBC URL format:

jdbc:postgresql://host[:port]/DBName
MaxDB

The JDBC driver class for the SAP database is as follows:

com.sap.dbtech.jdbc.DriverSapDB

To connect to a MaxDB database, use the following JDBC URL format:

jdbc:sapdb://host[:port]/DBName
FrontBase

The JDBC driver class for FrontBase is as follows:

com.frontbase.jdbc.FBJDriver

To connect to a FrontBase database, use the following JDBC URL format:

jdbc:FrontBase://host[:port]/DBName

In Example 2.3, the hibernateTxManager bean is a local Hibernate transaction manager instance, of HibernateTransactionManager type. There is just one property you need to provide to the Hibernate transaction manager: a reference to a Hibernate session factory.

In Example 2.3, the mySessionFactory bean is a Hibernate session factory of org.springframework.orm.hibernate3.LocalSessionFactory type. This session factory bean is needed by the Hibernate transaction manager.

In general, you need to supply the following properties to a Hibernate LocalSessionFactory bean instance:

dataSource

An instance of javax.sql.DataSource, which is the JDBC data source of the database that Hibernate is layered over. For details of how to configure a JDBC data source, see JDBC Data Source.

mappingResources

Specifies a list of one or more mapping association files on the class path. A Hibernate mapping association defines how Java objects map to database tables.

hibernateProperties

Allows you to set any Hibernate property, by supplying a list of property settings. The most commonly needed property is hibernate.dialect, which indicates to Hibernate what sort of database it is layered over, enabling Hibernate to optimize its interaction with the underlying database. The dialect is specified as a class name, which can have one of the following values:

org.hibernate.dialect.Cache71Dialect
org.hibernate.dialect.DataDirectOracle9Dialect
org.hibernate.dialect.DB2390Dialect
org.hibernate.dialect.DB2400Dialect
org.hibernate.dialect.DB2Dialect
org.hibernate.dialect.DerbyDialect
org.hibernate.dialect.FirebirdDialect
org.hibernate.dialect.FrontBaseDialect
org.hibernate.dialect.H2Dialect
org.hibernate.dialect.HSQLDialect
org.hibernate.dialect.IngresDialect
org.hibernate.dialect.InterbaseDialect
org.hibernate.dialect.JDataStoreDialect
org.hibernate.dialect.MckoiDialect
org.hibernate.dialect.MimerSQLDialect
org.hibernate.dialect.MySQL5Dialect
org.hibernate.dialect.MySQL5InnoDBDialect
org.hibernate.dialect.MySQLDialect
org.hibernate.dialect.MySQLInnoDBDialect
org.hibernate.dialect.MySQLMyISAMDialect
org.hibernate.dialect.Oracle9Dialect
org.hibernate.dialect.OracleDialect
org.hibernate.dialect.PointbaseDialect
org.hibernate.dialect.PostgreSQLDialect
org.hibernate.dialect.ProgressDialect
org.hibernate.dialect.RDMSOS2200Dialect
org.hibernate.dialect.SAPDBDialect
org.hibernate.dialect.SQLServerDialect
org.hibernate.dialect.Sybase11Dialect
org.hibernate.dialect.SybaseAnywhereDialect
org.hibernate.dialect.SybaseDialect
org.hibernate.dialect.TimesTenDialect

In Example 2.4, the jpaTxManager bean is a local JPA transaction manager instance, of JpaTransactionManager type. The JPA transaction manager requires a reference to an entity manager factory bean (in this example, the entityManagerFactory bean).

If you deploy your application into an OSGi container, however, you might want to consider using a JtaTransactionManager instead. See Table 2.2.

Example 2.5 shows a sample persistence.xml file for configuring an OpenJPA JPA provider layered over a Derby database.

Example 2.5. Sample persistence.xml File

<?xml version="1.0" encoding="UTF-8"?>
<persistence xmlns="http://java.sun.com/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             version="1.0"
             xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd">

  <persistence-unit name="camel" transaction-type="RESOURCE_LOCAL">
    <!--
        The default provider can be OpenJPA, or some other product.
        This element is optional if OpenJPA is the only JPA provider
        in the current classloading environment, but can be specified
        in cases where there are multiple JPA implementations available.
    -->
    <provider> 1
      org.apache.openjpa.persistence.PersistenceProviderImpl
    </provider>

    <class>org.apache.camel.examples.MultiSteps</class> 2
    <class>org.apache.camel.examples.SendEmail</class>

    <properties> 3
      <property name="openjpa.ConnectionURL" value="jdbc:derby:target/derby;create=true"/>
      <property name="openjpa.ConnectionDriverName" value="org.apache.derby.jdbc.EmbeddedDriver"/>
      <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema"/>
      <property name="openjpa.Log" value="DefaultLevel=WARN, Tool=INFO, SQL=TRACE"/>
    </properties>
  </persistence-unit>

</persistence>

1

The provider element can be used to specify the OpenJPA provider implementation class. If the provider element is omitted, the JPA layer simply uses the first JPA provider it can find. Hence, it is recommended to specify the provider element, if there are multiple JPA providers on your class path.

To make a JPA provider available to an application, simply add the provider's JAR file to the class path and the JPA layer will auto-detect the JPA provider.

2

Use the class elements to list all of the Java types that you want to persist using the JPA framework.

3

Use the properties element to configure the underlying JPA provider. In particular, you should at least provide enough information here to configure the connection to the underlying database.

The following code example shows how the org.apache.camel.examples.SendEmail class referenced in Example 2.5 should be annotated to turn it into a persistent entity bean (so that it is persistible by JPA):

// Java
package org.apache.camel.examples;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

/**
 * Represents a task which is added to the database, then removed from the database when it is consumed
 *
 * @version $Revision$
 */
@Entity
public class SendEmail {
    private Long id;
    private String address;

    public SendEmail() {
    }

    public SendEmail(String address) {
        setAddress(address);
    }

    @Override
    public String toString() {
        return "SendEmail[id: " + getId() + " address: " + getAddress() + "]";
    }

    @Id
    @GeneratedValue
    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}

The preceding class has the following JPA annotations:

@javax.persistence.Entity

Specifies that the following class is persistible by the JPA.

@javax.persistence.Id

The following bean property must be used as the primary key (for locating objects of this type in the database).

@javax.persistence.GeneratedValue

Specifies that the primary key values should be automatically generated by the JPA runtime (you can optionally set attributes on this annotation to configure the ID generation algorithm as well).

For the complete list of JPA annotations, see the API for the javax.persistence package.

The easiest way to configure a JMS endpoint to participate in transactions is to create a new an instance of a Camel JMS component that has the proper settings. To do so:

  1. Create a bean element that has its class attribute set to org.apache.camel.component.jms.JmsComponent.

    This bean creates an instance of the JMS component.

  2. Set the bean's id attribute to a unique, short, string.

    The id will be used to create route endpoint's that use this JMS component.

  3. Add an empty property child to the bean.

  4. Add a name attribute with the value of configuration to the property element.

  5. Add a ref attribute whose value is the id of a JmsConfiguration bean to the property element.

    The JmsConfiguration bean is used to configure the JMS component.

  6. Create a bean element that has its class attribute set to org.apache.camel.component.jms.JmsConfiguration.

    This bean creates an instance of the JMS component configuration.

  7. Set the bean's id attribute to the value supplied for the ref attribute in Step 5.

  8. Add a property child to the bean to configure the JMS connection factory.

    1. Set the name attribute to connectionFactory.

    2. Set the ref attribute to the id of a bean that configures a JMS connection factory.

  9. Add an empty property child to the bean that specifies the transaction manager the component will use.

    1. Set the name attribute to transactionManager.

    2. Set the ref attribute to the id of a bean that configures transaction manager the endpoint will use.

    See Selecting a Transaction Manager.

  10. Add an empty property child to the bean that configures the component to participate in transactions.

    1. Set the name attribute to transacted.

    2. Set the value attribute to true.

      The transacted property determines if the endpoint can participate in transactions.

  11. Optionally add an empty property child to the bean to change the default cache level.

    1. Set the name attribute to cacheLevelName.

    2. Set the value attribute to to a valid cache level.

    See Cache levels and performance.

The JmsComponent bean's id specifies the URI prefix used by JMS endpoints that will use the transactional JMS component. For example, in Example 3.1 the JmsComponent bean's id equals jmstx, so endpoint that use the configured JMS component use the jmstx: prefix.

The JmsConfiguration class supports a large number of other properties, which are essentially identical to the JMS URI options described in JMS in EIP Component Reference.

Example 3.1 shows the configuration of a JMS component, jmstx that supports Spring transactions. The JMS component is layered over an embedded instance of Apache ActiveMQ and the transaction manager is an instance of JmsTransactionManager.


To use this JMS component in a route you would use the URI prefix jmstx: as shown in Example 3.2.


Figure 3.1 shows an overview of a scenario consisting of JMS consumer endpoint feeding into a route that ends with a JMS producer endpoint. This route is designed to process exclusively InOnly exchanges.


Messages coming into the route shown in Figure 3.1 are processed as follows:

  1. When a oneway message (JMSReplyTo header is absent) is polled by the JMS consumer endpoint, the endpoint starts a transaction, provisionally takes the message off the incoming queue, and creates an InOnly exchange object to hold the message.

  2. After propagating through the route, the InOnly exchange arrives at the JMS producer endpoint, which provisionally writes the exchange to the outgoing queue.

  3. At this point, we have arrived at the end of the transaction scope. If there were no errors (and the transaction is not marked for rollback), the transaction is automatically committed. Upon committing, both of the JMS endpoints send acknowledgement messages to the queues, turning the provisional read and the provisional write into a committed read and a committed write.

The InOut MEP is fundamentally incompatible with a route containing transactional JMS endpoints. In almost all cases, the route will hang and no reply will ever be sent. To understand why, consider the following route for processing payment requests:

from("jmstx:queue:rawPayments")
    .process(inputReformatter)
    .to("jmstx:queue:formattedPayments")
    .process(outputReformatter);

The JMS consumer endpoint, jmstx:queue:rawPayments, polls for messages, which are expected to have a JMSReplyTo header (for InOut mode). For each incoming message, a new transaction is started and an InOut exchange is created. After reformatting by the inputReformatter processor, the InOut exchange proceeds to the JMS producer endpoint, jmstx:queue:formattedPayments, which sends the message and expects to receive a reply on a temporary queue. This scenario is illustrated by Figure 3.2


The scope of the transaction includes the entire route, the request leg as well as the reply leg. The processing of the route proceeds as expected until the exchange arrives at the JMS producer endpoint, at which point the producer endpoint makes a provisional write to the outgoing request queue. At this point the route hangs: the JMS producer endpoint is waiting to receive a message from the reply queue, but the reply can never be received because the outgoing request message was only provisionally written to the request queue (and is thus invisible to the service at the other end of the queue).

It turns out that this problem is not trivial to solve. When you consider all of the ways that this scenario could fail and how to guarantee transactional integrity in all cases, it would require some substantial changes to the way that Apache Camel works. Fortunately, there is a simpler way of dealing with request/reply semantics that is already supported by Apache Camel.

The org.springframework.jms.core.JmsTemplate class is a general-purpose class for managing Java Messaging Service (JMS) connections. One of the main advantages of this class is that it simplifies the JMS synchronous access codes.

To create an instance of a JmsTemplate, you need to supply a reference to a javax.jms.ConnectionFactory object.

The org.springframework.jdbc.core.JdbcTemplate class is a wrapper around a JDBC data source, enabling you to access a JDBC database using SQL operations.

To create an instance of a JdbcTemplate, you need to supply a reference to a javax.sql.DataSource object (for example, see JDBC Data Source).

[Note]Note

For a detailed discussion of the JdbcTemplate class, see Spring JDBC Template.

The org.springframework.jdbc.core.simple.SimpleJdbcTemplate class is a convenience wrapper around the JdbcTemplate class. This class has been pared down so that it includes only the most commonly used template methods and it has been optimized to exploit Java 5 features.

The org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate class is a convenience wrapper around the JdbcTemplate class, which enables you to use named parameters instead of the usual ? placeholders embedded in a SQL statement.

The org.springframework.orm.ibatis.SqlMapClientTemplate class is a simplifying wrapper around the iBATIS SqlMapClient class. iBATIS is an Object Relational Mapper (ORM) that is capable of automatically instantiating Java objects based on a given SQL database schema.

The org.springframework.orm.hibernate3.HibernateTemplate class provides an alternative to working with the raw Hibernate 3 session API (based on sessions returned from SessionFactory.getCurrentSession()).

[Note]Note

For Hibernate versions 3.0.1 or later, the Spring documentation recommends that you use the native Hibernate 3 API instead of the HibernateTemplate class, because transactional Hibernate access code can now be coded using the native Hibernate API.

The org.springframework.orm.jdo.JdoTemplate class provides an alternative to working with the raw JDO PersistenceManager API. The main difference between the APIs relates to their exception handling. See the Spring JavaDoc for details.

The org.springframework.orm.jpa.JpaTemplate class provides an alternative to working with the raw JPA EntityManager API..

[Note]Note

The Spring documentation now recommends that you use the native JPA programming interface instead of the JpaTemplate class. Considering that the JPA programming interface is itself a thin wrapper layer, there is little advantage to be had by adding another wrapper layer on top of it.

The org.springframework.jdbc.core.JdbcTemplate class is the key class for accessing databases through JDBC in Spring. It provides a complete API for executing SQL statements on the database at run time. The following kinds of SQL operations are supported by JdbcTemplate:

The JdbcTemplate query methods are used to send SELECT queries to the database. A variety of different query methods are supported, depending on how complicated the return values are.

The simplest case is where you expect the query to return a single value from a single row. In this case, you can use a type-specific query method to retrieve the single value. For example, if you want to retrieve the balance of a particular customer's account from the accounts table, you could use the following code:

// Java
int origAmount = jdbc.queryForInt(
    "select amount from accounts where name = ?",
    new Object[]{name}
);

The arguments to the SQL query are provided as a static array of objects, Object[]{name}. In this example, the name string is bound to the question mark, ?, in the SQL query string. If there are multiple arguments to the query string (where each argument in the SQL string is represented by a question mark, ?), you would provide an object array with multiple arguments—for example, Object[]{arg1,arg2,arg3,...}.

The next most complicated case is where you expect the query to return multiple values from a single row. In this case, you can use one of the queryForMap() methods to retrieve the contents of a single row. For example, to retrieve the complete account details from a single customer:

// Java
Map<String,Object> rowMap = jdbc.queryForMap(
    "select * from accounts where name = ?",
    new Object[]{name}
);

Where the returned map object, rowMap, contains one entry for each column, using the column name as the key.

The most general case is where you expect the query to return multiple values from multiple rows. In this case, you can use one of the queryForList() methods to return the contents of multiple rows. For example, to return all of the rows from the accounts table:

// Java
List<Map<String,Object> > rows = jdbc.queryForList(
    "select * from accounts"
);

In some cases, a more convenient way of returning the table rows is to provide a RowMapper, which automatically converts each row to a Java object. The return value of a query call would then be a list of Java objects. For example, the contents of the accounts table could be returned as follows:

// Java
List<Account> accountList = jdbc.query(
    "select * from accounts",
    new Object[]{},
    new RowMapper() {
        public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
            Account acc = new Account();
            acc.setName(rs.getString("name"));
            acc.setAmount(rs.getLong("amount"));
            return acc;
        }
    }
);

Where each Account object in the returned list encapsulates the contents of a single row.

The JdbcTemplate update methods are used to perform INSERT, UPDATE, or DELETE operations on the database. The update methods modify the database contents, but do not return any data from the database (apart from an integer return value, which counts the number of rows affected by the operation).

For example, the following update operation shows how to set the amount field in a customer's account:

// Java
jdbc.update(
    "update accounts set amount = ? where name = ?",
    new Object[] {newAmount, name}
);

Before we can start performing any queries on the database, the first thing we need to do is to create an accounts table and populate it with some initial values. Example 4.1 shows the definition of the CreateTable class, which is responsible for intializing the accounts table.


Where the accounts table consists of two columns: name, a string value that records the account holder's name, and amount, a long integer that records the amount of money in the account. Because this example uses an ephemeral database, which exists only temporarily in memory, it is necessary to re-initialize the database every time the example runs. A convenient way to initialize the table is by instantiating a CreateTable bean in the Spring XML configuration, as follows:

<beans ...>
    <!-- datasource to the database -->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
        <property name="driverClass" value="org.hsqldb.jdbcDriver"/>
        <property name="url" value="jdbc:hsqldb:mem:camel"/>
        <property name="username" value="sa"/>
        <property name="password" value=""/>
    </bean>

    <!--  Bean to initialize table in the DB -->
    <bean id="createTable" class="com.fusesource.demo.tx.jdbc.java.CreateTable">
        <constructor-arg ref="dataSource" />
    </bean>
    ...
</beans>

As soon as the createTable bean is instantiated, the accounts table is ready for use. Note that a reference to the JDBC data source, dataSource, is passed to the CreateTable() constructor, because the data source is needed to create a JdbcTemplate instance.

The credit() method adds the specified amount of money, amount, to the specified account, name in the accounts database table, as follows:

    public void credit(
            @XPath("/transaction/transfer/receiver/text()") String name, 1
            @XPath("/transaction/transfer/amount/text()") String amount
            )
    {
        log.info("credit() called with args name = " + name + " and amount = " + amount);
        int origAmount = jdbc.queryForInt( 2
                "select amount from accounts where name = ?",
                new Object[]{name}
        );
        int newAmount = origAmount + Integer.parseInt(amount);
        
        jdbc.update( 3
                "update accounts set amount = ? where name = ?",
                new Object[] {newAmount, name}
        );
    }

1

For methods invoked using the beanRef() (or bean()) DSL command, Apache Camel provides a powerful set of annotations for binding the exchange to the method parameters. In this example, the parameters are annotated using the @XPath annotation, so that the result of the XPath expression is injected into the corresponding parameter.

For example, the first XPath expression, /transaction/transfer/receiver/text(), selects the contents of the receiver XML element from the body of the exchange's In message and injects them into the name parameter. Likewise, the contents of the amount element are injected into the amount parameter.

2

The JdbcTemplate.queryForInt() method returns the current balance of the name account. For details about using JdbcTemplate to make database queries, see Querying.

3

The JdbcTemplate.update() method updates the balance of the name account, adding the specified amount of money. For details about using JdbcTemplate to make database updates, see Updating.

The debit() method subtracts the specified amount of money, amount, from the specified account, name in the accounts database table, as follows:

    public void debit(
            @XPath("/transaction/transfer/sender/text()") String name, 1
            @XPath("/transaction/transfer/amount/text()") String amount
            )
    {
        log.info("debit() called with args name = " + name + " and amount = " + amount);
        int iamount = Integer.parseInt(amount);
        if (iamount > 100) { 2
            throw new IllegalArgumentException("Debit limit is 100");
        }
        int origAmount = jdbc.queryForInt(
                "select amount from accounts where name = ?",
                new Object[]{name}
        );
        int newAmount = origAmount - Integer.parseInt(amount);
        if (newAmount < 0) { 3
            throw new IllegalArgumentException("Not enough in account");
        }
        
        jdbc.update(
                "update accounts set amount = ? where name = ?",
                new Object[] {newAmount, name}
        );
    }

1

The parameters of the debit() method are also bound to the exchange using annotations. In this case, however, the name of the account is bound to the sender XML element in the In message.

2

There is a fixed debit limit of 100. Amounts greater than this will trigger an IllegalArgument exception. This feature is useful, if you want to trigger a rollback to test a transaction example.

3

If the balance of the account would go below zero after debiting, abort the transaction by calling the IllegalArgumentException exception.

To demarcate transactions, the transacted processor must be associated with a particular transaction manager instance. To save you having to specify the transaction manager every time you invoke transacted(), the transacted processor automatically picks a sensible default. For example, if there is only one instance of a transaction manager in your Spring configuration, the transacted processor implicitly picks this transaction manager and uses it to demarcate transactions.

A transacted processor can also be configured with a transacted policy, of TransactedPolicy type, which encapsulates a propagation policy and a transaction manager (see Propagation Policies for details). The following rules are used to pick the default transaction manager or transaction policy:

  1. If there is only one bean of org.apache.camel.spi.TransactedPolicy type, use this bean.

    [Note]Note

    The TransactedPolicy type is a base type of the SpringTransactionPolicy type that is described in Propagation Policies. Hence, the bean referred to here could be a SpringTransactionPolicy bean.

  2. If there is a bean of type, org.apache.camel.spi.TransactedPolicy, which has the ID, PROPAGATION_REQUIRED, use this bean.

  3. If there is only one bean of org.springframework.transaction.PlatformTransactionManager type, use this bean.

You also have the option of specifying a bean explicitly by providing the bean ID as an argument to transacted()—see Sample route with PROPAGATION_NEVER policy in Java DSL.

If you insert a transacted processor into a route, a new transaction is created each time an exchange passes through this node and the transaction's scope is defined as follows:

In particular, all of the route nodes preceding the transacted processor are not included in the transaction (but the situation is different, if the route begins with a transactional endpoint—see Demarcation by Transactional Endpoints). For example, the following route is incorrect, because the transacted() DSL command mistakenly appears after the first beanRef() call (which accesses the database resource):

// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
          .beanRef("accountService","credit")
          .transacted()  // <-- WARNING: Transaction started in the wrong place!
          .beanRef("accountService","debit");
    }
}

If you want to break a route into fragments and have each route fragment participate in the current transaction, you can use direct: endpoints. For example, to send exchanges to separate route fragments, depending on whether the transfer amount is big (greater than 100) or small (less than or equal to 100), you can use the choice() DSL command and direct endpoints, as follows:

// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .transacted()
            .beanRef("accountService","credit")
            .choice().when(xpath("/transaction/transfer[amount > 100]"))
                .to("direct:txbig")
            .otherwise()
                .to("direct:txsmall");

        from("direct:txbig")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages/big");

        from("direct:txsmall")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages/small");
    }
}

Both the fragment beginning with direct:txbig and the fragment beginning with direct:txsmall participate in the current transaction, because the direct endpoints are synchronous. This means that the fragments execute in the same thread as the first route fragment and, therefore, they are included in the same transaction scope.

[Note]Note

You must not use seda endpoints to join the route fragments, because seda consumer endpoints create a new thread (or threads) to execute the route fragment (asynchronous processing). Hence, the fragments would not participate in the original transaction.

The transacted() DSL command is not required in a route that starts with a transactional endpoint. Nevertheless, assuming that the default transaction policy is PROPAGATION_REQUIRED (see Propagation Policies), it is usually harmless to include the transacted() command, as in this example:

from("jmstx:queue:giro")
    .transacted()
    .to("jmstx:queue:credits")
    .to("jmstx:queue:debits");

However, it is possible for this route to behave in unexpected ways—for example, if a single TransactedPolicy bean having a non-default propagation policy is created in Spring XML (see Default transaction manager and transacted policy)—so it is generally better not to include the transacted() DSL command in routes that start with a transactional endpoint.

Example 5.1 shows how to define transaction policy beans for all of the supported propagation behaviors. For convenience, each of the bean IDs matches the specified value of the propagation behavior value, but in practice you can use whatever value you like for the bean IDs.

Example 5.1. Transaction Policy Beans

<beans xmlns="http://www.springframework.org/schema/beans" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
  ...
  <bean id="PROPAGATION_MANDATORY "class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="txManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_MANDATORY"/>
  </bean>

  <bean id="PROPAGATION_NESTED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="txManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_NESTED"/>
  </bean>

  <bean id="PROPAGATION_NEVER" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="txManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_NEVER"/>
  </bean>

  <bean id="PROPAGATION_NOT_SUPPORTED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="txManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_NOT_SUPPORTED"/>
  </bean>

  <!-- This is the default behavior. -->
  <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="txManager"/>
  </bean>
  
  <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="txManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
  </bean>

  <bean id="PROPAGATION_SUPPORTS" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="txManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_SUPPORTS"/>
  </bean>

</beans>

[Note]Note

If you want to paste any of these bean definitions into your own Spring XML configuration, remember to customize the references to the transaction manager. That is, replace references to txManager with the actual ID of your transaction manager bean.

A simple way of demonstrating that transaction policies have some effect on a transaction is to insert a PROPAGATION_NEVER policy into the middle of an existing transaction, as shown in the following route:

from("file:src/data?noop=true")
    .transacted()
    .beanRef("accountService","credit")
    .transacted("PROPAGATION_NEVER")
    .beanRef("accountService","debit");

Used in this way, the PROPAGATION_NEVER policy inevitably aborts every transaction, leading to a transaction rollback. You should easily be able to see the effect of this on your application.

[Note]Note

Remember that the string value passed to transacted() is a bean ID, not a propagation behavior name. In this example, the bean ID is chosen to be the same as a propagation behavior name, but this need not always be the case. For example, if your application uses more than one transaction manager, you might end up with more than one policy bean having a particular propagation behavior. In this case, you could not simply name the beans after the propagation behavior.

The markRollbackOnly() DSL command enables you to force the current transaction to roll back, without throwing an exception. This can be useful in cases where (as in Example 5.2) throwing an exception has unwanted side effects.

For example, Example 5.3 shows how to modify Example 5.2 by replacing rollback() with markRollbackOnly(). This version of the route solves the problem of the infinite loop. In this case, when the amount of the money transfer exceeds 100, the current transaction is rolled back, but no exception is thrown. Because the file endpoint does not receive an exception, it does not retry the exchange, and the failed transactions is quietly discarded.


The preceding route implementation is not ideal, however. Although the route cleanly rolls back the transaction (leaving the database in a consistent state) and avoids the pitfall of infinite looping, it does not keep any record of the failed transaction. In a real-world application, you would typically want to keep track of any failed transaction. For example, you might want to write a letter to the relevant customer in order to explain why the transaction did not succeed. A convenient way of tracking failed transactions is to add a dead-letter queue to the route.

In order to keep track of failed transactions, you can define an onException() clause, which enables you to divert the relevant exchange object to a dead-letter queue. When used in the context of transactions, however, you need to be careful about how you define the onException() clause, because of potential interactions between exception handling and transaction handling. Example 5.4 shows the correct way to define an onException() clause, assuming that you need to suppress the rethrown exception.


In the preceding example, onException() is configured to catch the IllegalArgumentException exception and send the offending exchange to a dead letter file, deadLetters.xml (of course, you can change this definition to catch whatever kind of exception arises in your application). The exception rethrow behavior and the transaction rollback behavior are controlled by the following special settings in the onException() clause:

Instead of using onException(), a simple approach to handling exceptions in a transactional route is to use the doTry() and doCatch() clauses around the route. For example, Example 5.5 shows how you can catch and handle the IllegalArgumentException in a transactional route, without the risk of getting trapped in an infinite loop.


In this example, the route is split into two segments. The first segment (from the file:src/data endpoint) receives the incoming exchanges and performs the exception handling using doTry() and doCatch(). The second segment (from the direct:split endpoint) does all of the transactional work. If an exception occurs within this transactional segment, it propagates first of all to the transacted() command, causing the current transaction to be rolled back, and it is then caught by the doCatch() clause in the first route segment. The doCatch() clause does not rethrow the exception, so the file endpoint does not do any retries and infinite looping is avoided.

Figure 6.1 shows an overview of the OSGi transaction architecture in Fuse ESB Enterprise. The core of the architecture is a JTA transaction manager based on Apache Geronimo, which exposes various transaction interfaces as OSGi services.


The underlying implementation of the Fuse ESB Enterprise transaction service is provided by the Apache Geronimo transaction manager. Apache Geronimo is a full implementation of a J2EE server and, as part of the J2EE implementation, Geronimo has developed a sophisticated transaction manager with the following features:

  • Support for enlisting multiple XA resources.

  • Support for 1-phase and 2-phase commit protocols.

  • Support for suspending and resuming transactions.

  • Support for automatic transaction recovery upon startup.

The transaction recovery feature depends on a transaction log, which records the status of all pending transactions in persistent storage.

The implementation of the transaction log is provided by HOWL, which is a high speed persistent logger that is optimized for XA transaction logs.

The way to perform auto-enlisting of a JMS XA resource is to implement a wrapper class of type, javax.jms.ConnectionFactory. You can then implement this class, so that every time a new connection is created, the JMS XA resource is enlisted with the current transaction.

Apache ActiveMQ provides the wrapper class, XaPooledConnectionFactory, which implements auto-enlistment for you and, additionally, implements JMS connection pooling for better performance. To use the XaPooledConnectionFactory wrapper class, simply create an instance that takes a reference to an XA connection factory instance (for example, ActiveMQXAConnectionFactory) and also provide a reference to the transaction manager (which is used to enlist the resource).

For example, the following example shows how you can use Spring XML to define an auto-enlisting JMS connection factory (with the bean ID, jmsXaPoolConnectionFactory):

Figure 7.1 shows an overview of how to configure the Camel JMS component to use XA transactions in the Fuse ESB Enterprise OSGi container. In this scenario, the Camel JMS component is integrated with the built-in Aries transaction manager and a connection factory wrapper is included, to support auto-enlisting of XA resources.


The XA transaction manager, which is embedded in the OSGi container, must be accessed through two different interfaces:

org.springframework.transaction.PlatformTransactionManager

The PlatformTransactionManager interface is needed by the Camel JMS component (which is layered over the Spring transaction API). For more details, see PlatformTransactionManager Interface.

javax.transaction.TransactionManager

The TransactionManager interface is needed by the JCA pooled connection factory, which uses it to enlist the ActiveMQ XA resource.

The transaction manager interfaces are accessed as OSGi services. For example, to access the interfaces through Spring XML, you can use the following code:

<beans ...>

    <!--
        OSGi TM Service
    -->
    <!-- access through Spring's PlatformTransactionManager -->
    <osgi:reference id="osgiPlatformTransactionManager"
                    interface="org.springframework.transaction.PlatformTransactionManager"/>
    <!-- access through JTA TransactionManager -->
    <osgi:reference id="osgiJtaTransactionManager"
                    interface="javax.transaction.TransactionManager"/>

</beans>

For more details, see Accessing the Transaction Manager.

The JMS configuration bean encapsulates all of the required settings for the Camel JMS component. In particular, the JMS configuration bean includes a reference to the transaction manager (of PlatformTransactionManager type) and a reference to the JCA pooled connection factory (of JcaPooledConnectionFactory type).

The org.apache.camel.component.jms.JmsConfiguration class supports the following bean properties, which are particularly relevant to transactions:

transacted

Must be set to false for XA transactions. The name of this property is misleading. What it really indicates is whether or not the Camel JMS component supports local transactions. For XA transactions, on the other hand, you must set this property to false and initialize the transactionManager property with a reference to an XA transaction manager.

This property gets its name from the sessionTransacted property in the underlying Spring transaction layer. The transacted property ultimately gets injected into the sessionTransacted property in the Spring transaction layer, so it is the Spring transaction layer that determines the semantics. For more details, see the Javadoc for Spring's AbstractMessageListenerContainer.

transactionManager

Must be initialized with a reference to the PlatformTransactionManager interface of the built-in OSGi transaction manager.

transactionName

Sets the transaction name. Default is JmsConsumer[destinationName].

cacheLevelName

Try setting this initially to CACHE_CONNECTION, because this will give you the best performance. If this setting turns out to be incompatible with your transaction system, you can revert to CACHE_NONE, whcih switches off all caching in the Spring transaction layer. For more details, see the Spring documentation.

transactionTimeout

Do not set this property in the context of XA transactions. To customize the transaction timeout in the context of XA transactions, you need to configure the timeout directly in the OSGi transaction manager instead (see Configuring the Transaction Manager for details).

lazyCreateTransactionManager

Do not set this boolean property in the context of XA transactions. (In the context of a local transaction, setting this property to true would direct the Spring transaction layer to create its own local transaction manager instance, whenever it is needed.)

The Apache ActiveMQ implementation of the JMS XA resource supports the two-phase commit protocol and also implicitly supports distributed transactions. To understand how this works, we take a closer look at the interaction between a JMS XA resource and the broker to which it is connected.

Figure 7.2 illustrates what happens when a JMS XA resource participates in a transaction that is committed using the two-phase commit protocol. The JMS XA resource is deployed in an application that runs in the Client host and the corresponding Apache ActiveMQ broker is deployed on the Remote host, so that this transaction branch is effectively a distributed transaction.


The two-phase commit shown in Figure 7.2 consists of the following steps:

  1. Immediately after the transaction begins, the transaction manager invokes start() on the JMS XA resource, which indicates that the resource should initialize a new transaction. The JMS XA resource now generates a new transaction ID and sends it over the network to the remote broker.

  2. The JMS XA resource now forwards all of the operations that arise during a JMS session (for example, messages, acknowledgments, and so on) to the remote broker.

    On the broker side, the received operations are not performed immediately. Because the operations are happening in a transaction context and the transaction is not yet committed, the broker buffers all of the operations in a transaction store (held in memory, initially). Messages held in the transaction store are not forwarded to JMS consumers.

  3. In a two-phase commit process, the first phase of completing the transaction is where the transaction manager invokes prepare() on all of the participating XA resources. At this stage, the JMS XA resource sends the prepare() operation to the remote broker.

    On the broker side, when the transaction store receives the prepare() operation, it writes all of the buffered operations to disk. Hence, after the prepare phase, there is no longer any risk of losing data associated with this transaction branch.

  4. The second phase of completing the transaction is where the transaction manager invokes commit() on all of the participating XA resources. The JMS XA resource sends the commit() operation to the remote broker.

    On the broker side, the transaction store marks this transaction as complete. The pending operations are now executed and any pending messages can now be forwarded to JMS consumers.

Example 7.2 shows the complete Spring XML configuration required to initialize the jmstx Camel JMS component with XA transactions. After setting up the Camel JMS component with this code, you can create a transactional JMS endpoint in a route using a URL like jmstx:queue:QueueName.

Example 7.2. Camel JMS Component with XA Enabled

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xsi:schemaLocation="
   http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/osgi  
       http://www.springframework.org/schema/osgi/spring-osgi.xsd    
">

    <!--
        OSGi TM Service
    -->
    <!-- access through Spring's PlatformTransactionManager -->
    <osgi:reference id="osgiPlatformTransactionManager"
        interface="org.springframework.transaction.PlatformTransactionManager"/> 1
    <!-- access through PlatformTransactionManager -->
    <osgi:reference id="osgiJtaTransactionManager"
        interface="javax.transaction.TransactionManager"/> 2
    ...
    <!--
        JMS TX endpoint configuration
    -->
    <bean id="jmstx"
          class="org.apache.activemq.camel.component.ActiveMQComponent"> 3
        <property name="configuration" ref="jmsTxConfig" /> 
    </bean> 
    
    <bean id="jmsTxConfig"
          class="org.apache.camel.component.jms.JmsConfiguration"> 4
        <property name="connectionFactory" ref="jmsXaPoolConnectionFactory"/> 
        <property name="transactionManager" ref="osgiPlatformTransactionManager"/> 
        <property name="transacted" value="false"/>
        <property name="cacheLevelName" value="CACHE_CONNECTION"/>
    </bean> 
    
    <!-- connection factory wrapper to support auto-enlisting of XA resource -->
    <bean id="jmsXaPoolConnectionFactory"
          class="org.apache.activemq.pool.JcaPooledConnectionFactory"> 5
        <property name="maxConnections" value="1" /> 6
        <property name="connectionFactory" ref="jmsXaConnectionFactory" />
        <property name="transactionManager" ref="osgiJtaTransactionManager" />
        <property name="name" value="activemq.default" /> 7
    </bean>
    
    <bean id="jmsXaConnectionFactory"
          class="org.apache.activemq.ActiveMQXAConnectionFactory"> 8
        <property name="brokerURL" value="vm:local?jms.prefetchPolicy.all=1"/> 9
        <property name="redeliveryPolicy"> 10
            <bean class="org.apache.activemq.RedeliveryPolicy">
                <property name="maximumRedeliveries" value="0"/>
            </bean>
        </property>
    </bean>
    
    <!--
        ActiveMQ XA Resource Manager
    -->
    <bean id="resourceManager"
          class="org.apache.activemq.pool.ActiveMQResourceManager"
          init-method="recoverResource"> 
        <property name="transactionManager" ref="osgiJtaTransactionManager" />
        <property name="connectionFactory" ref="jmsXaPoolConnectionFactory" />
        <property name="resourceName" value="activemq.default" /> 11
    </bean>    
    ...
</beans>

The preceding Spring XML configuration can be explained as follows:

1

Define a reference to the OSGi service that exposes the PlatformTransactionManager interface of the OSGi container's built-in XA transaction manager. This service can then be accessed through the bean ID, osgiPlatformTransactionManager.

2

Define a reference to the OSGi service that exposes the JTA TransactionManager interface of the OSGi container's built-in XA transaction manager. This service can then be accessed through the bean ID, osgiJtaTransactionManager.

3

The bean identified by the ID, jmstx, is the ActiveMQ implementation of the Camel JMS component. You can use this component to define transactional JMS endpoints in your routes. The only property that you need to set on this bean is a reference to the JmsConfiguration bean with the ID, jmsTxConfig.

4

The JmsConfiguration bean with the ID, jmsTxConfig, is configured as described in Enabling XA on the Camel JMS Component. In particular, the configuration bean gets a reference to the JCA pooled connection factory and a reference to the osgiPlatformTransactionManager bean. The transacted property must be set to false.

5

The JcaPooledConnectionFactory is a wrapper class that adds extra capabilities to the basic connection factory bean (that is, it adds the capabilities to auto-enlist XA resources, to pool JMS connections, and support for automatic recovery in combination with the resource manager).

[Note]Note

Only the JcaPooledConnectionFactory class has support for auto-recovery of transactions (when combined with the resource manager). The XaPooledConnectionFactory class does not have this support.

6

The maxConnections property should be set to 1.

7

In order for the Aries transaction manager to write its transaction log properly (which is needed if you want to recover transactions after a crash), the name property of the JcaPooledConnectionFactory bean must be set to a value that matches the resourceName property of the resourceManager resource manager bean.

8

The bean with the ID, jmsXaConnectionFactory, is the basic connection factory, which encapsulates the code for connecting to the JMS broker. In this case, the bean is an instance of ActiveMQXAConnectionFactory type, which is a special connection factory class that you must use when you want to connect to the ActiveMQ broker with support for XA transactions.

9

The brokerURL property defines the protocol for connecting to the broker. In this case, the vm:local URL connects to the broker that is embedded in the current JVM and is identified by the name local (the configuration of the embedded broker is not shown in this example).

There are many different protocols supported by Apache ActiveMQ that you could use here. For example, to connect to a remote broker through the OpenWire TCP protocol listening on IP port 61616 on host MyHost, you would use the broker URL, tcp://MyHost:61616.

In this example, the prefetch policy is set to 1 for all queues and topics. This ensures that a connection pulls only one message at a time from the broker, which is the recommended setting when the CACHE_CONNECTION option is selected (see the settings on the JmsTxConfig bean). When CACHE_CONNECTION is selected, a new JMS consumer and session are recreated for every message.

10

In this example, the redelivery policy is disabled by setting maximumRedeliveries to 0. Typically, you would not use a redelivery policy together with transactions. An alternative approach would be to define an exception handler that routes failed exchanges to a dead letter queue.

11

The resourceName property is the key entry that maps from the transaction manager log to a real-world XAResource object. It must be unique for each XAResource object.

Apache Derby is an open source database implementation, which provides a full implementation of XA transactions. In the current document, we use it as the basis for some of our examples and tutorials.

[Important]Important

Apache Derby is neither maintained nor supported by FuseSource. No guarantees are given with respect to the robustness or correctness of its XA implementation. It is used here solely for the purposes of illustration.

Apache Derby provides the following alternative data source implementations (from the org.apache.derby.jdbc package):

[Note]Note

If you need to gain access to the additional API methods defined in the JDBC 4.0 specification (such as isWrapperFor), use the variants of these data source classes with 40 appended. For example, EmbeddedDataSource40, EmbeddedXADataSource40, and so on.

In Spring XML, you can expose a Derby XA data source as an OSGi service using the code shown in Example 8.1.


In this example, the Derby database runs as an embedded instance in the current bundle, where the name of the database instance is txXaTutorial. The exported OSGi service defines two service properties: osgi.jndi.service.name and datasource.name. These properties can be used to disambiguate the data sources, in case multiple data sources are exposed as OSGi services.

[Note]Note

Additionally, you must set the requisite Java system properties for Derby in the OSGi container (not shown here). For details, see Integrate Derby with Fuse ESB Enterprise.

The derby-ds bundle shown in Figure 8.1 encapsulates the code from Example 8.2, which defines a Derby XA data source and exports it as an OSGi service.

Also shown wthin the scope of the derby-ds bundle is the auto-enlisting data source proxy. But this data source proxy is not created by the code in the derby-ds bundle and is initially not part of the bundle.

If you deploy the derby-ds bundle, you can see how the wrapper proxy is automatically created. For example, after following the instructions in Define a Derby Datasource and Deploy and Run the Transactional Route to build and deploy the derby-ds bundle, you can list the OSGi services exported by the derby-ds bundle using the osgi:ls console command. Assuming that derby-ds has the bundle ID, 229, you would then enter:

karaf@root> osgi:ls 229

The console produces output similar to the following:

Derby XA data source (229) provides:
------------------------------------
datasource.name = derbyXADB
objectClass = javax.sql.XADataSource
osgi.jndi.service.name = jdbc/derbyXADB
osgi.service.blueprint.compname = derbyXADataSource
service.id = 423
----
aries.xa.aware = true
datasource.name = derbyXADB
objectClass = javax.sql.DataSource
osgi.jndi.service.name = jdbc/derbyXADB
osgi.service.blueprint.compname = derbyXADataSource
service.id = 424
----
...

The following OSGi services are exposed:

  • An OSGi service with interface javax.sql.XADataSource and datasource.name equal to derbyXADB—this is the XA data source explicitly exported as an OSGi service in Example 8.2.

  • An OSGi service with interface javax.sql.DataSource and datasource.name equal to derbyXADB—this is the auto-enlisting data source proxy implicitly created by the Aries wrapper service. The data source proxy copies the user-defined service properties from the original OSGi service and adds the setting aries.xa.aware = true. The aries.xa.aware property enables you to distinguish between the generated proxy and the original data source.

The tx:transaction element supports the following attributes:

bean

(Top-level only) Specifies a list of bean IDs (comma or space separated) to which the transaction policy applies. For example:

<blueprint ...>
    <tx:transaction bean="accountFoo,accountBar" value="..."/>
</blueprint>

You can also use the wildcard character, *, which may appear at most once in each list entry. For example:

<blueprint ...>
    <tx:transaction bean="account*,jms*" value="..."/>
</blueprint>

If the bean attribute is omitted, it defaults to * (matching all non-synthetic beans in the blueprint file).

method

(Top-level and bean-level) Specifies a list of method names (comma or space separated) to which the transaction policy applies. For example:

<bean id="accountFoo" class="org.fusesource.example.Account">
    <tx:transaction method="debit,credit,transfer" value="Required"/>
    <property name="accountName" value="Foo"/>
</bean>

You can also use the wildcard character, *, which may appear at most once in each list entry.

If the method attribute is omitted, it defaults to * (matching all methods in the applicable beans).

value

(Top-level and bean-level) Specifies the transaction policy. The policy values have the same semantics as the policies defined in the EJB 3.0 specification, as follows:

Derby provides a variety of different data source implementations, as described in Derby data sources. For XA transactions, there are two alternatives: EmbeddedXADataSource (where the database instance runs in the same JVM) and ClientXADataSource (where the application connects to a remote database instance). The current example uses EmbeddedXADataSource.

Perform the following steps to define a Derby datasource packaged in an OSGi bundle:

  1. Use the quickstart archetype to create a basic Maven project. Maven provides archetypes, which serve as templates for creating new projects. The Maven quickstart archetype is a basic archetype, providing the bare outline of a new Maven project.

    To create a new project for the Derby datasource bundle, invoke the Maven archetype plug-in as follows. Open a new command prompt, change directory to a convenient location (that is, to the directory where you will store your Maven projects), and enter the following command:

     mvn archetype:create
    -DarchetypeArtifactId=maven-archetype-quickstart
    -DgroupId=org.fusesource.example
    -DartifactId=derby-ds
    [Note]Note

    The preceding command parameters are shown on separate lines for ease of reading. You must enter the entire command on a single line, however.

    After downloading the requisite dependencies to run the quickstart archetype, the command creates a new Maven project for the org.fusesource.example/derby-ds artifact under the derby-ds directory.

  2. Change the project packaging type to bundle. Under the derby-ds directory, open the pom.xml file with a text editor and change the contents of the packaging element from jar to bundle, as shown in the following highlighted line:

    <project ...>
      ...
      <groupId>org.fusesource.example</groupId>
      <artifactId>derby-ds</artifactId>
      <version>1.0-SNAPSHOT</version>
      <packaging>bundle</packaging>
      ...
    </project>
  3. Add the bundle configuration to the POM. In the pom.xml file, add the following build element as a child of the project element:

    <project ...>
      ...
      <build>
        <defaultGoal>install</defaultGoal>
    
        <plugins>
    
          <plugin>
            <groupId>org.apache.felix</groupId>
            <artifactId>maven-bundle-plugin</artifactId>
            <extensions>true</extensions>
            <configuration>
              <instructions>
                <Bundle-SymbolicName>${project.groupId}.${project.artifactId}</Bundle-SymbolicName>
              </instructions>
            </configuration>
          </plugin>
          
        </plugins>
      </build>
    
    </project>
  4. Customize the Maven compiler plug-in to enforce JDK 1.6 coding syntax. In the pom.xml file, add the following plugin element as a child of the plugins element, to configure the Maven compiler plug-in:

    <project ...>
      ...
      <build>
        <defaultGoal>install</defaultGoal>
    
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
              <source>1.6</source>
              <target>1.6</target>
            </configuration>
          </plugin>
          ...
        </plugins>
      </build>
    
    </project>
  5. Add the Derby dependency to the POM (this is the only Maven dependency that is needed in this project). In the pom.xml file, add the following elements as children of the project element:

    <project ...>
      ...
      <properties>
        <derby-version>10.8.2.2</derby-version>
      </properties>
    
      <dependencies>
        <!-- Database dependencies -->
        <dependency>
          <groupId>org.apache.derby</groupId>
          <artifactId>derby</artifactId>
          <version>${derby-version}</version>
        </dependency>
        
      </dependencies>
      ...
    </project>

    Customize the derby-version property to the version of Derby you are using.

  6. Instantiate the Derby database instance and export the datasource as an OSGi service. In fact, this example exports two datasources: an XA datasource and a non-transactional datasource. The Derby datasources are exported using a blueprint XML file, which must be stored in the standard location, OSGI-INF/blueprint/. Under the derby-ds project directory, create the dataSource.xml blueprint file in the following location:

    src/main/resources/OSGI-INF/blueprint/dataSource.xml

    Using your favorite text editor, add the following contents to the dataSource.xml file:

    In the definition of the derbyXADataSource bean, the databaseName property identifies the database instance that is created (in this case, txXaTutorial). The service element then exports the XA datasource as an OSGi service with the interface, javax.sql.XADataSource. The datasource.name service property makes it possible to identify this datasource unambiguously, when it is referenced from other bundles.

  7. To build the derby-ds bundle and install it in the local Maven repository, enter the following Maven command from the derby-ds directory:

    mvn install

This section describes how to create a transactional route and package it as an OSGi bundle. The route described here is based on the AccountService class (see Appendix A), implementing a transfer of funds from one account to another, where the account data is stored in an Apache Derby database instance.

Perform the following steps to define a route that uses XA to coordinate global transactions across a JMS XA resource and an Apache Derby XA resource:

  1. Use the quickstart archetype to create a basic Maven project for the route bundle. Open a new command prompt, change directory to a convenient location, and enter the following command:

     mvn archetype:create
    -DarchetypeArtifactId=maven-archetype-quickstart
    -DgroupId=org.fusesource.example
    -DartifactId=tx-xa

    The preceding command creates a new Maven project for the org.fusesource.example/tx-xa artifact under the tx-xa directory.

  2. Change the project packaging type to bundle. Under the tx-xa directory, open the pom.xml file with a text editor and change the contents of the packaging element from jar to bundle, as shown in the following highlighted line:

    <project ...>
      ...
      <groupId>org.fusesource.example</groupId>
      <artifactId>tx-xa</artifactId>
      <version>1.0-SNAPSHOT</version>
      <packaging>bundle</packaging>
      ...
    </project>
  3. Add the bundle configuration to the POM. In the pom.xml file, add the following build element as a child of the project element:

    <project ...>
      ...
      <build>
        <defaultGoal>install</defaultGoal>
    
        <plugins>
    
          <plugin>
            <groupId>org.apache.felix</groupId>
            <artifactId>maven-bundle-plugin</artifactId>
            <extensions>true</extensions>
            <configuration>
              <instructions>
                <Bundle-SymbolicName>${project.groupId}.${project.artifactId}</Bundle-SymbolicName>
                <Import-Package>
                  org.springframework.core,
                  org.apache.camel,
                  org.apache.camel.component.jms,
                  org.apache.activemq,
                  org.apache.activemq.xbean,
                  org.apache.activemq.pool,
                  org.apache.xbean.spring,
                  org.apache.commons.pool,
                  org.hsqldb,
                  *
                </Import-Package>
                <Private-Package>
                  com.fusesource.demo.*
                </Private-Package>
                <DynamicImport-Package>
                  org.apache.activemq.*
                </DynamicImport-Package>
              </instructions>
            </configuration>
          </plugin>
          
        </plugins>
      </build>
    
    </project>
  4. Customize the Maven compiler plug-in to enforce JDK 1.6 coding syntax. In the pom.xml file, add the following plugin element as a child of the plugins element, to configure the Maven compiler plug-in:

    <project ...>
      ...
      <build>
        <defaultGoal>install</defaultGoal>
    
        <plugins>
    
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
              <source>1.6</source>
              <target>1.6</target>
            </configuration>
          </plugin>
          ...
        </plugins>
      </build>
    
    </project>
  5. Add the requisite dependencies to the POM. In the pom.xml file, add the following elements as children of the project element:

    <project ...>
      ...
      <name>Global transactions demo</name>
      <url>fusesource.com</url>
    
      <properties>
        <camel-version>2.8.0-fuse-00-08</camel-version>
        <activemq-version>5.5.1-fuse-00-08</activemq-version>
        <log4j-version>1.2.16</log4j-version>
        <slf4j-version>1.6.4</slf4j-version>
        <spring-version>3.0.5.RELEASE</spring-version>
        <derby-version>10.8.2.2</derby-version>
        <xbean-spring-version>3.5</xbean-spring-version>
        <junit-version>4.8.1</junit-version>
        <aries-version>0.3</aries-version>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.apache.camel</groupId>
          <artifactId>camel-core</artifactId>
          <version>${camel-version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.camel</groupId>
          <artifactId>camel-spring</artifactId>
          <version>${camel-version}</version>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>${slf4j-version}</version>
        </dependency>
        <dependency>
          <groupId>log4j</groupId> 
          <artifactId>log4j</artifactId> 
          <version>${log4j-version}</version> 
        </dependency>
        
        <!-- Spring transaction dependencies -->
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-core</artifactId>
          <version>${spring-version}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context</artifactId>
          <version>${spring-version}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-tx</artifactId>
          <version>${spring-version}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-aop</artifactId>
          <version>${spring-version}</version>
        </dependency>
        
        <!-- Spring JDBC adapter -->
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-jdbc</artifactId>
          <version>${spring-version}</version>
        </dependency>
    
        <!-- Database dependencies -->
        <dependency>
          <groupId>org.apache.derby</groupId>
          <artifactId>derby</artifactId>
          <version>${derby-version}</version>
        </dependency>
    
        <!-- JMS/ActiveMQ artifacts -->
        <dependency> 
          <groupId>org.apache.camel</groupId> 
          <artifactId>camel-jms</artifactId> 
          <version>${camel-version}</version>
        </dependency> 
        <dependency> 
          <groupId>org.apache.activemq</groupId> 
          <artifactId>activemq-camel</artifactId> 
          <version>${activemq-version}</version> 
        </dependency>
        <!--  This is needed by the camel-jms component -->
        <dependency> 
          <groupId>org.apache.xbean</groupId> 
          <artifactId>xbean-spring</artifactId> 
          <version>${xbean-spring-version}</version> 
        </dependency>
        
      </dependencies>
      ...
    </project>

    Customize the dependency versions as required (by editing the properties defined in the properties element).

  6. Define the AccountService class. Under the tx-xa project directory, create the following directory:

    src/main/java/org/fusesource/example/tx/xa

    Create the file, AccountService.java, in this directory and add the contents of the listing from Example A.1 to this file.

  7. Define the beans and resources needed by the route in a Spring XML file. Under the tx-xa project directory, create the following directory:

    src/main/resources/META-INF/spring

    Using a text editor, create the file, beans.xml, in this directory and add the following contents to the file:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:amq="http://activemq.apache.org/schema/core"
           xmlns:osgi="http://www.springframework.org/schema/osgi"
           xsi:schemaLocation="
       http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring
           http://camel.apache.org/schema/spring/camel-spring.xsd
       http://activemq.apache.org/schema/core
           http://activemq.apache.org/schema/core/activemq-core.xsd
       http://www.springframework.org/schema/osgi  
           http://www.springframework.org/schema/osgi/spring-osgi.xsd    
    ">
        
        <!-- Local ActiveMQ broker instance -->
        <amq:broker brokerName="TxXaDemo">
            <amq:persistenceAdapter>
                <amq:kahaDB directory="txXaDemo-data"/>
            </amq:persistenceAdapter>
            <amq:transportConnectors>
                <amq:transportConnector name="openwire" uri="tcp://localhost:51616"/>
                <amq:transportConnector name="vm" uri="vm:local"/>
            </amq:transportConnectors>
        </amq:broker>      
    
        
        <!--
            JMS non-TX endpoint configuration
        -->
        <bean id="jms" class="org.apache.camel.component.jms.JmsComponent"> 
            <property name="configuration" ref="jmsConfig" /> 
        </bean> 
        
        <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
            <property name="connectionFactory" ref="jmsPoolConnectionFactory"/> 
        </bean>
        
        <!-- connection factory wrapper to support pooling -->
        <bean id="jmsPoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
            <property name="connectionFactory" ref="jmsConnectionFactory" />
        </bean>
        
        <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="vm:local"/>
        </bean>
    
    
        <!--
            OSGi TM Service
        -->
        <!-- access through Spring's PlatformTransactionManager -->
        <osgi:reference id="osgiPlatformTransactionManager"
                        interface="org.springframework.transaction.PlatformTransactionManager"/>
        <!-- access through PlatformTransactionManager -->
        <osgi:reference id="osgiJtaTransactionManager"
                        interface="javax.transaction.TransactionManager"/>
        
        <!--
            JMS TX endpoint configuration
        -->
        <bean id="jmstx" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
            <property name="configuration" ref="jmsTxConfig" /> 
        </bean> 
        
        <bean id="jmsTxConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
            <property name="connectionFactory" ref="jmsXaPoolConnectionFactory"/> 
            <property name="transactionManager" ref="osgiPlatformTransactionManager"/> 
            <property name="transacted" value="false"/>
            <property name="cacheLevelName" value="CACHE_CONNECTION"/>
        </bean> 
        
        <!-- connection factory wrapper to support auto-enlisting of XA resource -->
        <bean id="jmsXaPoolConnectionFactory" class="org.apache.activemq.pool.XaPooledConnectionFactory">
            <property name="maxConnections" value="1" />
            <property name="connectionFactory" ref="jmsXaConnectionFactory" />
            <property name="transactionManager" ref="osgiJtaTransactionManager" />
        </bean>
        
        <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
            <property name="brokerURL" value="vm:local"/>
            <property name="redeliveryPolicy">
                <bean class="org.apache.activemq.RedeliveryPolicy">
                    <property name="maximumRedeliveries" value="0"/>
                </bean>
            </property>
        </bean>
        
        <!--
            ActiveMQ XA Resource Manager
        -->
        <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
            <property name="transactionManager" ref="osgiJtaTransactionManager" />
            <property name="connectionFactory" ref="jmsXaPoolConnectionFactory" />
            <property name="resourceName" value="activemq.default" />
        </bean>
        
        <!--
            Import Derby data sources as OSGi services
        -->
        <osgi:reference id="derbyXADataSource"
                        interface="javax.sql.DataSource"
                        filter="(datasource.name=derbyXADB)"/>
        <osgi:reference id="derbyDataSource"
                        interface="javax.sql.DataSource"
                        filter="(datasource.name=derbyDB)"/>
        
    
        <!-- bean for account business logic -->
        <bean id="accountService" class="com.fusesource.demo.tx.xa.AccountService">
            <property name="dataSource" ref="derbyXADataSource"/>
        </bean>
    
    </beans>
  8. Define the transactional route. In the src/main/resources/META-INF/spring directory, create the new file, camelContext.xml, and add the following contents:

    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
    
      <camelContext xmlns="http://camel.apache.org/schema/spring" trace="false">
        <!-- Transactional route -->
        <route>
          <from uri="jmstx:queue:giro"/>
          <bean ref="accountService" method="credit"/>
          <bean ref="accountService" method="debit"/>
          <bean ref="accountService" method="dumpTable"/>
          <to uri="jmstx:queue:statusLog"/>
        </route>
        
        <!-- Feeder route -->
        <route>
          <from uri="file:PathNameToMsgDir"/>
          <to uri="jms:queue:giro"/>
        </route>
      </camelContext>
    
    </beans>
    [Important]Important

    Replace PathNameToMsgDir with the absolute path name of a temporary directory. When the application is running, you will use this directory as a convenient way of feeding XML messages into the route.

  9. To build the tx-xa bundle and install it in the local Maven repository, enter the following Maven command from the tx-xa directory:

    mvn install

Perform the following steps to deploy and run the transactional route in the Fuse ESB Enterprise OSGi container:

  1. Create the Derby database instance for the tutorial and create the accounts table, as follows:

  2. Open a new command prompt and start the Fuse ESB Enterprise OSGi container by entering the following command:

    servicemix
  3. Install the transaction feature into the OSGi container. Enter the following console command:

    karaf@root> features:install transaction
  4. Install the required bundle dependencies into the OSGi container. For this tutorial, you require the derby bundle and the spring-jdbc bundle. To install them, enter the following console commands:

    karaf@root> install mvn:org.springframework/spring-jdbc/3.0.5.RELEASE
    
    karaf@root> install mvn:org.apache.derby/derby/10.8.2.2
  5. Install and start the derby-ds bundle (assuming that you have already built the bundle, as described in Define a Derby Datasource) by entering the following console command:

    karaf@root> install -s mvn:org.fusesource.example/derby-ds/1.0-SNAPSHOT
  6. To check that the datasources have been successfully exported from the derby-ds bundle, list the derby-ds services using the osgi:ls command. For example, given that BundleID is the bundle ID for the derby-ds bundle, you would enter the following console command:

    karaf@root> osgi:ls BundleID

    Amongst the exported services, you should see an entry like the following:

    ----
    aries.xa.aware = true
    datasource.name = derbyXADB
    objectClass = javax.sql.DataSource
    osgi.service.blueprint.compname = derbyXADataSource
    service.id = 424
    ----

    This is the wrapped XA datasource (recognizable from the aries.xa.aware = true setting), which is automatically created by the Aries wrapper feature (see Apache Aries Auto-Enlisting XA Wrapper).

  7. Install and start the tx-xa bundle, by entering the following console command:

    karaf@root> install -s mvn:org.fusesource.example/tx-xa
  8. Create a file called giro1.xml in any convenient directory and use a text editor to add the following message contents to it:

    <transaction>
      <transfer>
        <sender>Major Clanger</sender>
        <receiver>Tiny Clanger</receiver>
        <amount>90</amount>
      </transfer>
    </transaction>

    Now copy giro1.xml into the PathNameToMsgDir directory you created earlier (see Define a Transactional Route). The giro1.xml file should disappear immediately after it is copied, because the PathNameToMsgDir is being monitored by the feeder route.

  9. Use the JMX console to see what has happened to the message in giro1.xml. Open a new command prompt and enter the following command (provided with the standard JDK):

    jconsole

    In the JConsole: New Connection screen, select the org.apache.karaf.main.Main local process and click Connect.

  10. Click on the MBean tab and use the tree in the left-hand column to navigate to org.apache.activemq | TxXaDemo | Queue | statusLog | Operations, as shown in the following screenshot.


  11. Now in the right hand pane of the JConsole window, click browseMessages (the version of the method that takes no arguments). The Operation return value dialog pops up as shown , which enables you to browse all of the messages currently pending in the statusLog queueue.


    In the example shown here, there is just one message in the queue. If you scroll down, you can see the Text field, which holds the body of the JMS message. The body contains the most recent result of calling the AccountService.dumpTable() method (which is called in the last step of the transactional route).

  12. You can also force a transaction rollback by sending a message that exceeds the AccountService.debit() limit (withdrawal limit) of 100. For example, create the file giro2.xml and add the following message contents to it:

    When you copy this file into the PathNameToMsgDir directory, the message never propagates through to the statusLog queue, because the transaction gets rolled back.

Example A.1 shows the complete listing of the AccountService class, which uses the Spring JdbcTemplate class to access a JDBC data source.

Example A.1. The AccountService Class

// Java
package org.fusesource.example.tx.xa;

import java.util.List;

import javax.sql.DataSource;


import org.apache.camel.Exchange;
import org.apache.camel.language.XPath;
import org.apache.log4j.Logger;
import org.springframework.jdbc.core.JdbcTemplate;


public class AccountService {
    private static Logger log = Logger.getLogger(AccountService.class);
    private JdbcTemplate jdbc;

    public AccountService() {
    }

    public void setDataSource(DataSource ds) {
        jdbc = new JdbcTemplate(ds);
    }

    public void credit(
            @XPath("/transaction/transfer/receiver/text()") String name,
            @XPath("/transaction/transfer/amount/text()") String amount
            )
    {
        log.info("credit() called with args name = " + name + " and amount = " + amount);
        int origAmount = jdbc.queryForInt(
                "select amount from accounts where name = ?",
                new Object[]{name}
        );
        int newAmount = origAmount + Integer.parseInt(amount);
        
        jdbc.update(
                "update accounts set amount = ? where name = ?",
                new Object[] {newAmount, name}
        );
    }
    
    public void debit(
            @XPath("/transaction/transfer/sender/text()") String name,
            @XPath("/transaction/transfer/amount/text()") String amount
            )
    {
        log.info("debit() called with args name = " + name + " and amount = " + amount);
        int iamount = Integer.parseInt(amount);
        if (iamount > 100) {
            throw new IllegalArgumentException("Debit limit is 100");
        }
        int origAmount = jdbc.queryForInt(
                "select amount from accounts where name = ?",
                new Object[]{name}
        );
        int newAmount = origAmount - Integer.parseInt(amount);
        if (newAmount < 0) {
            throw new IllegalArgumentException("Not enough in account");
        }
        
        jdbc.update(
                "update accounts set amount = ? where name = ?",
                new Object[] {newAmount, name}
        );
    }
    
    public void dumpTable(Exchange ex) {
        log.info("dump() called");
        List<?> dump = jdbc.queryForList("select * from accounts");
        ex.getIn().setBody(dump.toString());
    }
}