Chapter 5. Transaction Demarcation

Abstract

Transaction demarcation refers to the procedures for starting, committing, and rolling back transactions. This chapter describes the mechanisms that are available for controlling transaction demarcation, both by programming and by configuration.

5.1. Demarcation by Marking the Route

Overview

Apache Camel provides a simple mechanism for initiating a transaction in a route, by inserting the transacted() command in the Java DSL or by inserting the <transacted/> tag in the XML DSL.

Sample route with JDBC resource

Figure 5.1, “Demarcation by Marking the Route” shows an example of a route that is made transactional by adding the transacted() DSL command to the route. All of the route nodes following the transacted() node are included in the transaction scope. In this example, the two following nodes access a JDBC resource.

Figure 5.1. Demarcation by Marking the Route

The transacted processor demarcates transactions as follows: when an exchange enters the transacted processor, the transacted processor invokes the default transaction manager to begin a transaction (attaching it to the current thread); when the exchange reaches the end of the remaining route, the transacted processor invokes the transaction manager to commit the current transaction.
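The begin-then-commit sequence described above can be modelled in plain Java. The following is only an illustrative sketch, not Camel's implementation: SketchTxManager, RecordingTxManager, and runTransacted are hypothetical names, and rolling back when the remaining route throws an exception is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a transaction manager; not a Camel or Spring type.
interface SketchTxManager {
    void begin();
    void commit();
    void rollback();
}

// Records each demarcation call so that the ordering can be inspected.
class RecordingTxManager implements SketchTxManager {
    final List<String> events = new ArrayList<>();
    public void begin()    { events.add("begin"); }
    public void commit()   { events.add("commit"); }
    public void rollback() { events.add("rollback"); }
}

class DemarcationSketch {
    // Models the transacted processor: begin on entry, run the rest of the
    // route on the calling thread, then commit on success.
    // Assumption: a runtime exception from the rest of the route triggers a rollback.
    static <T> void runTransacted(SketchTxManager tm, T exchange, Consumer<T> restOfRoute) {
        tm.begin();
        try {
            restOfRoute.accept(exchange);
        } catch (RuntimeException e) {
            tm.rollback();
            throw e;
        }
        tm.commit();
    }

    public static void main(String[] args) {
        RecordingTxManager tm = new RecordingTxManager();
        runTransacted(tm, "transfer.xml", body -> tm.events.add("process " + body));
        System.out.println(tm.events);
    }
}
```

Because the remaining route runs inside the same method call that began the transaction, all of the work happens on one thread, which is the property the later sections on thread pools and route fragments rely on.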

Route definition in Java DSL

The following Java DSL example shows how to define a transactional route by marking the route with the transacted() DSL command:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
          .transacted()
          .beanRef("accountService","credit")
          .beanRef("accountService","debit");
    }
}
In this example, the file endpoint reads some files in XML format that describe a transfer of funds from one account to another. The first beanRef() invocation credits the specified sum of money to the beneficiary's account and then the second beanRef() invocation subtracts the specified sum of money from the sender's account. Both of the beanRef() invocations cause updates to be made to a database resource, which we are assuming is bound to the transaction through the transaction manager (for example, see Section 2.6.1, “JDBC Data Source”). For a sample implementation of the accountService bean, see Section 4.2, “Spring JDBC Template”.

Using SpringRouteBuilder

The beanRef() Java DSL command is available only in the SpringRouteBuilder class. It enables you to reference a bean by specifying the bean's Spring registry ID (for example, accountService). If you do not use the beanRef() command, you could inherit from the org.apache.camel.builder.RouteBuilder class instead.

Route definition in Spring XML

The preceding route can equivalently be expressed in Spring XML, where the <transacted/> tag is used to mark the route as transactional, as follows:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ... >

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="file:src/data?noop=true"/>
      <transacted/>
      <bean ref="accountService" method="credit"/>
      <bean ref="accountService" method="debit"/>
    </route>
  </camelContext>

</beans>

Default transaction manager and transacted policy

To demarcate transactions, the transacted processor must be associated with a particular transaction manager instance. To save you from having to specify the transaction manager every time you invoke transacted(), the transacted processor automatically picks a sensible default. For example, if there is only one instance of a transaction manager in your Spring configuration, the transacted processor implicitly picks this transaction manager and uses it to demarcate transactions.
A transacted processor can also be configured with a transacted policy, of TransactedPolicy type, which encapsulates a propagation policy and a transaction manager (see Section 5.3, “Propagation Policies” for details). The following rules are used to pick the default transaction manager or transaction policy:
  1. If there is only one bean of org.apache.camel.spi.TransactedPolicy type, use this bean.
    Note
    The TransactedPolicy type is a base type of the SpringTransactionPolicy type that is described in Section 5.3, “Propagation Policies”. Hence, the bean referred to here could be a SpringTransactionPolicy bean.
  2. If there is a bean of org.apache.camel.spi.TransactedPolicy type that has the ID PROPAGATION_REQUIRED, use this bean.
  3. If there is only one bean of org.springframework.transaction.PlatformTransactionManager type, use this bean.
You also have the option of specifying a bean explicitly by providing the bean ID as an argument to transacted()—see the section called “Sample route with PROPAGATION_NEVER policy in Java DSL”.
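The order in which the three rules are tried can be sketched in plain Java. This is a simplified model under stated assumptions, not Camel's lookup code: PolicyBean and TxManagerBean are hypothetical marker types standing in for TransactedPolicy and PlatformTransactionManager, the registry is modelled as a map from bean ID to bean, and returning null when no rule matches is a choice made for the sketch only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical marker types standing in for org.apache.camel.spi.TransactedPolicy
// and org.springframework.transaction.PlatformTransactionManager.
interface PolicyBean {}
interface TxManagerBean {}

class DefaultLookupSketch {
    // Returns the ID of the bean that the three rules would select,
    // or null (in this sketch only) if none of the rules applies.
    static String pickDefault(Map<String, Object> registry) {
        Map<String, Object> policies = ofType(registry, PolicyBean.class);
        // Rule 1: exactly one policy bean.
        if (policies.size() == 1) {
            return policies.keySet().iterator().next();
        }
        // Rule 2: a policy bean with the ID PROPAGATION_REQUIRED.
        if (policies.containsKey("PROPAGATION_REQUIRED")) {
            return "PROPAGATION_REQUIRED";
        }
        // Rule 3: exactly one transaction manager bean.
        Map<String, Object> managers = ofType(registry, TxManagerBean.class);
        if (managers.size() == 1) {
            return managers.keySet().iterator().next();
        }
        return null;
    }

    // Collects the registry entries whose bean is an instance of the given type.
    private static Map<String, Object> ofType(Map<String, Object> registry, Class<?> type) {
        Map<String, Object> matches = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : registry.entrySet()) {
            if (type.isInstance(e.getValue())) {
                matches.put(e.getKey(), e.getValue());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        Map<String, Object> registry = new LinkedHashMap<>();
        registry.put("txManager", new TxManagerBean() {});
        System.out.println(pickDefault(registry));
    }
}
```

In this model, a registry that holds a single transaction manager and no policy beans resolves to the transaction manager, which corresponds to the single-transaction-manager case described at the start of this section.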

Transaction scope

If you insert a transacted processor into a route, a new transaction is created each time an exchange passes through this node and the transaction's scope is defined as follows:
  1. The transaction is associated with the current thread only.
  2. The transaction scope encompasses all of the route nodes following the transacted processor.
In particular, none of the route nodes preceding the transacted processor are included in the transaction (the situation is different if the route begins with a transactional endpoint; see Section 5.2, “Demarcation by Transactional Endpoints”). For example, the following route is incorrect, because the transacted() DSL command mistakenly appears after the first beanRef() call (which accesses the database resource):
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
          .beanRef("accountService","credit")
          .transacted()  // <-- WARNING: Transaction started in the wrong place!
          .beanRef("accountService","debit");
    }
}

No thread pools in a transactional route

It is crucial to understand that a given transaction is associated with the current thread only. It follows that you must not create a thread pool in the middle of a transactional route, because the processing in the new threads will not participate in the current transaction. For example, the following route is bound to cause problems:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
          .transacted()
          .threads(3)  // WARNING: Subthreads are not in transaction scope!
          .beanRef("accountService","credit")
          .beanRef("accountService","debit");
    }
}
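A route like the preceding one is likely to leave your database in an inconsistent state, because the threads() DSL command is incompatible with transacted routes: the work done in the pool threads is not part of the transaction, so it is neither committed nor rolled back together with it. Even if the threads() call precedes the transacted() call, the route will not behave as expected.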
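The reason that pool threads fall outside the transaction can be illustrated with a plain Java ThreadLocal. This is a minimal sketch that assumes the transaction context is held in thread-local storage: CURRENT_TX is a hypothetical stand-in for that context, not an actual Camel or Spring field.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadBoundTxSketch {
    // Hypothetical stand-in for the transaction context that is attached
    // to the thread on which the transaction was begun.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Returns { visible on the calling thread, visible on a pool thread }.
    static boolean[] visibility() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CURRENT_TX.set("tx-1"); // models "begin" on the calling thread
        try {
            boolean onCaller = CURRENT_TX.get() != null;
            // The task runs on a pool thread, which has its own (empty) slot.
            boolean onPoolThread = pool.submit(() -> CURRENT_TX.get() != null).get();
            return new boolean[] { onCaller, onPoolThread };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CURRENT_TX.remove(); // models the end of the transaction
        }
    }

    public static void main(String[] args) {
        boolean[] seen = visibility();
        System.out.println("caller sees tx: " + seen[0] + ", pool thread sees tx: " + seen[1]);
    }
}
```

The calling thread sees the context and the pool thread does not, so any database update made from the pool thread in this model would run without a transaction.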

Breaking a route into fragments

If you want to break a route into fragments and have each route fragment participate in the current transaction, you can use direct: endpoints. For example, to send exchanges to separate route fragments, depending on whether the transfer amount is big (greater than 100) or small (less than or equal to 100), you can use the choice() DSL command and direct endpoints, as follows:
// Java
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
            .transacted()
            .beanRef("accountService","credit")
            .choice().when(xpath("/transaction/transfer[amount > 100]"))
                .to("direct:txbig")
            .otherwise()
                .to("direct:txsmall");

        from("direct:txbig")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages/big");

        from("direct:txsmall")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            .to("file:target/messages/small");
    }
}
Both the fragment beginning with direct:txbig and the fragment beginning with direct:txsmall participate in the current transaction, because the direct endpoints are synchronous. This means that the fragments execute in the same thread as the first route fragment and, therefore, they are included in the same transaction scope.
Note
You must not use seda endpoints to join the route fragments, because seda consumer endpoints create a new thread (or threads) to execute the route fragment (asynchronous processing). Hence, the fragments would not participate in the original transaction.
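The difference between the two ways of joining fragments can be modelled in plain Java. In this sketch, which uses hypothetical names and is not Camel code, a direct-style hand-off is an ordinary method call, and a seda-style hand-off places the exchange on an in-memory queue that is drained by a separate consumer thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class HandoffSketch {
    // direct-style: the next fragment is invoked as an ordinary method call,
    // so it runs on the thread of the caller.
    static String directStyle() {
        return Thread.currentThread().getName();
    }

    // seda-style: the exchange is queued and a separate consumer thread
    // picks it up, so the fragment runs on that consumer thread.
    static String sedaStyle() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String[] processedOn = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
                processedOn[0] = Thread.currentThread().getName();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "seda-consumer");
        consumer.start();
        queue.add("exchange");
        try {
            consumer.join(); // wait until the consumer has processed the exchange
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processedOn[0];
    }

    public static void main(String[] args) {
        System.out.println("direct-style fragment ran on: " + directStyle());
        System.out.println("seda-style fragment ran on:   " + sedaStyle());
    }
}
```

Only the direct-style fragment reports the name of the calling thread, which is why only that style of hand-off stays inside a thread-bound transaction.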

Resource endpoints

Certain Apache Camel components, such as the JMS component, act as resource endpoints when they appear as the destination of a route (for example, if they appear in the to() DSL command). That is, these endpoints can access a transactional resource, such as a database or a persistent queue. The resource endpoints can participate in the current transaction, as long as they are associated with the same transaction manager as the transacted processor that initiated the current transaction. If you need to access multiple resources, you must deploy your application in a J2EE container, which gives you access to a global transaction manager.

Sample route with resource endpoints

For example, the following route sends the order for a money transfer to two different JMS queues: the credits queue processes the order to credit the receiver's account, and the debits queue processes the order to debit the sender's account. Because a credit must occur only if there is a corresponding debit, it makes sense to enclose the enqueueing operations in a single transaction. If the transaction succeeds, both the credit order and the debit order will be enqueued, but if an error occurs, neither order will be enqueued.
from("file:src/data?noop=true")
    .transacted()
    .to("jmstx:queue:credits")
    .to("jmstx:queue:debits");
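The all-or-nothing behavior of the two enqueueing operations can be modelled in plain Java. This is a simplified sketch with hypothetical names, not the JMS API: sends made inside the transaction are held back until commit and discarded on rollback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TransactedSendSketch {
    // Messages that are visible to consumers, keyed by queue name.
    private final Map<String, List<String>> queues = new HashMap<>();
    // Sends made inside the current transaction, held back until commit.
    private final List<String[]> pending = new ArrayList<>();

    void send(String queue, String body) {
        pending.add(new String[] { queue, body });
    }

    // Makes every pending send visible in one step.
    void commit() {
        for (String[] p : pending) {
            queues.computeIfAbsent(p[0], k -> new ArrayList<>()).add(p[1]);
        }
        pending.clear();
    }

    // Discards every pending send, so no queue receives anything.
    void rollback() {
        pending.clear();
    }

    int depth(String queue) {
        return queues.getOrDefault(queue, Collections.<String>emptyList()).size();
    }

    public static void main(String[] args) {
        TransactedSendSketch session = new TransactedSendSketch();
        session.send("credits", "order-1");
        session.send("debits", "order-1");
        session.rollback(); // models an error before the end of the route
        System.out.println(session.depth("credits") + " " + session.depth("debits"));
    }
}
```

In this model, a rollback after both sends leaves both queues empty, which matches the outcome described above when an error occurs.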