2.2. 複数の入力

概要

標準的なルートは、Java DSL の from(EndpointURL) 構文を使用して、1 つのエンドポイントから入力を受け取ります。しかし、ルートに複数の入力を定義する必要がある場合はどうすればよいでしょうか。Apache Camel では、ルートに複数の入力を指定する複数の方法があります。選択肢は、エクスチェンジを独立して処理するか、または異なる入力からのエクスチェンジを何らかの方法で組み合わせるか (この場合は、「Content Enricher パターン」 を使います) によって異なります。

複数の独立した入力

複数の入力を指定する最も簡単な方法は、from() DSL コマンドのマルチ引数形式を使用することです。以下に例を示します。

from("URI1", "URI2", "URI3").to("DestinationUri");

または、以下の同等の構文を使用できます。

from("URI1").from("URI2").from("URI3").to("DestinationUri");

これらの両方の例で、各入力エンドポイント URI1URI2 、および URI3 からのエクスチェンジは、相互に独立に、別個のスレッドで処理されます。実際、上記のルートは以下の 3 つに分かれたルートと同等であると考えることができます。

from("URI1").to("DestinationUri");
from("URI2").to("DestinationUri");
from("URI3").to("DestinationUri");

セグメント化されたルート

たとえば、2 つの異なるメッセージングシステムからの受信メッセージをマージし、同じルートを使用して処理する場合があります。ほとんどの場合、図2.5「セグメント化されたルートによる複数入力の処理」 に示されるように、ルートをセグメントに分割して複数の入力に対応できます。

図2.5 セグメント化されたルートによる複数入力の処理

セグメント化されたルートによる複数入力の処理

ルートの最初のセグメントは、たとえば activemq:Nyseactivemq:Nasdaq といったいくつかの外部キューから入力を取得し、その受信エクスチェンジを内部エンドポイント InternalUrl に送信します。2 つ目のルートセグメントは、受信エクスチェンジを内部エンドポイントから取得し、宛先キュー activemq:USTxn に送信することで、受信エクスチェンジをマージします。InternalUrl は、ルーターのアプリケーション でのみ使用することが意図されたエンドポイントの URL です。以下のタイプのエンドポイントが内部使用に適しています。

これらのエンドポイントの主な目的は、ルートの異なるセグメントをまとめることにあります。これらはすべて、複数の入力を単一のルートにマージする効果的な方法を提供します。

Direct エンドポイント

direct コンポーネントは、複数のルートを繋ぎ合わせる最も簡単なメカニズムを提供します。direct コンポーネントのイベントモデルは 同期型 であり、ルートの後続のセグメントは最初のセグメントと同じスレッドで実行されます。direct URL の一般的な形式は direct:EndpointID です。エンドポイント ID である EndpointID は、エンドポイントのインスタンスを識別する一意の英数字の文字列です。

たとえば、2 つのメッセージキュー activemq:Nyseactivemq:Nasdaq から入力を受け取り、それらを単一のメッセージキュー activemq:USTxn にマージする場合、以下のルートセットを定義することで実行できます。

from("activemq:Nyse").to("direct:mergeTxns");
from("activemq:Nasdaq").to("direct:mergeTxns");

from("direct:mergeTxns").to("activemq:USTxn");

最初の 2 つのルートはメッセージキュー NyseNasdaq から入力を受け取り、それらをエンドポイント direct:mergeTxns に送信します。最後のキューは、前の 2 つのキューからの入力を組み合わせ、組み合わせたメッセージストリームを activemq:USTxn キューに送信します。

direct エンドポイントの実装は、以下のように動作します。エクスチェンジがプロデューサーエンドポイント (例: to("direct:mergeTxns")) に到達するたびに、direct エンドポイントは、同じエンドポイント ID (例: from("direct:mergeTxns")) を持つすべてのコンシューマーエンドポイントに直接エクスチェンジを渡します。direct エンドポイントは、同じ Java 仮想マシン (JVM) インスタンス内の同じ CamelContext に属するルート間の通信にのみ使用できます。

SEDA エンドポイント

SEDA コンポーネントは、複数のルートを繋ぎ合わせるもう 1 つのメカニズムを提供します。これは direct コンポーネントと同様の使い方ができますが、以下のように基盤となるイベントとスレッドのモデルが異なります。

  • SEDA エンドポイントの処理は同期されません。つまり、エクスチェンジを SEDA プロデューサーエンドポイントに送信すると、ルート内の前のプロセッサーに制御がすぐに戻されます。
  • SEDA エンドポイントはキューバッファー (java.util.concurrent.BlockingQueue 型) を持ち、次のルートセグメントによって処理される前の受信エクスチェンジをすべて格納しています。
  • 各 SEDA コンシューマーエンドポイントは、ブロッキングキューからのエクスチェンジオブジェクトを処理するためにスレッドプール (デフォルトのサイズは 5) を作成します。
  • SEDA コンポーネントは、競合コンシューマー (competing consumers) パターンをサポートします。これは、特定のエンドポイントに複数のコンシューマーが接続している場合でも、各受信エクスチェンジが 1 度だけ処理されることを保証するものです。

