第9章 トランザクションを使用する Camel アプリケーションの作成

参照可能な 3 つのタイプのサービスを設定すると、アプリケーションを作成する準備が整います。以下の 3 つのタイプのサービスがあります。

  • 以下のインターフェイスのいずれかを実装する 1 つのトランザクションマネージャー。

    • javax.transaction.UserTransaction
    • javax.transaction.TransactionManager
    • org.springframework.transaction.PlatformTransactionManager
  • javax.sql.DataSource. インターフェイスを実装する JDBC データソースが少なくとも 1 つ。多くの場合、複数のデータソースが存在します。
  • javax.jms.ConnectionFactory インターフェイスを実装する JMS 接続ファクトリーが少なくとも 1 つ。多くの場合、複数の値が存在します。

ここでは、トランザクション、データソース、および接続ファクトリーの管理に関連する Camel 固有の設定を説明します。

注記

ここでは、SpringTransactionPolicy などの Spring 関連の概念をいくつか説明します。SpringXMLDSLBlueprintXMLDSL には明確な違いがあり、どちらも Camel コンテキストを定義する XML 言語です。Spring XML DSL は Fuse で 非推奨 になりました。ただし、Camel トランザクションメカニズムは引き続き内部で Spring ライブラリーを使用します。

ここでの情報のほとんどは、使用される PlatformTransactionManager の種類に依存していません。PlatformTransactionManager が Narayana トランザクションマネージャーの場合、完全な JTA トランザクションが使用されます。PlatformTransactionManager がローカルの Blueprint <bean> として定義されている場合 (例:org.springframework.jms.connection.JmsTransactionManager)、ローカルトランザクションが使用されます。

トランザクション境界とは、トランザクションを開始、コミット、およびロールバックする手順を指します。本セクションでは、プログラミングと設定の両方でトランザクション境界を制御することができるメカニズムを説明します。

9.1. ルートをマークすることによるトランザクションの境界

Apache Camel は、ルートでトランザクションを開始する簡単なメカニズムを提供します。Java DSL で transacted() コマンドを挿入するか、XML DSL で <transacted/> タグを挿入します。

図9.1 ルートをマークすることによる境界

txn demarcation 01

トランザクション処理されたプロセッサーは、トランザクションを次のように区切ります。

  1. 交換がトランザクションプロセッサーに入ると、トランザクションプロセッサーはデフォルトのトランザクションマネージャーを呼び出し、トランザクションを開始して、現在のスレッドにトランザクションをアタッチします。
  2. エクスチェンジが残りのルートの最後に到達すると、処理されたプロセッサーはトランザクションマネージャーを呼び出して現在のトランザクションをコミットします。

9.1.1. JDBC リソースを使用したルートのサンプル

図9.1「ルートをマークすることによる境界」 は、ルートに transacted() DSL コマンドを追加することで、トランザクションになるルートの例を示しています。transacted() ノードに続くすべてのルートノードはトランザクションスコープに含まれます。この例では、以下の 2 つのノードが JDBC リソースにアクセスします。

9.1.2. Java DSL でのルート定義

以下の Java DSL の例は、transacted() DSL コマンドでルートをマークしてトランザクションルートを定義する方法を示しています。

import org.apache.camel.builder.RouteBuilder;

class MyRouteBuilder extends RouteBuilder {
    public void configure() {
        from("file:src/data?noop=true")
                .transacted()
                .bean("accountService","credit")
                .bean("accountService","debit");
    }
}

この例では、ファイルエンドポイントは、あるアカウントから別のアカウントへの資金の移動を記述するいくつかの XML 形式のファイルを読み取ります。最初の bean() 呼び出しでは、指定された金額が受取人の口座に加算され、2 回目の bean() 呼び出しでは、支払人の口座からから指定の金額が差し引かれます。どちらの bean() 呼び出しでも、データベースリソースに対する更新が行われます。データベースリソースがトランザクションマネージャーを介してトランザクションにバインドされることを前提とします (例:6章JDBC データソースの使用)。

9.1.3. Blueprint XML でのルート定義

上記のルートは Blueprint XML で表現することもできます。<transacted /> タグは、以下の XML のようにルートをトランザクションとしてマークします。

<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ...>

    <camelContext xmlns="http://camel.apache.org/schema/blueprint">
        <route>
            <from uri="file:src/data?noop=true" />
            <transacted />
            <bean ref="accountService" method="credit" />
            <bean ref="accountService" method="debit" />
        </route>
    </camelContext>

</blueprint>

9.1.4. デフォルトのトランザクションマネージャーおよびトランザクションポリシー

トランザクションを区別するには、トランザクションプロセッサーを特定のトランザクションマネージャーインスタンスに関連付ける必要があります。transacted() を呼び出すたびにトランザクションマネージャーを指定しなくても、トランザクションプロセッサーが適切なデフォルトを自動的に選択します。たとえば、設定にトランザクションマネージャーのインスタンスが 1 つしかない場合、transacted プロセッサーはこのトランザクションマネージャーを暗黙的に選択し、トランザクションを区別するために使用されます。

トランザクションプロセッサーは、TransactedPolicy タイプのトランザクションポリシーで設定することもできます。これは、伝播ポリシーとトランザクションマネージャーがカプセル化します (詳細は 「トランザクション伝播ポリシー」 を参照してください)。デフォルトのトランザクションマネージャーまたはトランザクションポリシーを選択するには、以下のルールが使用されます。

  1. org.apache.camel.spi.TransactedPolicy タイプの Bean が 1 つしかない場合は、この Bean を使用します。

    注記

    TransactedPolicy タイプは、「トランザクション伝播ポリシー」 で説明されている SpringTransactionPolicy 型のベースタイプです。そのため、ここで参照される Bean は SpringTransactionPolicy Bean である可能性があります。

  2. IDPROPAGATION_REQUIRED が含まれる、org.apache.camel.spi.TransactedPolicy の Bean タイプがある場合には、この Bean を使用します。
  3. org.springframework.transaction.PlatformTransactionManager タイプの Bean が 1 つしかない場合は、この Bean を使用します。

