8.3. 受信者リスト

概要

図8.3「Recipient List パターン」 に示されている Recipient List は、各受信メッセージを複数の異なる宛先に送信するルーターの一種です。また、Recipient List は通常、実行時に受信者リスト (Recipient List) を演算する必要があります。

図8.3 Recipient List パターン

Recipient List パターン

宛先が固定された Recipient List

最もシンプルな Recipient List は、宛先リストが固定され、事前に認識され、交換パターンは InOnly であるものです。この場合、宛先リストを to() Java DSL コマンドへハードワイヤーすることができます。

注記

ここで示す例は、宛先が固定された Recepient List のため、InOnly 交換パターン (Pipes and Filters パターン と似ています) で のみ 動作します。Out メッセージを使用した交換パターンの Recipient List を作成する場合は、代わりに Multicast パターン を使用してください。

Java DSL の例

以下の例は、コンシューマーエンドポイント queue:a から、InOnly エクスチェンジを固定された宛先リストにルーティングする方法を示しています。

from("seda:a").to("seda:b", "seda:c", "seda:d");

XML 設定の例

以下の例は、XML で同じルートを設定する方法を示しています。

<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <to uri="seda:b"/>
    <to uri="seda:c"/>
    <to uri="seda:d"/>
  </route>
</camelContext>

実行時に演算された Recipient List

Recipient List パターンを使用するほとんどの場合で、宛先リストを実行時に演算する必要があります。これを行うには、recipientList() プロセッサーを使用します。このプロセッサーは、唯一の引数として宛先リストを取ります。Apache Camel はリスト引数に型コンバーターを適用するため、ほとんどの標準的な Java リスト型 (例: コレクション、リスト、配列など) を使用できます。型コンバーターの詳細については 「組み込み型コンバーター」 を参照してください。

受信者は 同じ エクスチェンジインスタンスのコピーを受け取り、Apache Camel はそれらを順次実行します。

Java DSL の例

以下の例は、recipientListHeader という名前のメッセージヘッダーから宛先リストを抽出する方法を示しています。ヘッダーの値は、コンマ区切りのエンドポイント URI のリストになります。

from("direct:a").recipientList(header("recipientListHeader").tokenize(","));

header の値がリスト型である場合、値を直接 recipientList() の引数に使うことができます。以下に例を示します。

from("seda:a").recipientList(header("recipientListHeader"));

ただし、この例では、基礎となるコンポーネントがこの特定のヘッダーをどのように解析するかに完全に依存しています。コンポーネントがヘッダーを単純な文字列として解析する場合、この例は動作し ません。ヘッダーは、何らかの型の Java リストに変換する必要があります。

XML 設定の例

以下の例では、XML で前述のルートを定義する方法を示しています。ヘッダー値は、コンマ区切りのエンドポイント URI リストです。

<camelContext id="buildDynamicRecipientList" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <recipientList delimiter=",">
      <header>recipientListHeader</header>
    </recipientList>
  </route>
</camelContext>

複数の受信者に並列で送信

Camel 2.2 で利用可能

Recipient List パターン は、parallelProcessing をサポートしています。これは、Splitter パターン の機能に似ています。並列処理機能を使用して、複数の受信者に並列でエクスチェンジを送信します。以下に例を示します:

from("direct:a").recipientList(header("myHeader")).parallelProcessing();

Spring XML では、並列処理機能は recipientList タグの属性として実装されています。以下に例を示します。

<route>
  <from uri="direct:a"/>
  <recipientList parallelProcessing="true">
    <header>myHeader</header>
  </recipientList>
</route>

例外時に停止

Camel 2.2 で利用可能

Recipient List は、stopOnException 機能をサポートします。これを使用すると、受信者が失敗した場合にそれ以降の受信者への送信を停止することができます。

from("direct:a").recipientList(header("myHeader")).stopOnException();

Spring XML では、Recipient List タグの属性です。

Spring XML では、例外時の停止機能は recipientList タグの属性として実装されます。以下に例を示します。

