11.7. Durable Subscriber

概要

図11.6「Durable Subscriber パターン」 に記載されている 永続サブスクライバー (Durable Subscriber) は、特定の 「Publish-Subscribe Channel」 チャネルに送信されたすべてのメッセージを受信することを意図したコンシューマーです。これには、コンシューマーがメッセージングシステムから切断されている間に送信されたメッセージも含まれます。そのためには、メッセージングシステムが切断されたコンシューマーに対して後でリプレイするためにメッセージを保存しておく必要があります。また、コンシューマー側には永続サブスクリプションの確立が必要であることを示すメカニズムも必要です。通常、パブリッシュサブスクライブチャネル (またはトピック) には、永続サブスクライバーと非永続サブスクライバーの両方を設定できます。それぞれ以下のように動作します。

  • 非永続サブスクライバー - 接続切断 の 2 つの状態を持つことができます。非永続サブスクライバーがトピックに接続されている間は、トピックのすべてのメッセージをリアルタイムで受信します。しかし、非永続サブスクライバーが切断されている間は、トピックに送信されたメッセージを一切受信しません。
  • 永続的サブスクライバー - 接続非アクティブ の 2 つの状態を持つことができます。非アクティブ状態とは、永続サブスクライバーはトピックから切断されているものの、その間に到達したメッセージを受信するつもりであることを意味します。永続サブスクライバーがトピックに再接続すると、非アクティブ時に送信されたすべてのメッセージがリプレイされます。

図11.6 Durable Subscriber パターン

Durable Subscriber パターン

JMS 永続サブスクライバー

JMS コンポーネントは Durable Subscriber パターンを実装しています。JMS エンドポイントに永続サブスクリプションを設定するには、この接続を識別するための クライアント ID と永続サブスクライバーを識別するための 永続サブスクリプション名 を指定する必要があります。たとえば、以下のルートは、クライアント ID が conn01 で、永続サブスクリプション名が John.Doe の JMS トピック news に永続サブスクリプションを設定します。

from("jms:topic:news?clientId=conn01&durableSubscriptionName=John.Doe").
    to("cxf:bean:newsprocessor");

ActiveMQ エンドポイントを使用して永続サブスクリプションを設定することもできます。

from("activemq:topic:news?clientId=conn01&durableSubscriptionName=John.Doe").
    to("cxf:bean:newsprocessor");

受信メッセージを同時に処理する場合は、SEDA エンドポイントを使用して、以下のように複数の並列セグメントにルートを分散させることができます。

from("jms:topic:news?clientId=conn01&durableSubscriptionName=John.Doe").
    to("seda:fanout");

from("seda:fanout").to("cxf:bean:newsproc01");
from("seda:fanout").to("cxf:bean:newsproc02");
from("seda:fanout").to("cxf:bean:newsproc03");

SEDA コンポーネントは Competing Consumers パターンをサポートするため、各メッセージは一度だけ処理されます。

代替例

もう 1 つの代替方法として、「Message Dispatcher」 または 「Content-Based Router」 を、永続サブスクライバーの File または JPA コンポーネントと組み合わせた後、非永続サブスクライバーの SEDA のようなコンポーネントと組み合わせる方法もあります。

以下は、JMS トピックへの永続サブスクライバーを作成する簡単な例です。

Fluent Builder (流れるようなビルダー) の使用

 from("direct:start").to("activemq:topic:foo");

 from("activemq:topic:foo?clientId=1&durableSubscriptionName=bar1").to("mock:result1");

 from("activemq:topic:foo?clientId=2&durableSubscriptionName=bar2").to("mock:result2");

Spring XML エクステンション の使用

 <route>
     <from uri="direct:start"/>
     <to uri="activemq:topic:foo"/>
 </route>

 <route>
     <from uri="activemq:topic:foo?clientId=1&durableSubscriptionName=bar1"/>
     <to uri="mock:result1"/>
 </route>

 <route>
     <from uri="activemq:topic:foo?clientId=2&durableSubscriptionName=bar2"/>
     <to uri="mock:result2"/>
 </route>

以下は、JMS 永続サブスクライバーのもう 1 つの例ですが、今回は 仮想トピック を使用しています (AMQ で永続サブスクリプションを使用する場合に推奨されます)。

Fluent Builder (流れるようなビルダー) の使用

 from("direct:start").to("activemq:topic:VirtualTopic.foo");

 from("activemq:queue:Consumer.1.VirtualTopic.foo").to("mock:result1");

 from("activemq:queue:Consumer.2.VirtualTopic.foo").to("mock:result2");

Spring XML エクステンション の使用

 <route>
     <from uri="direct:start"/>
     <to uri="activemq:topic:VirtualTopic.foo"/>
 </route>

 <route>
     <from uri="activemq:queue:Consumer.1.VirtualTopic.foo"/>
     <to uri="mock:result1"/>
 </route>

 <route>
     <from uri="activemq:queue:Consumer.2.VirtualTopic.foo"/>
     <to uri="mock:result2"/>
 </route>