8.15. Scatter-Gather

Scatter-Gather

図8.11「Scatter-Gather パターン」 に記載されているように、Scatter-Gather パターン を使用すると、メッセージを動的に指定された複数の受信者にルーティングし、そのレスポンスを単一のメッセージに再集約できます。

図8.11 Scatter-Gather パターン

ブロードキャストアグリゲート

動的なスキャッター/ギャザーの例

以下の例は、複数の異なるベンダーから最も良いビールの見積もりを取得するアプリケーションの概要を説明しています。この例では、動的な 「受信者リスト」 を使用してすべてのベンダーに見積もりを要求し、「Aggregator」 を使用してすべてのレスポンスの中から最良の見積もりを選別します。このアプリケーションのルートは、以下のように定義されます。

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <recipientList>
      <header>listOfVendors</header>
    </recipientList>
  </route>
  <route>
    <from uri="seda:quoteAggregator"/>
    <aggregate strategyRef="aggregatorStrategy" completionTimeout="1000">
      <correlationExpression>
        <header>quoteRequestId</header>
      </correlationExpression>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>

最初のルートでは、「受信者リスト」listOfVendors ヘッダーを確認して受信者リストを取得します。したがって、このアプリケーションにメッセージを送信するクライアントは listOfVendors ヘッダーをメッセージに追加する必要があります。例8.1「メッセージングクライアントの例」 は、該当のヘッダーデータを送信メッセージに追加するメッセージングクライアントのコードの一例を示しています。

例8.1 メッセージングクライアントの例

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("listOfVendors", "bean:vendor1, bean:vendor2, bean:vendor3");
headers.put("quoteRequestId", "quoteRequest-1");
template.sendBodyAndHeaders("direct:start", "<quote_request item=\"beer\"/>", headers);

メッセージは bean:vendor1bean:vendor2、および bean:vendor3 のエンドポイントに分散されます。これらの Bean はすべて以下のクラスによって実装されます。

public class MyVendor {
    private int beerPrice;

    @Produce(uri = "seda:quoteAggregator")
    private ProducerTemplate quoteAggregator;

    public MyVendor(int beerPrice) {
        this.beerPrice = beerPrice;
    }

    public void getQuote(@XPath("/quote_request/@item") String item, Exchange exchange) throws Exception {
        if ("beer".equals(item)) {
            exchange.getIn().setBody(beerPrice);
            quoteAggregator.send(exchange);
        } else {
            throw new Exception("No quote available for " + item);
        }
    }
}

Bean インスタンス、vendor1vendor2、および vendor3 は、以下のように Spring XML 構文を使用してインスタンス化されます。

<bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.scattergather.LowestQuoteAggregationStrategy"/>

<bean id="vendor1" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>1</value>
  </constructor-arg>
</bean>

<bean id="vendor2" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>2</value>
  </constructor-arg>
</bean>

<bean id="vendor3" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>3</value>
  </constructor-arg>
</bean>

各 Bean は、それぞれ異なるビール価格で初期化されます (コンストラクター引数に渡されます)。メッセージが各 Bean エンドポイントに送信されると、MyVendor.getQuote メソッドに到達します。このメソッドは、この見積もり要求がビールに対してであるかどうかを確認する簡単なチェックを実行し、それから後のステップで取得できるようにエクスチェンジにビールの価格を設定します。メッセージは POJO 生成 を使用して次のステップに転送されます (@Produce アノテーションを参照)。

次のステップでは、すべてのベンダーからのビールの見積もりを受け取り、どのベンダーの見積もりが最良か (つまり最も低いか) を調べます。そのため、「Aggregator」 をカスタムの集約ストラテジーと共に使用します。「Aggregator」 は、どのメッセージが現在の見積もりに関連するものかを識別する必要があります。これは、quoteRequestId ヘッダー (correlationExpression に渡された) の値に基づいてメッセージを関連付けることによって行われます。例8.1「メッセージングクライアントの例」 にあるように、相関 ID は quoteRequest-1 に設定されています (相関 ID は一意である必要があります)。見積もりの集合の中から最も低いものを選別するには、以下のようなカスタム集約ストラテジーを使用します。

public class LowestQuoteAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // the first time we only have the new exchange
        if (oldExchange == null) {
            return newExchange;
        }

        if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) {
            return oldExchange;
        } else {
            return newExchange;
        }
    }
}

静的なスキャッター/ギャザーの例

静的な 「受信者リスト」 を使用することで、スキャッター/ギャザーアプリケーションの中で受信者を明示的に指定することができます。以下の例は、静的なスキャッター/ギャザーのシナリオを実装するために使用するルートを示しています。

from("direct:start").multicast().to("seda:vendor1", "seda:vendor2", "seda:vendor3");

from("seda:vendor1").to("bean:vendor1").to("seda:quoteAggregator");
from("seda:vendor2").to("bean:vendor2").to("seda:quoteAggregator");
from("seda:vendor3").to("bean:vendor3").to("seda:quoteAggregator");

from("seda:quoteAggregator")
    .aggregate(header("quoteRequestId"), new LowestQuoteAggregationStrategy()).to("mock:result")