11.4. Competing Consumers

概要

図11.3「Competing Consumers パターン」 に示される Competing Consumers パターンは、複数のコンシューマーが同じキューからメッセージを引き出すことを可能にしつつ、各メッセージはたった一度だけ消費される ことを保証します。このパターンを使用すると、逐次的なメッセージ処理を並行的なメッセージ処理に置き換えることができます (結果として応答レイテンシーの減少をもたらします) 。

図11.3 Competing Consumers パターン

Competing Consumers パターン

以下のコンポーネントは、Competing Consumers パターンの例になります。

JMS ベースの競合コンシューマー

通常の JMS キューは、各メッセージがたった一度だけ消費されることを暗黙的に保証しています。そのため、JMS キューは自動的に Competing Consumers パターンをサポートします。たとえば、以下のように、JMS キュー HighVolumeQ からメッセージを引き出す 3 つの Competing Consumers を定義できます。

from("jms:HighVolumeQ").to("cxf:bean:replica01");
from("jms:HighVolumeQ").to("cxf:bean:replica02");
from("jms:HighVolumeQ").to("cxf:bean:replica03");

ここでは、CXF (Web サービス) エンドポイント replica01replica02、および replica03 は、HighVolumeQ キューからのメッセージを並行して処理します。

もう 1 つの方法として、JMS クエリーオプション concurrentConsumers を設定して、Competing Consumers のスレッドプールを作成することもできます。たとえば、以下のルートは、指定されたキューからメッセージを取得する 3 つの競合スレッドのプールを作成します。

from("jms:HighVolumeQ?concurrentConsumers=3").to("cxf:bean:replica01");

concurrentConsumers オプションは、以下のように XML DSL においても指定することができます。

 <route>
   <from uri="jms:HighVolumeQ?concurrentConsumers=3"/>
   <to uri="cxf:bean:replica01"/>
 </route>
注記

JMS トピックは Competing Consumers パターンをサポート しません。定義上、JMS トピックは、同じメッセージの複数のコピーを異なるコンシューマーに送信することを目的としています。したがって、Competing Consumers パターンとは互換性がありません。

SEDA ベースの競合コンシューマー

SEDA コンポーネントの目的は、計算を複数のステージに分割することで並行処理を単純化することです。SEDA エンドポイントは本質的に、インメモリーのブロッキングキュー (java.util.concurrent.BlockingQueue で実装された) をカプセル化します。そのため、SEDA エンドポイントを使用してルートを複数のステージに分割し、各ステージでは複数のスレッドを使用することもできます。たとえば、以下のように 2 つのステージで設定される SEDA ルートを定義できます。

// Stage 1: Read messages from file system.
from("file://var/messages").to("seda:fanout");

// Stage 2: Perform concurrent processing (3 threads).
from("seda:fanout").to("cxf:bean:replica01");
from("seda:fanout").to("cxf:bean:replica02");
from("seda:fanout").to("cxf:bean:replica03");

最初のステージには、ファイルエンドポイント file://var/messages からのメッセージを消費し、それらを SEDA エンドポイント seda:fanout にルーティングする単一のスレッドが含まれています。2 番目のステージには、3 つのスレッドが含まれます。エクスチェンジを cxf:bean:replica01 にルーティングするスレッド、エクスチェンジを cxf:bean:replica02 にルーティングするスレッド、そしてエクスチェンジを cxf:bean:replica03 にルーティングするスレッドです。これら 3 つのスレッドは、ブロッキングキューを使用して実装された SEDA エンドポイントから、エクスチェンジインスタンスを取得するために競合します。ブロッキングキューはロックを使用して一度に複数のスレッドがキューにアクセスするのを防ぐため、エクスチェンジインスタンスは一度だけ消費されることが保証されます。

SEDA エンドポイントと thread() によって作成されたスレッドプールとの違いについては、Apache Camel コンポーネントリファレンスガイドSEDA コンポーネント を参照してください。