8.15. scatter-Gather
scatter-Gather
scatter-gather 模式 (如 图 8.11 “scatter-Gather Pattern” 所示)可让您将消息路由到多个动态指定接收方,并将响应重新合并到单一消息中。
图 8.11. scatter-Gather Pattern

dynamic scatter-gather 示例
以下示例概述了在多个不同供应商中获得最佳引用的应用程序。这个示例使用动态 第 8.3 节 “接收者列表” 来请求来自所有供应商和 第 8.5 节 “聚合器” 的报价,以选择所有响应中的最佳引用。此应用程序的路由定义如下:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<recipientList>
<header>listOfVendors</header>
</recipientList>
</route>
<route>
<from uri="seda:quoteAggregator"/>
<aggregate strategyRef="aggregatorStrategy" completionTimeout="1000">
<correlationExpression>
<header>quoteRequestId</header>
</correlationExpression>
<to uri="mock:result"/>
</aggregate>
</route>
</camelContext>
在第一个路由中,第 8.3 节 “接收者列表” 查看 listOfVendors 标头来获取接收者列表。因此,向此应用发送消息的客户端需要将 listOfVendors 标头添加到消息。例 8.1 “消息传递客户端示例” 显示来自消息传递客户端的一些示例代码,该代码将相关的标头数据添加到传出消息。
例 8.1. 消息传递客户端示例
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("listOfVendors", "bean:vendor1, bean:vendor2, bean:vendor3");
headers.put("quoteRequestId", "quoteRequest-1");
template.sendBodyAndHeaders("direct:start", "<quote_request item=\"beer\"/>", headers);
这个消息会分发到以下端点: bean:vendor1、rean:vendor2 和 bean:vendor3。这些 Bean 全部由以下类实现:
public class MyVendor {
private int beerPrice;
@Produce(uri = "seda:quoteAggregator")
private ProducerTemplate quoteAggregator;
public MyVendor(int beerPrice) {
this.beerPrice = beerPrice;
}
public void getQuote(@XPath("/quote_request/@item") String item, Exchange exchange) throws Exception {
if ("beer".equals(item)) {
exchange.getIn().setBody(beerPrice);
quoteAggregator.send(exchange);
} else {
throw new Exception("No quote available for " + item);
}
}
}
bean 实例、vendor1、 vendor2 和 vendor3 使用 Spring XML 语法实例化,如下所示:
<bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.scattergather.LowestQuoteAggregationStrategy"/>
<bean id="vendor1" class="org.apache.camel.spring.processor.scattergather.MyVendor">
<constructor-arg>
<value>1</value>
</constructor-arg>
</bean>
<bean id="vendor2" class="org.apache.camel.spring.processor.scattergather.MyVendor">
<constructor-arg>
<value>2</value>
</constructor-arg>
</bean>
<bean id="vendor3" class="org.apache.camel.spring.processor.scattergather.MyVendor">
<constructor-arg>
<value>3</value>
</constructor-arg>
</bean>
每个 bean 都使用不同的价格进行初始化,以便不同(包含在构造器参数中)。当消息发送到每个 bean 端点时,它将到达 MyVendor.getQuote 方法。此方法进行简单检查,查看此报价请求是否是er beer,然后在稍后的步骤上对交换进行一次检索价格。使用 POJO Prod ing (请参阅 @Produce 注解)将消息转发到下一步。
下一步,我们想要从所有供应商引出引号,找出哪个是最佳(即最低程度)。为此,我们使用带有自定义聚合策略的 第 8.5 节 “聚合器”。第 8.5 节 “聚合器” 需要识别与当前引用相关的消息,该引用是通过根据 quoteRequestId 标头的值填充消息(传递给 correlationExpression)实现的。如 例 8.1 “消息传递客户端示例” 所示,关联 ID 被设置为 quoteRequest-1 ( 关联 ID 应为唯一)。要选择集合中最低引用,您可以使用类似如下的自定义聚合策略:
public class LowestQuoteAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
// the first time we only have the new exchange
if (oldExchange == null) {
return newExchange;
}
if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) {
return oldExchange;
} else {
return newExchange;
}
}
}静态类别-gather 示例
您可以使用静态 第 8.3 节 “接收者列表” 在 scatter-gather 应用程序中明确指定接收者。以下示例显示了您要用来实现静态 scatter-gather 情境的路由:
from("direct:start").multicast().to("seda:vendor1", "seda:vendor2", "seda:vendor3");
from("seda:vendor1").to("bean:vendor1").to("seda:quoteAggregator");
from("seda:vendor2").to("bean:vendor2").to("seda:quoteAggregator");
from("seda:vendor3").to("bean:vendor3").to("seda:quoteAggregator");
from("seda:quoteAggregator")
.aggregate(header("quoteRequestId"), new LowestQuoteAggregationStrategy()).to("mock:result")