SEDA エンドポイントを使用する主な利点の 1 つは、組み込みのコンシューマースレッドプールにより、ルートの応答性が向上することです。株式取引の例は、以下のように、direct エンドポイントの代わりに SEDA エンドポイントを使用するように書き換えられます。

from("activemq:Nyse").to("seda:mergeTxns");
from("activemq:Nasdaq").to("seda:mergeTxns");

from("seda:mergeTxns").to("activemq:USTxn");

この例と direct の例の主な相違点は、SEDA を使用する場合、2 番目のルートセグメント (seda:mergeTxns から activemq:USTxn) が 5 つのスレッドのプールで処理される点です。

注記

SEDA は単にルートセグメントを繋ぎ合わせるだけではありません。段階的イベント駆動型アーキテクチャー (staged event-driven architecture、SEDA) は、より管理しやすいマルチスレッドアプリケーションを構築するための設計哲学を含んでいます。Apache Camel の SEDA コンポーネントの目的は、この設計哲学をアプリケーションに適用できるようにすることです。SEDA の詳細は、http://www.eecs.harvard.edu/~mdw/proj/seda/ を参照してください。

VM エンドポイント

VM コンポーネントは SEDA エンドポイントと非常に似ています。唯一の違いは、SEDA コンポーネントが同じ CamelContext 内のルートセグメントの繋ぎ合わせに限定されるのに対し、VM コンポーネントでは、同じ Java 仮想マシン内で実行されている限り、異なる Apache Camel アプリケーションからのルートを繋ぎ合わせられることです。

株式取引の例は、以下のように、 SEDA エンドポイントの代わりに VM エンドポイントを使用するように書き換えられます。

from("activemq:Nyse").to("vm:mergeTxns");
from("activemq:Nasdaq").to("vm:mergeTxns");

そして、別のルーターアプリケーション (同じ Java 仮想マシンで実行されている) において、以下のようにルートの 2 つ目のセグメントを定義できます。

from("vm:mergeTxns").to("activemq:USTxn");

Content Enricher パターン

Content Enricher パターンは、これまでと根本的に異なる方法でルートへの複数入力の処理を定義します。エクスチェンジが Enricher プロセッサーに入ると、Enricher は外部リソースにアクセスして情報を取得し、その情報を元のメッセージに追加します。このパターンでは、外部リソースが実質的にメッセージへの 2 つ目の入力を表しています。

たとえば、信用リクエストを処理するアプリケーションを作成している場合に、信用リクエストを処理する前に、それを顧客に対して信用格付けを割り当てるデータ (格付けデータはディレクトリー src/data/ratings のファイルに格納されている) で拡張する必要があります。以下のように、pollEnrich() パターンと GroupedExchangeAggregationStrategy 集約ストラテジーを使用して、受信信用リクエストと格付けファイルのデータを組み合わせることができます。

from("jms:queue:creditRequests")
    .pollEnrich("file:src/data/ratings?noop=true", new GroupedExchangeAggregationStrategy())
    .bean(new MergeCreditRequestAndRatings(), "merge")
    .to("jms:queue:reformattedRequests");

GroupedExchangeAggregationStrategy クラスは、org.apache.camel.processor.aggregate パッケージの標準集約ストラテジーで、各新しいエクスチェンジを java.util.List インスタンスに追加し、生成されるリストを Exchange.GROUPED_EXCHANGE エクスチェンジプロパティーに保存します。この場合、リストには、(creditRequests JMS キューからの) 元のエクスチェンジと (file エンドポイントからの) Enricher エクスチェンジの 2 つの要素が含まれます。

グループ化されたエクスチェンジにアクセスするには、以下のようなコードを使用します。

public class MergeCreditRequestAndRatings {
    public void merge(Exchange ex) {
        // Obtain the grouped exchange
        List<Exchange> list = ex.getProperty(Exchange.GROUPED_EXCHANGE, List.class);

        // Get the exchanges from the grouped exchange
        Exchange originalEx = list.get(0);
        Exchange ratingsEx  = list.get(1);

        // Merge the exchanges
        ...
    }
}

このアプリケーションへの別のアプローチとしては、データをマージするコードをカスタム集約ストラテジークラスに直接実装することが考えられます。

Content Enricher パターンの詳細は、「Content Enricher」 を参照してください。