11.4. 竞争消费者

概述

图 11.3 “竞争使用者模式” 中显示的 竞争消费者 模式可让多个消费者从同一队列拉取信息,保证 每个消息仅被使用一次。此模式可用于将串行消息处理替换为并发消息处理(降低响应延迟)。

图 11.3. 竞争使用者模式

竞争的使用者模式

以下组件展示竞争的使用者模式:

基于 JMS 的竞争消费者

常规 JMS 队列隐式保证每个消息一次只能被使用。因此,JMS 队列会自动支持竞争的使用者模式。例如,您可以定义三个来自 JMS 队列 HighVolumeQ 的消息的竞争消费者,如下所示:

from("jms:HighVolumeQ").to("cxf:bean:replica01");
from("jms:HighVolumeQ").to("cxf:bean:replica02");
from("jms:HighVolumeQ").to("cxf:bean:replica03");

如果 CXF(Web 服务)端点、replica01Replica02replica03,它并行处理来自 HighVolumeQ 队列的消息。

另外,您可以设置 JMS 查询选项 ,以便创建有竞争消费者的线程池。例如,以下路由创建了一个三个竞争的线程池,从指定队列获取消息:

from("jms:HighVolumeQ?concurrentConsumers=3").to("cxf:bean:replica01");

concurrentConsumers 选项也可以在 XML DSL 中指定,如下所示:

 <route>
   <from uri="jms:HighVolumeQ?concurrentConsumers=3"/>
   <to uri="cxf:bean:replica01"/>
 </route>
注意

JMS 主题 不支持竞争的消费者模式。按照定义,JMS 主题旨在向不同的消费者发送相同消息的多个副本。因此,它与竞争的使用者模式不兼容。

基于竞争消费者的 SEDA

SEDA 组件的目的是通过将计算划分为多个阶段来简化并发处理。SEDA 端点实际上封装了内存块队列(由 java.util.concurrent.BlockingQueue实施)。因此,您可以使用 SEDA 端点将路由划分为多个阶段,每个阶段都可能会使用多个线程。例如,您可以定义由两个阶段组成的 SEDA 路由,如下所示:

// Stage 1: Read messages from file system.
from("file://var/messages").to("seda:fanout");

// Stage 2: Perform concurrent processing (3 threads).
from("seda:fanout").to("cxf:bean:replica01");
from("seda:fanout").to("cxf:bean:replica02");
from("seda:fanout").to("cxf:bean:replica03");

其中,第一个阶段包含一个线程,它消耗来自文件端点 file://var/messages 的消息,并将它们路由到 SEDA 端点, seda:fanout。第二个阶段包含三个线程:路由交换到 cxf:bean:replica01 的线程,路由交换到 cxf:bean:replica02 的线程,以及路由交换到 cxf:bean:replica03 的线程。这三个线程与来自 SEDA 端点的实例相比,该端点使用阻止队列实施。由于块队列使用锁定来防止多个线程一次访问队列,所以您可以保证每个交换实例一次只能使用一次。

有关 SEDA 端点和 thread() 创建的线程池之间的区别,请参阅 Apache Camel 组件参考指南 中的 SEDA 组件