323.11. JDBC ベースの集約リポジトリーの使用
Camel 2.6 以降で利用可能
Camel 2.6 で JdbcAggregationRepository を使用する
Camel 2.6 では、JdbcAggregationRepository が camel-jdbc-aggregator
コンポーネントで提供されます。Camel 2.7 以降では、JdbcAggregationRepository
が camel-sql
コンポーネントで提供されます。
JdbcAggregationRepository
は AggregationRepository
であり、その場で集約されたメッセージを永続化します。デフォルトのアグリゲーターはメモリー内のみの AggregationRepository
を使用するため、これによりメッセージが失われないことが保証されます。JdbcAggregationRepository
を使用すると、Camel と一緒に Aggregator の永続的なサポートを提供できます。
エクスチェンジが正常に処理された場合にのみ、AggregationRepository
で confirm
メソッドが呼び出されたときに完了としてマークされます。これは、同じ Exchange が再び失敗すると、成功するまで再試行されることを意味します。
オプション maximumRedeliveries
を使用して、復元された特定の Exchange の再配信試行の最大回数を制限できます。maximumRedeliveries
に達したときに Camel がエクスチェンジの送信先を認識できるように、deadLetterUri
オプションも設定する必要があります。
このテスト など、camel-sql の単体テストでいくつかの例を確認できます。
323.11.1. データベース
操作可能にするために、各アグリゲーターは 2 つのテーブルを使用します: アグリゲーションと完了したテーブルです。慣例により、完了したものは、"_COMPLETED"
という接尾辞が付いた集約と同じ名前になります。この名前は、Spring Bean で RepositoryName
プロパティーを使用して設定する必要があります。次の例では、集約が使用されます。
両方のテーブルのテーブル構造定義は同一です。どちらの場合も、文字列値がキー (id) として使用されますが、ブロブにはバイト配列でシリアル化されたエクスチェンジが含まれます。
ただし、1 つの違いを覚えておく必要があります。テーブルによっては、id フィールドの内容が同じではありません。
集約テーブルの id は、コンポーネントがメッセージを集約するために使用する相関 ID を保持します。完成したテーブルの id には、対応する ブロブフィールドに格納されているエクスチェンジの ID が保持されます。
テーブルの作成に使用される SQL クエリーを次に示します。aggregation
をアグリゲーターリポジトリー名に置き換えてください。
CREATE TABLE aggregation ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE aggregation_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_completed_pk PRIMARY KEY (id) );
323.11.2. 本文とヘッダーをテキストとして保存する
Camel 2.11 から利用可能
JdbcAggregationRepository
を設定して、メッセージ本文と select (ed) ヘッダーを文字列として別々の列に格納できます。たとえば、本文を保存するには、次の 2 つのヘッダー companyName
と accountName
は次の SQL を使用します。
CREATE TABLE aggregationRepo3 ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_pk PRIMARY KEY (id) ); CREATE TABLE aggregationRepo3_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_completed_pk PRIMARY KEY (id) );
次に、以下に示すように、この動作を有効にするようにリポジトリーを設定します。
<bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="repositoryName" value="aggregationRepo3"/> <property name="transactionManager" ref="txManager3"/> <property name="dataSource" ref="dataSource3"/> <!-- configure to store the message body and following headers as text in the repo --> <property name="storeBodyAsText" value="true"/> <property name="headersToStoreAsText"> <list> <value>companyName</value> <value>accountName</value> </list> </property> </bean>
323.11.3. コーデック (シリアル化)
あらゆるタイプのペイロードを含めることができるため、エクスチェンジは設計上シリアライズできません。データベースのブロブフィールドに格納されるバイト配列に変換されます。これらの変換はすべて JdbcCodec
クラスによって処理されます。コードの 1 つの詳細に注意する必要があります: ClassLoadingAwareObjectInputStream
です。
ClassLoadingAwareObjectInputStream
は、Apache ActiveMQ プロジェクトから再利用されています。ObjectInputStream
をラップし、currentThread
ではなく ContextClassLoader
で使用します。利点は、他のバンドルによって公開されたクラスをロードできることです。これにより、交換の本文とヘッダーにカスタム型のオブジェクト参照を含めることができます。
323.11.4. Transaction
トランザクションを調整するには、Spring PlatformTransactionManager
が必要です。
323.11.4.1. Service (Start/Stop)
start
メソッドは、データベースの接続と必要なテーブルの存在を確認します。何か問題があると、起動時に失敗します。
323.11.5. アグリゲーターの設定
対象となる環境によっては、アグリゲーターに何らかの設定が必要になる場合があります。ご存知のように、各アグリゲーターには独自のリポジトリー (対応するテーブルのペアがデータベースに作成されている) とデータソースが必要です。デフォルトの lobHandler がデータベースシステムに適合していない場合は、lobHandler
プロパティーを挿入できます。
Oracle の宣言は次のとおりです。
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <!-- Only with Oracle, else use default --> <property name="lobHandler" ref="lobHandler"/> </bean>
323.11.6. Optimistic locking
Camel 2.12 以降では、optimisticLocking
をオンにして、複数の Camel アプリケーションが集約リポジトリー用の同じデータベースを共有するクラスター化された環境で、この JDBC ベースの集約リポジトリーを使用できます。競合状態がある場合、JDBC ドライバーは、JdbcAggregationRepository
が対応できるベンダー固有の例外を出力します。JDBC ドライバーからの例外の原因が楽観的ロックエラーと見なされるかを知るには、これを行うマッパーが必要です。したがって、org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper
があり、必要に応じてカスタムロジックを実装できます。次のように動作するデフォルトの実装 org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper
があります。
次のチェックが行われます。
-
原因となった例外が
SQLException
の場合、SQLState が 23 で始まるかどうかがチェックされます。 -
原因となった例外が
DataIntegrityViolationException
の場合 - 発生した例外クラス名に ConstraintViolation が含まれる場合。
- クラス名が設定されている場合、FQN クラス名の一致をオプションでチェックします
さらに、FQN クラス名を追加できます。発生した例外 (またはネストされた例外) のいずれかが FQN クラス名のいずれかと等しい場合は、楽観的ロックエラーになります。
以下は、JDBC ベンダーからの 2 つの追加の FQN クラス名を定義する例です。
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <property name"jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extra FQN class names from our JDBC driver --> <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper"> <property name="classNames"> <util:set> <value>com.foo.sql.MyViolationExceptoion</value> <value>com.foo.sql.MyOtherViolationExceptoion</value> </util:set> </property> </bean>