323.11. JDBC ベースの集約リポジトリーの使用

Camel 2.6 以降で利用可能

注記

Camel 2.6 で JdbcAggregationRepository を使用する

Camel 2.6 では、JdbcAggregationRepository が camel-jdbc-aggregator コンポーネントで提供されます。Camel 2.7 以降では、JdbcAggregationRepositorycamel-sql コンポーネントで提供されます。

JdbcAggregationRepositoryAggregationRepository であり、その場で集約されたメッセージを永続化します。デフォルトのアグリゲーターはメモリー内のみの AggregationRepository を使用するため、これによりメッセージが失われないことが保証されます。JdbcAggregationRepository を使用すると、Camel と一緒に Aggregator の永続的なサポートを提供できます。

エクスチェンジが正常に処理された場合にのみ、AggregationRepositoryconfirm メソッドが呼び出されたときに完了としてマークされます。これは、同じ Exchange が再び失敗すると、成功するまで再試行されることを意味します。

オプション maximumRedeliveries を使用して、復元された特定の Exchange の再配信試行の最大回数を制限できます。maximumRedeliveries に達したときに Camel がエクスチェンジの送信先を認識できるように、deadLetterUri オプションも設定する必要があります。

このテスト など、camel-sql の単体テストでいくつかの例を確認できます。

323.11.1. データベース

操作可能にするために、各アグリゲーターは 2 つのテーブルを使用します: アグリゲーションと完了したテーブルです。慣例により、完了したものは、"_COMPLETED" という接尾辞が付いた集約と同じ名前になります。この名前は、Spring Bean で RepositoryName プロパティーを使用して設定する必要があります。次の例では、集約が使用されます。

両方のテーブルのテーブル構造定義は同一です。どちらの場合も、文字列値がキー (id) として使用されますが、ブロブにはバイト配列でシリアル化されたエクスチェンジが含まれます。
ただし、1 つの違いを覚えておく必要があります。テーブルによっては、id フィールドの内容が同じではありません。
集約テーブルの id は、コンポーネントがメッセージを集約するために使用する相関 ID を保持します。完成したテーブルの id には、対応する ブロブフィールドに格納されているエクスチェンジの ID が保持されます。

テーブルの作成に使用される SQL クエリーを次に示します。aggregation をアグリゲーターリポジトリー名に置き換えてください。

CREATE TABLE aggregation ( id varchar(255) NOT NULL, exchange blob NOT
NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE
aggregation_completed ( id varchar(255) NOT NULL, exchange blob NOT
NULL, constraint aggregation_completed_pk PRIMARY KEY (id) );

323.11.2. 本文とヘッダーをテキストとして保存する

Camel 2.11 から利用可能

JdbcAggregationRepository を設定して、メッセージ本文と select (ed) ヘッダーを文字列として別々の列に格納できます。たとえば、本文を保存するには、次の 2 つのヘッダー companyNameaccountName は次の SQL を使用します。

CREATE TABLE aggregationRepo3 ( id varchar(255) NOT NULL, exchange blob
NOT NULL, body varchar(1000), companyName varchar(1000), accountName
varchar(1000), constraint aggregationRepo3_pk PRIMARY KEY (id) ); CREATE
TABLE aggregationRepo3_completed ( id varchar(255) NOT NULL, exchange
blob NOT NULL, body varchar(1000), companyName varchar(1000),
accountName varchar(1000), constraint aggregationRepo3_completed_pk
PRIMARY KEY (id) );

 

次に、以下に示すように、この動作を有効にするようにリポジトリーを設定します。

<bean id="repo3"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="repositoryName" value="aggregationRepo3"/> <property
name="transactionManager" ref="txManager3"/> <property name="dataSource"
ref="dataSource3"/> <!-- configure to store the message body and
following headers as text in the repo --> <property
name="storeBodyAsText" value="true"/> <property
name="headersToStoreAsText"> <list> <value>companyName</value>
<value>accountName</value> </list> </property> </bean>

323.11.3. コーデック (シリアル化)

あらゆるタイプのペイロードを含めることができるため、エクスチェンジは設計上シリアライズできません。データベースのブロブフィールドに格納されるバイト配列に変換されます。これらの変換はすべて JdbcCodec クラスによって処理されます。コードの 1 つの詳細に注意する必要があります: ClassLoadingAwareObjectInputStream です。

ClassLoadingAwareObjectInputStream は、Apache ActiveMQ プロジェクトから再利用されています。ObjectInputStream をラップし、currentThread ではなく ContextClassLoader で使用します。利点は、他のバンドルによって公開されたクラスをロードできることです。これにより、交換の本文とヘッダーにカスタム型のオブジェクト参照を含めることができます。

323.11.4. Transaction

トランザクションを調整するには、Spring PlatformTransactionManager が必要です。

323.11.4.1. Service (Start/Stop)

start メソッドは、データベースの接続と必要なテーブルの存在を確認します。何か問題があると、起動時に失敗します。

323.11.5. アグリゲーターの設定

対象となる環境によっては、アグリゲーターに何らかの設定が必要になる場合があります。ご存知のように、各アグリゲーターには独自のリポジトリー (対応するテーブルのペアがデータベースに作成されている) とデータソースが必要です。デフォルトの lobHandler がデータベースシステムに適合していない場合は、lobHandler プロパティーを挿入できます。

Oracle の宣言は次のとおりです。

<bean id="lobHandler"
class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property
name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean
id="nativeJdbcExtractor"
class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="transactionManager" ref="transactionManager"/> <property
name="repositoryName" value="aggregation"/> <property name="dataSource"
ref="dataSource"/> <!-- Only with Oracle, else use default --> <property
name="lobHandler" ref="lobHandler"/> </bean>

323.11.6. Optimistic locking

Camel 2.12 以降では、optimisticLocking をオンにして、複数の Camel アプリケーションが集約リポジトリー用の同じデータベースを共有するクラスター化された環境で、この JDBC ベースの集約リポジトリーを使用できます。競合状態がある場合、JDBC ドライバーは、JdbcAggregationRepository が対応できるベンダー固有の例外を出力します。JDBC ドライバーからの例外の原因が楽観的ロックエラーと見なされるかを知るには、これを行うマッパーが必要です。したがって、org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper があり、必要に応じてカスタムロジックを実装できます。次のように動作するデフォルトの実装 org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper があります。

次のチェックが行われます。

  • 原因となった例外が SQLException の場合、SQLState が 23 で始まるかどうかがチェックされます。
  • 原因となった例外が DataIntegrityViolationException の場合
  • 発生した例外クラス名に ConstraintViolation が含まれる場合。
  • クラス名が設定されている場合、FQN クラス名の一致をオプションでチェックします

さらに、FQN クラス名を追加できます。発生した例外 (またはネストされた例外) のいずれかが FQN クラス名のいずれかと等しい場合は、楽観的ロックエラーになります。

以下は、JDBC ベンダーからの 2 つの追加の FQN クラス名を定義する例です。

<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="transactionManager" ref="transactionManager"/> <property
name="repositoryName" value="aggregation"/> <property name="dataSource"
ref="dataSource"/> <property name"jdbcOptimisticLockingExceptionMapper"
ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extra
FQN class names from our JDBC driver --> <bean id="myExceptionMapper"
class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
<property name="classNames"> <util:set>
<value>com.foo.sql.MyViolationExceptoion</value>
<value>com.foo.sql.MyOtherViolationExceptoion</value> </util:set>
</property> </bean>