11.7. durable Subscriber

概述

一个 可靠的订阅者 (如 图 11.6 “durable Subscriber Pattern” 所示),它希望接收通过特定 第 6.2 节 “publish-Subscribe Channel” 频道发送的所有消息,包括消费者从消息传递系统断开连接时发送的消息。这要求消息传递系统将消息存储到断开连接的消费者中。对于消费者而言,还必须是一种机制,指明它要建立持久的订阅。通常,发布订阅频道(或主题)可以同时有持久的订阅者,其行为如下:

  • 可持续的订阅者 Inktank - theCan 有两个状态:已连接和断开连接的。 虽然一个不可避免的订阅者连接到一个主题,但它会实时接收所有主题的消息。但是,在订阅者断开连接时,不可避免的订阅者不会接收发送到主题的消息。
  • durable subscriber abrt-abrtCan 有两个状态: connectedinactive。inactive 状态意味着 durable 订阅者与主题断开连接,但希望接收到达 interim 的消息。当 durable 订阅者重新连接到该主题时,它会收到发送的所有消息的回放。

图 11.6. durable Subscriber Pattern

durable subscriber pattern

JMS durable subscriber

JMS 组件实施 durable 订阅者模式。要在 JMS 端点上设置持久化订阅,您必须指定 客户端 ID,它用于标识此特定连接,以及确定 durable 订阅名称。例如,以下路由设置了一个持久订阅到 JMS 主题( news ),其客户端 ID 为 conn01,一个持久的订阅名称为 John.Doe

from("jms:topic:news?clientId=conn01&durableSubscriptionName=John.Doe").
    to("cxf:bean:newsprocessor");

您还可以使用 ActiveMQ 端点设置持久订阅:

from("activemq:topic:news?clientId=conn01&durableSubscriptionName=John.Doe").
    to("cxf:bean:newsprocessor");

如果要同时处理传入的信息,您可以使用 SEDA 端点将路由填充到多个并行片段中,如下所示:

from("jms:topic:news?clientId=conn01&durableSubscriptionName=John.Doe").
    to("seda:fanout");

from("seda:fanout").to("cxf:bean:newsproc01");
from("seda:fanout").to("cxf:bean:newsproc02");
from("seda:fanout").to("cxf:bean:newsproc03");

每个消息都仅处理一次,因为 SEDA 组件支持 有竞争的使用者 模式。

其他示例

另一个选择是将 第 11.5 节 “Message Dispatcher”第 8.1 节 “基于内容的路由器”文件JPA 组件合并,用于临时订阅者,类似非持久性的 SEDA

以下是创建可靠订阅者到 JMS 主题的简单示例

使用 Fluent Builders

 from("direct:start").to("activemq:topic:foo");

 from("activemq:topic:foo?clientId=1&durableSubscriptionName=bar1").to("mock:result1");

 from("activemq:topic:foo?clientId=2&durableSubscriptionName=bar2").to("mock:result2");

使用 Spring XML 扩展

 <route>
     <from uri="direct:start"/>
     <to uri="activemq:topic:foo"/>
 </route>

 <route>
     <from uri="activemq:topic:foo?clientId=1&durableSubscriptionName=bar1"/>
     <to uri="mock:result1"/>
 </route>

 <route>
     <from uri="activemq:topic:foo?clientId=2&durableSubscriptionName=bar2"/>
     <to uri="mock:result2"/>
 </route>

以下是 JMS durable 订阅者的另一个示例,但这一次使用 虚拟主题 (由 AMQ over durable 订阅推荐)

使用 Fluent Builders

 from("direct:start").to("activemq:topic:VirtualTopic.foo");

 from("activemq:queue:Consumer.1.VirtualTopic.foo").to("mock:result1");

 from("activemq:queue:Consumer.2.VirtualTopic.foo").to("mock:result2");

使用 Spring XML 扩展

 <route>
     <from uri="direct:start"/>
     <to uri="activemq:topic:VirtualTopic.foo"/>
 </route>

 <route>
     <from uri="activemq:queue:Consumer.1.VirtualTopic.foo"/>
     <to uri="mock:result1"/>
 </route>

 <route>
     <from uri="activemq:queue:Consumer.2.VirtualTopic.foo"/>
     <to uri="mock:result2"/>
 </route>