<route>
  <from uri="direct:a"/>
  <recipientList stopOnException="true">
    <header>myHeader</header>
  </recipientList>
</route>
注記

同じルートで parallelProcessingstopOnException を組み合わせることができます。

無効なエンドポイントの無視

Camel 2.3 の時点で利用可能

Recipient List パターンignoreInvalidEndpoints オプションをサポートします。これにより、Recipient List が無効なエンドポイントをスキップできます (Routing Slips パターン も、このオプションをサポートしています)。以下に例を示します。

from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();

Spring XML では、以下のように recipientList タグに ignoreInvalidEndpoints 属性を設定することで、このオプションを有効にすることができます。

<route>
  <from uri="direct:a"/>
  <recipientList ignoreInvalidEndpoints="true">
    <header>myHeader</header>
  </recipientList>
</route>

myHeaderdirect:foo,xxx:bar の 2 つのエンドポイントが含まれるケースについて考えてみましょう。最初のエンドポイントは有効であり、動作します。2 つ目は無効であるため、無視されます。無効なエンドポイントに遭遇するたびに、Apache Camel ログが INFO レベルで記録されます。

カスタム AggregationStrategy の使用

Camel 2.2 で利用可能

Recipient List パターン でカスタム AggregationStrategy を使用できます。これは、リスト内の受信者からのリプライを集計する場合に便利です。Apache Camel はデフォルトで UseLatestAggregationStrategy 集約ストラテジーを使用して、最後に受信したリプライのみを保持します。より高度な集約ストラテジーについては、AggregationStrategy インターフェイスを独自に実装し定義できます。詳細については 「Aggregator」 を参照してください。たとえば、集約ストラテジー MyOwnAggregationStrategy をリプライメッセージに適用するには、以下のように Java DSL ルートを定義します。

from("direct:a")
    .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy())
    .to("direct:b");

Spring XML では、以下のようにカスタムの集約ストラテジーを recipientList タグの属性として指定できます。

<route>
  <from uri="direct:a"/>
  <recipientList strategyRef="myStrategy">
    <header>myHeader</header>
  </recipientList>
  <to uri="direct:b"/>
</route>

<bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>

カスタムスレッドプールの使用

Camel 2.2 で利用可能

これは、parallelProcessing を使用する場合にのみ必要です。デフォルトでは、Camel は 10 個のスレッドを持つスレッドプールを使用します。今後スレッドプールの管理と設定内容をメンテナンスする際に、変更される可能性がありますのでご注意ください (予定どおりであれば Camel 2.2) 。

カスタム集約ストラテジーを使用するのと同じように設定します。

メソッド呼び出しの Recipient List としての使用

受信者を生成するために、Bean を使用できます。以下に例を示します。

from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");

MessageRouter Bean は以下のように定義されます。

public class MessageRouter {

    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}

Recipient List としての Bean

Bean を Recipient List として動作させるには、@RecipientList アノテーションを Recipient List を返すメソッドに付与します。以下に例を示します。

public class MessageRouter {

    @RecipientList
    public String routeTo() {
        String queueList = "activemq:queue:test1,activemq:queue:test2";
        return queueList;
    }
}

この場合、ルートに recipientList DSL コマンドを含め ない でください。以下のようにルートを定義します。

from("activemq:queue:test").bean(MessageRouter.class, "routeTo");

タイムアウトの使用

Camel 2.5 で利用可能

parallelProcessing を使用する場合は、合計 timeout 値をミリ秒単位で設定できます。Camel はタイムアウトに達するまでメッセージを並行して処理します。これにより、1 つのメッセージが遅い場合でも処理を継続できます。

以下の例では、recipientlist ヘッダーの値が direct:a,direct:b,direct:c であるため、メッセージは 3 人の受信者に送信されます。250 ミリ秒のタイムアウトがあるので、最後の 2 つのメッセージだけが、タイムフレーム内で完了することができます。そのため、集約すると BC という文字列の結果が得られます。

