Red Hat Training
A Red Hat training course is available for Red Hat Fuse
第151章 SJMS Batch
SJMS バッチコンポーネント
Camel 2.16 以降で利用可能
SJMS Batch は、JMS キューからの非常に高性能なトランザクションバッチ消費のための特殊なコンポーネントです。これは、コンシューマーのみのコンポーネントとアグリゲーターのハイブリッドと考えることができます。
Camel の一般的なユースケースは、集約された状態を別のエンドポイントに送信する前にキューからメッセージを消費し、それらを集約することです。処理を実行しているシステムが失敗した場合にデータが失われるのを防ぐために、通常はキューからのトランザクション内で消費され、JDBC コンポーネントにあるものなど、永続
AggregationRepository
に保存されたデータを集約します。
Aggregator パターンの動作では、受信メッセージが集約される前に
AggregationRepository
からデータを取得し、後で結果を書き込みます。性質的に、集計されたアーティファクトの数が増えると、読み取りおよび書き込みは徐々に長くなります。これの影響を示す任意の時間単位を使用したこの例は次のとおりです。
項目 | 読み取り時間 | 書き込み時間 | 合計時間 |
---|---|---|---|
0 | 0 | 1 | 1 |
1 | 1 | 2 | 4 |
2 | 2 | 3 | 9 |
3 | 3 | 4 | 16 |
4 | 4 | 5 | 25 |
5 | 5 | 6 | 36 |
6 | 6 | 7 | 49 |
7 | 7 | 8 | 64 |
8 | 8 | 9 | 81 |
9 | 9 | 10 | 100 |
一方、SJMS Batch コンポーネントを使用した消費パフォーマンスは linear です。各メッセージは消費され、次のメッセージがフェッチされる前に
AggregationStrategy
を使用して集約されます。すべての消費と集約が単一の JMS トランザクションで実行されるため、中間状態を永続化するために外部ストレージは必要ありません。これにより、上記の読み取りおよび書き込みコストを回避します。実際には、これにより複数の順番でより高いスループットが得られます。
最初のメッセージ以降のサイズまたは期間のいずれかで完了条件が満たされると、集約された
Exchange
がルートに渡されます。この Exchange
の処理中に例外が出力された場合、またはシステムがシャットダウンすると、元のメッセージがすべてキューに戻されます(または、ブローカーの設定に応じてデッドレターキューに配置されます)。
通常のアグリゲーターとは異なり、集約条件には機能がありません。つまり、複数のメッセージのグループに対して同時に消費をバッチ処理することはできません。消費されたメッセージはすべて 1 つのバッチに集約されます。これを回避するには、一般的な設計アプローチとして、最初にメッセージの異なるグループを異なるキューに集約し、バッチを 1 つずつ個別にバッチ処理する方法が挙げられます。
複数の JMS コンシューマーサポートを利用できます。これにより、1 つのルートを使用して並行して消費でき、同時に JMS メッセージグループなどの機能を使用して関連メッセージをグループ化できます。
Maven ユーザーは、このコンポーネントの以下の依存関係を
pom.xml
に追加する必要があります。
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-sjms</artifactId> <version>2.17.0.redhat-630xxx</version> <!-- use the same version as your Camel core version --> </dependency>
SJMS Batch は SJMS のサブコンポーネントで、同じライブラリーにあります。
URI 形式
sjms:[queue:]destinationName[?options]
destinationName
は JMS キューです。デフォルトでは、destinationName
はキュー名として解釈されます。
sjms:FOO.BAR
必要に応じて、
queue:
接頭辞を含めることができます。
sjms:queue:FOO.BAR
このコンテキスト内でバッチ消費を使用する利点がないため、トピックの使用はサポートされていません。トピックメッセージは通常永続的ではなく、損失は許容されます。障害が発生したトランザクション内で消費されると、トピックメッセージはブローカーによって再配信されない可能性があります。このシナリオでは、プレーン SJMS コンシューマーエンドポイントは、通常の非永続性でサポートされるアグリゲーターと組み合わせて使用できます。
コンポーネントのオプションおよび設定
SJMS Batch コンポーネントは以下の設定オプションをサポートします。
オプション
|
必須
|
デフォルト値
|
説明
|
---|---|---|---|
aggregationStrategy
|
Yes
|
null
|
Camel レジストリーでの
AggregationStrategy への参照(例: #myAggregationStrategy )
|
completionSize
|
|
200
|
集約するバッチのサイズ。
これが JMS コンシューマーの事前フェッチバッファー、またはブローカーのキューの最大ページサイズよりも大きくならないように注意する必要があります。タイムアウトを使用しない場合は、コンシューマーがハングする可能性があります。
0 または less の値は、
completionTimeout のみを使用する必要があることを示します。
|
completionTimeout
|
|
500
|
エクスチェンジを生成する前に最初のメッセージの受信から待機する最大時間。
0 または less の値は、
completionSize のみを使用する必要があることを示します。
完了タイムアウトと完了間隔を同時に使用することはできません。設定できるのは 1 つのみです。
|
completionTimeout
|
Camel 2.17: 完了間隔(ミリ秒単位)。これにより、バッチが間隔ごとにスケジュールされる固定レートで完了されます。タイムアウトがトリガーされ、バッチにメッセージがない場合、バッチは空になる可能性があります。完了タイムアウトと完了間隔を同時に使用することはできません。設定できるのは 1 つのみです。 | ||
sendEmptyMessageWhenIdle
|
false
|
Camel 2.17: 完了タイムアウトまたは間隔を使用する場合、タイムアウトがトリガーされ、バッチにメッセージがない場合にバッチが空になることがあります。このオプションが true であり、バッチが空の場合、空のメッセージがバッチに追加され、空のメッセージがルーティングされます。 | |
pollDuration
|
|
1000
|
MessageConsumer.receive() への呼び出しの最大長。タイムアウトの残りはバッチ内で優先されます。
この値は、実質的にバッチ間のポーリング時間です。
|
timeoutCheckerExecutorService
|
Camel 2.17: completionInterval オプションを使用すると、バックグラウンドスレッドが作成され、完了間隔がトリガーされます。すべてのコンシューマーに新しいスレッドを作成するのではなく、カスタムスレッドプールを提供するには、このオプションを設定します。 |
completionSize
endpoint 属性は completionTimeout
とともに使用され、最初に満たされる条件により、集約された Exchange
がルートに出力されます。