Bean ID を transacted() に引数として指定することで、Bean を明示的に指定するオプションもあります。「Java DSL の PROPAGATION_NEVER ポリシーを使用したルートの例」を参照してください。

9.1.5. トランザクションスコープ

トランザクションプロセッサーをルートに挿入すると、トランザクションマネージャーは、エクスチェンジがこのノードを通過するたびに新しいトランザクションを作成します。トランザクションのスコープは以下のように定義されます。

  • トランザクションは現在のスレッドにのみ関連付けられます。
  • トランザクションスコープには、トランザクションプロセッサーに続くすべてのルートノードが含まれます。

トランザクションプロセッサーの前にあるルートノードはトランザクションには含まれません。ただし、ルートがトランザクションエンドポイントで始まる場合は、ルートのすべてのノードがトランザクションに含まれます。「ルート開始時のトランザクションエンドポイント」を参照してください。

以下のルートを見てみましょう。transacted() DSL コマンドは、データベースリソースにアクセスする最初の bean() 呼び出しの後に誤って発生するため、正しくありません。

// Java
import org.apache.camel.builder.RouteBuilder;

public class MyRouteBuilder extends RouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
                .bean("accountService", "credit")
                .transacted()  // <-- WARNING: Transaction started in the wrong place!
                .bean("accountService", "debit");
    }
}

9.1.6. トランザクションルートにスレッドプールがない

特定のトランザクションが現在のスレッドにのみ関連付けられていることを理解することが重要です。新しいスレッドでの処理は現在のトランザクションに参加しないため、トランザクションルートの途中でスレッドプールを作成しないでください。たとえば、次のルートは問題を引き起こす可能性があります。

// Java
import org.apache.camel.builder.RouteBuilder;

public class MyRouteBuilder extends RouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
                .transacted()
                .threads(3)  // WARNING: Subthreads are not in transaction scope!
                .bean("accountService", "credit")
                .bean("accountService", "debit");
    }
}

threads() DSL コマンドはトランザクションルートと互換性がないため、前述のルートなどのルートは、データベースが破損する可能性があります。threads() 呼び出しが transacted() 呼び出しの前であっても、ルートは想定どおりに動作しません。

9.1.7. フラグメントへのルートの分割

ルートをフラグメントに分割し、各ルートのフラグメントを現在のトランザクションに参加させる場合は、direct: エンドポイントを使用することができます。たとえば、エクスチェンジを個別のルートフラグメントに送信するには、転送量が大きい (100 を超える) または小さい (100 以下) かに応じて以下のように choice() DSL コマンドとダイレクトエンドポイントを使用できます。

// Java
import org.apache.camel.builder.RouteBuilder;

public class MyRouteBuilder extends RouteBuilder {
    ...
    public void configure() {
        from("file:src/data?noop=true")
                .transacted()
                .bean("accountService", "credit")
                .choice().when(xpath("/transaction/transfer[amount > 100]"))
                .to("direct:txbig")
                .otherwise()
                .to("direct:txsmall");

        from("direct:txbig")
                .bean("accountService", "debit")
                .bean("accountService", "dumpTable")
                .to("file:target/messages/big");

        from("direct:txsmall")
                .bean("accountService", "debit")
                .bean("accountService", "dumpTable")
                .to("file:target/messages/small");
    }
}

direct エンドポイントは同期されているため、direct:txbig で始まるフラグメントと direct:txsmall で始まるフラグメントの両方は、現在のトランザクションに参加します。つまり、フラグメントは最初のルートフラグメントと同じスレッドで実行されるので、同じトランザクションスコープに含まれています。

注記

ルートのフラグメントに参加するために seda エンドポイントを使用することはできません。seda コンシューマーエンドポイントは、ルートフラグメント (非同期処理) を実行する新しいスレッドを作成します。したがって、フラグメントは元のトランザクションには参加しません。

9.1.8. リソースエンドポイント

以下の Apache Camel コンポーネントは、ルートの宛先として表示される場合にリソースエンドポイントとして機能します。たとえば、to() DSL コマンドに表示される場合です。つまり、これらのエンドポイントはデータベースや永続キューなどのトランザクションリソースにアクセスできます。リソースエンドポイントは、現在のトランザクションを開始したトランザクションプロセッサーと同じトランザクションマネージャーに関連付けられている限り、現在のトランザクションに参加できます。

  • ActiveMQ
  • AMQP
  • Hibernate
  • iBatis
  • JavaSpace
  • JBI
  • JCR
  • JDBC
  • JMS
  • JPA
  • LDAP

9.1.9. リソースエンドポイントを含むサンプルルート

以下の例は、リソースエンドポイントを含むルートを示しています。これにより、送金の注文が 2 つの異なる JMS キューに送信されます。credits キューは、受取人の口座に入金するために注文を処理します。debits キューは、送金人の口座から差し引く注文を処理します。対応するデビッドがある場合にのみ、クレジットがあります。したがって、キューへの追加操作を単一のトランザクションに含める必要があります。トランザクションが成功すると、クレジット注文とデビット注文の両方がキューに入れられます。エラーが発生した場合には、どちらの注文もキューに入れられません。

from("file:src/data?noop=true")
        .transacted()
        .to("jmstx:queue:credits")
        .to("jmstx:queue:debits");