from("direct:start")
    .recipientList(header("recipients"), ",")
    .aggregationStrategy(new AggregationStrategy() {
            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                if (oldExchange == null) {
                    return newExchange;
                }

                String body = oldExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                return oldExchange;
            }
        })
        .parallelProcessing().timeout(250)
    // use end to indicate end of recipientList clause
    .end()
    .to("mock:result");

from("direct:a").delay(500).to("mock:A").setBody(constant("A"));

from("direct:b").to("mock:B").setBody(constant("B"));

from("direct:c").to("mock:C").setBody(constant("C"));
注記

この timeout 機能は splitter のほか、multicast および recipientList の両方でもサポートされています。

デフォルトでは、タイムアウトが発生した場合、AggregationStrategy は呼び出されません。ただし、特殊なバージョンを実装することができます。

// Java
public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {

    /**
     * A timeout occurred
     *
     * @param oldExchange  the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
     * @param index        the index
     * @param total        the total
     * @param timeout      the timeout value in millis
     */
    void timeout(Exchange oldExchange, int index, int total, long timeout);

これにより、必要であれば、AggregationStrategy でタイムアウトに対応することができます。

timeout は合計値

タイムアウトは合計です。つまり、X 時間後に Camel は期限内に完了したメッセージのみを集約します。残りの分はキャンセルされます。また Camel は、タイムアウトの原因となった最初のインデックスに対して、TimeoutAwareAggregationStrategytimeout メソッドを 1 度だけ呼び出します。

送信メッセージへのカスタム処理の適用

recipientList がメッセージを Recipient List のいずれかに送信する前に、元のメッセージのシャローコピーであるメッセージレプリカを作成します。シャローコピーでは、元のメッセージのヘッダーおよびペイロードは参照によってのみコピーされます。各新規コピーには、それらの要素独自のインスタンスが含まれていません。その結果、メッセージのシャローコピーがリンクされ、異なるエンドポイントにルーティングする際にカスタム処理を適用することはできません。

レプリカがエンドポイントに送信される前に、各メッセージレプリカに対して何らかのカスタム処理を行う場合は、recipientList 句で onPrepare DSL コマンドを呼び出すことができます。この onPrepare コマンドは、メッセージがシャローコピーされた 直後、かつメッセージがエンドポイントにディスパッチされる 直前 に、カスタムプロセッサーを挿入します。たとえば、以下のルートでは、CustomProc プロセッサーが 各 recipient エンドポイント 用のメッセージレプリカで呼び出されます。

from("direct:start")
  .recipientList().onPrepare(new CustomProc());

onPrepare DSL コマンドの一般的なユースケースとして、メッセージの一部またはすべての要素のディープコピーを実行します。これにより、各メッセージのレプリカは他のレプリカとは独立して変更することができます。たとえば、以下の CustomProc プロセッサークラスは、メッセージボディーのディープコピーを実行します。メッセージボディーの型は BodyType であると想定され、ディープコピーはメソッド BodyType.deepCopy() によって実行されます。

// Java
import org.apache.camel.*;
...
public class CustomProc implements Processor {

    public void process(Exchange exchange) throws Exception {
        BodyType body = exchange.getIn().getBody(BodyType.class);

        // Make a _deep_ copy of of the body object
        BodyType clone =  BodyType.deepCopy();
        exchange.getIn().setBody(clone);

        // Headers and attachments have already been
        // shallow-copied. If you need deep copies,
        // add some more code here.
    }
}

オプション

recipientList DSL コマンドは以下のオプションをサポートします。

名前

デフォルト値

説明

delimiter

,

式が複数のエンドポイントを返した場合に使用される区切り文字。

strategyRef

 

AggregationStrategy を参照し、各受信者からのリプライを集約して 「受信者リスト」 からの唯一となる送信メッセージを生成します。デフォルトでは、Camel は最後のリプライを送信メッセージとして使用します。

strategyMethodName

 

POJO を AggregationStrategy として使用している場合に、使用するメソッド名を明示的に指定するために使用できます。

strategyMethodAllowNull

false

POJO を AggregationStrategy として使用している場合に、このオプションを使用することができます。false に設定すると、エンリッチするデータがない場合に aggregate メソッドは使用されません。true に設定すると、エンリッチするデータがない場合には、oldExchangenull 値が使用されます。

parallelProcessing

false

Camel 2.2: 有効にすると、受信者へのメッセージ送信が並列処理されます。呼び出し元スレッドは、すべてのメッセージが完全に処理されるまで待機してから続行することに注意してください。受信者への送信と、受信者からのリプライ処理のみが並列で処理されます。

parallelAggregate

false

有効にすると、AggregationStrategyaggregate メソッドを同時に呼び出すことができます。これには、AggregationStrategy の実装がスレッドセーフである必要があることに注意してください。デフォルトでは、このオプションは false となっており、Camel が自動的に aggregate メソッドへの呼び出しを同期することを意味します。ただし、場合によっては、AggregationStrategy をスレッドセーフとして実装し、このオプションを true に設定することで、パフォーマンスを向上させることができます。

executorServiceRef

 

Camel 2.2: 並列処理に使用するカスタムスレッドプールを参照します。このオプションを設定すると、並列処理は自動的に適用されるため、 並列処理用オプションも有効にする必要はありません。

stopOnException

false

Camel 2.2: 例外発生時、すぐに継続処理を停止するかどうか。無効にすると、いずれかの失敗の有無に関わらず、Camel はメッセージをすべての受信者に送信します。AggregationStragegy クラス内で、例外処理を完全に制御することができます。

ignoreInvalidEndpoints

false

Camel 2.3: エンドポイント URI が解決できない場合、無視されます。false の場合は、Camel はエンドポイント URI が有効ではないことを示す例外を出力します。

streaming

false

Camel 2.5: 有効な場合、Camel は返信を順不同で処理します (例: 戻って来た順) 。無効な場合、Camel は指定された式と同じ順序で返信を処理します。

timeout

 

camel 2.5: 合計タイムアウト値をミリ秒単位で設定します。「受信者リスト」 が指定された時間枠内のすべての応答を送信および処理できない場合、タイムアウトが発生し、「受信者リスト」 から抜け出し続行します。AggregationStrategy を提供した場合、timeout メソッドが抜け出す前に呼び出されるため、注意してください。

onPrepareRef

 

camel 2.8: カスタムプロセッサーを参照して、各受信者が受け取るエクスチェンジのコピーを準備します。これにより、必要に応じてメッセージのペイロードをディープクローンするなど、カスタムロジックを実行できます。

shareUnitOfWork

false

Camel 2.8: Unit of Work を共有すべきかどうか。詳細は、「Splitter」 で同じオプションを参照してください。

cacheSize

0

Camel 2.13.1/2.12.4: Routing Slip で再使用されるプロデューサーをキャッシュする ProducerCache のキャッシュサイズを設定できます。デフォルトのキャッシュサイズ 0 を使用します。値を -1 に設定すると、キャッシュをすべて無効にすることができます。

Recipient List で交換パターンを使用

デフォルトでは、Recipient List は既存の交換パターンを使用します。稀ではありますが、別の交換パターンを使用して受信者にメッセージを送信するケースがある可能性があります。

たとえば、InOnly ルートとして開始するルートがあるとします。Recipient List で InOut 交換パターンを使用する場合、受信者用エンドポイントで直接交換パターンを設定する必要があります。

以下の例は、新規ファイルが InOnly として開始され、Recipient List へルーティングされるルートを示しています。ActiveMQ(JMS) エンドポイントで InOut を使用する場合、exchangePattern=InOut オプションを指定する必要があります。ただし、JMS リクエストやリプライをルーティングし続けるため、レスポンスは outbox ディレクトリー内にファイルとして保存されます。

from("file:inbox")
  // the exchange pattern is InOnly initially when using a file route
  .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut")
  .to("file:outbox");
注記

InOut 交換パターンは、タイムアウト時にレスポンスを受け取る必要があります。ただし、レスポンスを受信できない場合は失敗します。