第14章 メッセージング
14.1. HornetQ
14.1.1. HornetQ
14.1.2. メッセージングの概念
14.1.3. サポートされているメッセージ形式
- Message Queue パターン
- Message Queue パターンでは、メッセージをキューに送信する必要があります。メッセージがキューに入ると、通常は永続化されて、配信が保証されます。キューを通過したメッセージは、メッセージングシステムによりメッセージコンシューマーに配信されます。 メッセージが処理されると、メッセージコンシューマーはメッセージが配信されたことを確認応答します。Message Queue パターンでは、ポイントツーポイントメッセージングと併用すると、複数のコンシューマーをキューに入れることが可能ですが、各メッセージは単一のコンシューマーのみが受信可能となります。
- Publish-Subscribe パターン
- Publish-Subscribe パターンでは、サーバー上の単一のエンティティに対して複数の送信者がメッセージを送信することが可能です。このエンティティは、「トピック」として広く知られています。各トピックには、複数のコンシューマーが参加することが可能です。これは、「サブスクリプション」として知られています。各サブスクリプションは、トピックによって送信された全メッセージのコピーを受信します。これは、各メッセージを消費するのが単一のコンシューマーのみである Message Queue パターンとは異なります。永続的なサブスクリプションは、トピックに送信された各メッセージをサブスクライバーが消費するまで、そのコピーを保持します。このようなコピーは、サーバーの再起動時にも維持されます。非永続的なサブスクリプションは、そのサブスクリプションを作成した接続が有効な間のみ継続します。
14.1.4. アクセプターおよびコネクターについて
アクセプターおよびコネクター
Acceptor
- アクセプターは、HornetQ サーバーが受け入れる接続タイプを定義します。
Connector
- コネクターは、HornetQ サーバーに接続する方法を定義し、HornetQ クライアントによって使用されます。
Invm および Netty
Invm
- Invm は、Intra Virtual Machine の略語であり、クライアントとサーバーが同じ JVM で実行されているときに使用できます。
Netty
- JBoss プロジェクトの名前。クライアントとサーバーが異なる JVM で実行されている場合に使用する必要があります。
standalone.xml
と domain.xml
で設定されます。これらは、管理コンソールまたは管理 CLI のいずれかを使用して定義できます。
例14.1 デフォルトのアクセプターおよびコネクター設定の例
<connectors> <netty-connector name="netty" socket-binding="messaging"/> <netty-connector name="netty-throughput" socket-binding="messaging-throughput"> <param key="batch-delay" value="50"/> </netty-connector> <in-vm-connector name="in-vm" server-id="0"/> </connectors> <acceptors> <netty-acceptor name="netty" socket-binding="messaging"/> <netty-acceptor name="netty-throughput" socket-binding="messaging-throughput"> <param key="batch-delay" value="50"/> <param key="direct-deliver" value="false"/> </netty-acceptor> <in-vm-acceptor name="in-vm" server-id="0"/> </acceptors>
14.1.5. ブリッジについて
14.1.6. 大規模なメッセージの処理
14.1.8. HornetQ のアプリケーションへの埋め込み
設定オブジェクトのインスタンス化
設定ファイルから、もしくはプログラムを使用して設定パラメーターを指定することにより設定オブジェクトをインスタンス化します。ファイルから設定オブジェクトを作成します。
FileConfigurationImpl
クラスを使用して、ファイルをベースとした設定オブジェクトを設定します。import org.hornetq.core.config.Configuration; import org.hornetq.core.config.impl.FileConfiguration; ... Configuration config = new FileConfiguration(); config.setConfigurationUrl(<replaceable>file-url</replaceable>); config.start();
プログラムを使用して設定オブジェクトを作成します。
ConfigurationImpl
クラスを使用して設定オブジェクトを作成します。import org.hornetq.core.config.Configuration; import org.hornetq.core.config.impl.FileConfiguration; ... Configuration config = new ConfigurationImpl();
- acceptors などの任意の設定パラメーターを指定します。例:
HashSet<TransportConfiguration> transports = new HashSet<TransportConfiguration>(); transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName())); transports.add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); config.setAcceptorConfigurations(transports);
サーバーをインスタンス化して起動します。
org.hornetq.api.core.server.HornetQ
静的メソッドを使用して、設定オブジェクトをベースとするサーバーを作成して起動します。import org.hornetq.api.core.server.HornetQ; import org.hornetq.core.server.HornetQServer; ... HornetQServer server = HornetQ.newHornetQServer(config); server.start();
埋め込み HornetQ インスタンスは内部メッセージング用にインスタンス化されます。埋め込み HornetQ に接続するには、通常どおりにファクトリを作成します。
14.1.9. JMS サーバーの設定
EAP_HOME/domain/configuration/domain.xml
ファイル、スタンドアローンは EAP_HOME/standalone/configuration/standalone.xml
にあります。
<subsystem xmlns="urn:jboss:domain:messaging:1.0">
要素には、JMS 設定すべてが含まれています。JNDI で必要な JMS のConnectionFactory
、Queue
、あるいはTopic
インスタンスを追加します。
Enterprise Application Platform で JMS サブシステムを有効化
<extensions>
要素にて、以下の行が存在しておりコメントアウトされていないか確認してください。<extension module="org.jboss.as.messaging"/>
基本的な JMS サブシステムの追加
メッセージングサブシステムが設定ファイルに存在しない場合は、追加します。- お使いのプロファイルに該当する
<profile>
を検索し<subsystems>
タグの場所を探します。 <subsystems>
タグのすぐ下に新しい行を追加します。以下をそこに貼り付けます。<subsystem xmlns="urn:jboss:domain:messaging:1.0"> </subsystem>
その他の設定はすべて、その上の空いている行に追加します。
JMS の基本設定を追加
<journal-file-size>102400</journal-file-size> <journal-min-files>2</journal-min-files> <journal-type>NIO</journal-type> <!-- disable messaging persistence --> <persistence-enabled>false</persistence-enabled>
要件にあわせ上記の値をカスタマイズHornetQ に接続ファクトリインスタンスを追加
クライアントは、JMSConnectionFactory
オブジェクトを使い、サーバーへの接続を確立します。JMS 接続ファクトリオブジェクトを HornetQ に追加するには、<jms-connection-factories>
タグを1つ、接続ファクトリごとに<connection-factory>
要素を以下のように追加します。<subsystem xmlns="urn:jboss:domain:messaging:1.0"> ... <jms-connection-factories> <connection-factory name="myConnectionFactory"> <connectors> <connector-ref connector-name="netty"/> </connectors> <entries> <entry name="/ConnectionFactory"/> </entries> </connection-factory> </jms-connection-factories> ... </subsystem>
netty
コネクターの設定この JMS 接続ファクトリは、netty
コネクターを利用します。これは、サーバー設定ファイルにデプロイされたコネクターオブジェクトへの参照です。コネクターオブジェクトは、実際にサーバーへ接続する際に利用するトランスポートとパラメーターを定義します。netty
コネクターを設定するには、以下の設定を含めます。<subsystem xmlns="urn:jboss:domain:messaging:1.0"> ... <connectors> <netty-connector name="netty" socket-binding="messaging"/> <netty-connector name="netty-throughput" socket-binding="messaging-throughput"> <param key="batch-delay" value="50"/> </netty-connector> <in-vm-connector name="in-vm" server-id="0"/> </connectors> ... </subsystem>
コネクターは、messaging
およびmessaging-throughput
ソケットバインディングを参照します。messaging
ソケットバインディングは、ポート 5445 を使い、messaging-throughput
ソケットバインディングはポート 5455 を使います。 以下のソケットバインディングが<socket-binding-groups>
要素に含まれているよう確認してください。<socket-binding-groups> ... <socket-binding-group ... > <socket-binding name="messaging" port="5445"/> <socket-binding name="messaging-throughput" port="5455"/> ... </socket-binding-group> ... </socket-binding-groups>
キューインスタンスを HornetQ に追加
クライアントは JMSQueue
オブジェクトを使いサーバーへ配信する送信メッセージを段階分けします。JMS キューオブジェクトを HornetQ に追加するには、以下のように<jms-queue>
要素を含めます。<subsystem xmlns="urn:jboss:domain:messaging:1.0"> ... <jms-destinations> <jms-queue name="myQueue"> <entry name="/queue/myQueue"/> </jms-queue> </jms-destinations> ...
オプション:トピックインスタンスを HornetQ に追加
クライアントは JMSTopic
オブジェクトを使い、複数のサブスクライバー向けにメッセージを管理します。JMS トピックオブジェクトを追加するには、<topic>
要素を以下のように主組めます。<subsystem xmlns="urn:jboss:domain:messaging:1.0"> ... <topic name="myTopic"> <entry name="/topic/myTopic"/> </topic> ...
追加設定
追加設定が必要な場合、EAP_HOME/docs/schema/jboss-messaging.xsd
の DTD を確認してください。
14.1.10. Java Naming and Directory Interface (JNDI) について
14.1.11. HornetQ 向けに JNDI を設定
手順14.1 タスク
JNDIServer
bean を設定します。
クライアント側で JNDI プロパティを設定
JNDI プロパティにより JNDI クライアントに JNDI サーバーがどこにあるか伝えます。このプロパティをクライアントのクラスパスにあるjndi.properties
のという名前のファイルで指定するか、あるいは最初の JNDI コンテキストの作成時に直接宣言することができます。JNDI プロパティを以下の構成に設定java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory java.naming.provider.url=jnp://hostname:1099 java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
- hostname は JNDI サーバーのホスト名あるいは IP アドレスです。
JNDI サーバーポートの設定
管理コンソールの Socket Binding Groups パネルに移動します。
コンソールの左側のメニューから General Configuration → Socket Binding Groups オプションを選択します。選択したサーバーの Socket Binding Groups パネルが表示されます。図14.1 サーバー設定
JNDI ソケットバインディングの編集
Socket Binding Declarations の表から jndi を選択し、下にある ソケットバインディング の Edit ボタンを選択してください。以下の設定と Multicast Port の設定を行い、設定後、Save ボタンを押します。- Port:
1099
- Multicast Port:
1098
JNDI サーバーは HornetQ 用に設定されました。
14.1.12. JMS アドレス の設定
<address-settings>
設定要素内に存在します。
アドレスワイルドカードを使用すると単一のステートメントで複数の似たアドレスを一致させることができます。これは、システムがアスタリスク ( * ) 文字を使用して 1 回の検索で複数のファイルや文字列を一致させることと似ています。以下の文字はワイルドカードステートメントでは特別な意味を持っています。
表14.1 JMX のワイルドカード構文
文字 | 詳細 |
---|---|
. (ピリオド 1 つ) | ワイルドカード表現で単語の間のスペースを意味します。 |
# (シャープまたはハッシュマーク) | ゼロ以上の単語シーケンスと一致します。 |
* (アスタリスク) | 1 つの単語と一致します。 |
表14.2 JMS ワイルドカードの例
例 | 詳細 |
---|---|
news.europe.# | news.europe 、 news.europe.sport 、 news.europe.politic と一致しますが、 news.usa や europe とは一致しません。
|
news. | news.europe と一致しますが、 news.europe.sport とは一致しません。
|
news.*.sport | news.europe.sport と news.usa.sport とは一致しますが、news.europe.politics とは一致しません。
|
例14.2 デフォルトアドレス設定
<address-settings> <!--default for catch all--> <address-setting match="#"> <dead-letter-address>jms.queue.DLQ</dead-letter-address> <expiry-address>jms.queue.ExpiryQueue</expiry-address> <redelivery-delay>0</redelivery-delay> <max-size-bytes>10485760</max-size-bytes> <address-full-policy>BLOCK</address-full-policy> <message-counter-history-day-limit>10</message-counter-history-day-limit> </address-setting> </address-settings>
表14.3 JMS アドレス設定の説明
要素 | 詳細 | デフォルト値 | タイプ |
---|---|---|---|
address-full-policy
|
max-size-bytes が指定されたアドレスがいっぱいの場合に起こることを決定します。
|
PAGE
|
STRING
|
dead-letter-address
|
無効なレターアドレスが指定された場合、
max-delivery-attempts 配信試行が失敗すると、メッセージが無効なレターアドレスに移動されます。それ以外の場合、これらに未配信メッセージは破棄されます。ワイルドカードは許可されます。
|
jms.queue.DLQ
|
STRING
|
expiry-address
|
有効期限が切れたアドレスが存在する場合、有効期限が切れたメッセージは破棄されるのではなく、一致するアドレスに送信されます。ワイルドカードは許可されます。
|
jms.queue.ExpiryQueue
|
STRING
|
last-value-queue
|
キューで最後の値のみを使用するかどうかを定義します。
|
false
|
BOOLEAN
|
max-delivery-attempts
| dead-letter-address に送信する前、または破棄する前にメッセージを再配信する最大試行回数。
|
10
|
INT
|
max-delivery-bytes
|
最大バイトサイズ。
|
10485760L
|
LONG
|
message-counter-history-day-limit
|
メッセージカウンター履歴の日数制限。
|
10
|
INT
|
page-max-cache-size
|
ページングナビゲーション中に IO を最適化するためにメモリー内に保持するページファイルの数。
|
5
|
INT
|
page-size-bytes
|
ページングサイズ。
|
5
|
INT
|
redelivery-delay
|
メッセージの再配信試行間の遅延時間 (ミリ秒単位)。
0 に設定された場合は、再配信が無限に試行されます。
|
0L
|
LONG
|
redistribution-delay
|
メッセージを再配信する前に最後のコンシューマーがキューで閉じられたときの待機時間を定義します。
|
-1L
|
LONG
|
send-to-dla-on-no-route
|
キューにルーティングされず、そのアドレスに指定された無効なレターアドレス (DLA) に送信されたメッセージの条件を設定するアドレスのパラメーター。
|
false
|
BOOLEAN
|
アドレス設定とパターン属性の設定
管理 CLI または管理コンソールを選択して、必要に応じてパターン属性を設定します。管理 CLI を使用したアドレス設定
管理 CLI を使用してアドレスを設定します。新しいパターンの追加
add
操作を使用して、新しいアドレス設定を作成します (必要な場合)。このコマンドは、管理 CLI セッションのルートから実行できます。この場合、以下の例では、 patternname というタイトルの新しいパターンが作成され、max-delivery-attempts
属性が 5 として宣言されます。full
プロファイルのスタンドアロンと監理対象ドメインの編集の例が示されます。[domain@localhost:9999 /] /profile=full/subsystem=messaging/hornetq-server=default/address-setting=patternname/:add(max-delivery-attempts=5)
[standalone@localhost:9999 /] /subsystem=messaging/hornetq-server=default/address-setting=patternname/:add(max-delivery-attempts=5)
パターン属性の編集
write
操作を使用して新しい値を属性に書き込みます。タブ補完を使用して、入力するコマンド文字列を補完したり、利用可能な属性を公開したりできます。以下の例は、max-delivery-attempts
値を 10 に更新します。[domain@localhost:9999 /] /profile=full/subsystem=messaging/hornetq-server=default/address-setting=patternname/:write-attribute(name=max-delivery-attempts,value=10)
[standalone@localhost:9999 /] /subsystem=messaging/hornetq-server=default/address-setting=patternname/:write-attribute(name=max-delivery-attempts,value=10)
パターン属性の確認
read-resource
操作でinclude-runtime=true
パラメーターを実行して値を変更し、サーバーモデルでアクティブな現在のすべての値を公開します。[domain@localhost:9999 /] /profile=full/subsystem=messaging/hornetq-server=default/address-setting=patternname/:read-resource
[standalone@localhost:9999 /] /subsystem=messaging/hornetq-server=default/address-setting=patternname/:read-resource
管理コンソールを使用したアドレスの設定
管理コンソールを使用してアドレスを設定します。管理コンソールにログインします。
管理対象ドメインまたはスタンドアロンサーバーの管理コンソールにログインします。監理対象ドメインを使用する場合は、適切なプロファイルを選択します。
右上にある Profiles タブを選択し、次の画面の左上にある Profile メニューから適切なプロファイルを選択します。full
プロファイルとfull-ha
プロファイルのみでmessaging
サブシステムが有効になります。ナビゲーションメニューから Messaging 項目を選択します。
ナビゲーションメニューから Messaging メニュー項目を展開し、Destinations をクリックします。JMS プロバイダーを表示します。
JMS プロバイダーのリストが表示されます。デフォルトの設定では、default
という名前の 1 つのプロバイダーだけが表示されます。View リンクをクリックして、このプロバイダーの詳細な設定を表示します。アドレス設定を表示します。
Addressing タブをクリックします。Add ボタンをクリックして新しいパターンを追加するか、名前をクリックし、次に Edit ボタンをクリックすることにより、既存のパターンを編集します。オプションを設定します。
新しいパターンを追加する場合、Pattern フィールドはaddress-setting
要素のmatch
パラメーターを参照します。また、Dead Letter Address、Expiry Address、Redelivery Delay、および Max Delivery Attempts を編集することもできます。他のオプションは、管理 CLI を使用して設定する必要があります。
14.1.13. HornetQ 設定属性のリファレンス
read-resource
操作で設定可能または表示可能な属性を公開できます。
例14.3 例
[standalone@localhost:9999 /] /subsystem=messaging/hornetq-server=default:read-resource
表14.4 TableTitle
属性 | サンプル値 | タイプ |
---|---|---|
allow-failback | true | BOOLEAN |
async-connection-execution-enabled | true | BOOLEAN |
backup | false | BOOLEAN |
cluster-password | somethingsecure | STRING |
cluster-user | HORNETQ.CLUSTER.ADMIN.USER | STRING |
clustered | false | BOOLEAN |
connection-ttl-override | -1 | LONG |
create-bindings-dir | true | BOOLEAN |
create-journal-dir | true | BOOLEAN |
failback-delay | 5000 | LONG |
failover-on-shutdown | false | BOOLEAN |
id-cache-size | 2000 | INT |
jmx-domain | org.hornetq | STRING |
jmx-management-enabled | false | BOOLEAN |
journal-buffer-size | 100 | LONG |
journal-buffer-timeout | 100 | LONG |
journal-compact-min-files | 10 | INT |
journal-compact-percentage | 30 | INT |
journal-file-size | 102400 | LONG |
journal-max-io | 1 | INT |
journal-min-files | 2 | INT |
journal-sync-non-transactional | true | BOOLEAN |
journal-sync-transactional | true | BOOLEAN |
journal-type | ASYNCIO | STRING |
live-connector-ref | reference | STRING |
log-journal-write-rate | false | BOOLEAN |
management-address | jms.queue.hornetq.management | STRING |
management-notification-address | hornetq.notifications | STRING |
memory-measure-interval | -1 | LONG |
memory-warning-threshold | 25 | INT |
message-counter-enabled | false | BOOLEAN |
message-counter-max-day-history | 10 | INT |
message-counter-sample-period | 10000 | LONG |
message-expiry-scan-period | 30000 | LONG |
message-expiry-thread-priority | 3 | INT |
page-max-concurrent-io | 5 | INT |
perf-blast-pages | -1 | INT |
persist-delivery-count-before-delivery | false | BOOLEAN |
persist-id-cache | true | BOOLEAN |
persistence-enabled | true | BOOLEAN |
remoting-interceptors | 未定義 | LIST |
run-sync-speed-test | false | BOOLEAN |
scheduled-thread-pool-max-size | 5 | INT |
security-domain | その他 | STRING |
security-enabled | true | BOOLEAN |
security-invalidation-interval | 10000 | LONG |
server-dump-interval | -1 | LONG |
shared-store | true | BOOLEAN |
started | true | BOOLEAN |
thread-pool-max-size | 30 | INT |
transaction-timeout | 300000 | LONG |
transaction-timeout-scan-period | 1000 | LONG |
version | 2.2.16.Final (HQ_2_2_16_FINAL, 122) | STRING |
wild-card-routing-enabled | true | BOOLEAN |
14.1.14. HornetQ でのメッセージングの設定
standalone.xml
や domain.xml
設定ファイルを手作業で編集せずに永続的な変更を行うことができますが、デフォルト設定ファイルのメッセージングコンポーネントについて理解できると便利です。デフォルトの設定ファイルでは、管理ツールを使用するドキュメントサンプルにより参考用の設定ファイル断片が提供されます。
14.1.15. 遅延再配信の設定
遅延再配信は、Java Messaging Service (JMS) のサブシステム設定の <address-setting>
設定要素の子要素である <redelivery-delay>
要素に定義されます。
<!-- delay redelivery of messages for 5s --> <address-setting match="jms.queue.exampleQueue"> <redelivery-delay>5000</redelivery-delay> </address-setting>
<redelivery-delay>
を 0
に設定すると、再配信の遅延はありません。<address-setting-match>
要素にアドレスワイルドカードを使用すると、ワイルドカードに一致するアドレスに対して再配信の遅延を設定することができます。
14.1.16. デッドレターアドレスの設定
デッドレターアドレスは Java Messaging Service (JMS) の サブシステム設定の <address-setting>
要素で定義されます。
<!-- undelivered messages in exampleQueue will be sent to the dead letter address deadLetterQueue after 3 unsuccessful delivery attempts --> <address-setting match="jms.queue.exampleQueue"> <dead-letter-address>jms.queue.deadLetterQueue</dead-letter-address> <max-delivery-attempts>3</max-delivery-attempts> </address-setting>
<dead-letter-address>
が指定されていないと、 <max-delivery-attempts>
で指定される回数配信を試みた後、メッセージが削除されます。 デフォルトでは 10回 メッセージの配信を試みます。<max-delivery-attempts>
を -1
に設定すると、再配信が無限に試行されます。例えば、一致するアドレスのセットに対してグローバルにデッドレターを設定することができ、特定アドレスの <max-delivery-attempts>
を -1
に設定し、このアドレスのみ再配信が無限に行われるようにすることが可能です。アドレスワイルドカードを使用してアドレスのセットにデッドレターを設定することも可能です。
14.1.17. メッセージ期限切れアドレス
メッセージ有効期限アドレスは Java Messaging Service (JMS) の address-setting 設定に定義されています。例は次の通りです。
<!-- expired messages in exampleQueue will be sent to the expiry address expiryQueue --> <address-setting match="jms.queue.exampleQueue"> <expiry-address>jms.queue.expiryQueue</expiry-address> </address-setting>
アドレスワイルドカードを使用すると単一のステートメントで複数の似たアドレスを一致させることができます。これは、システムがアスタリスク ( * ) 文字を使用して 1 回の検索で複数のファイルや文字列を一致させることと似ています。以下の文字はワイルドカードステートメントでは特別な意味を持っています。
表14.5 JMX のワイルドカード構文
文字 | 説明 |
---|---|
. (ピリオド 1 つ) | ワイルドカード表現で言葉の間のスペースを意味します。 |
# (シャープまたはハッシュマーク) | ゼロ以上の言葉の配列と一致します。 |
* (アスタリスク) | 1 つの言葉と一致します。 |
表14.6 JMS ワイルドカードの例
例 | 説明 |
---|---|
news.europe.# | news.europe 、 news.europe.sport 、 news.europe.politic と一致しますが、 news.usa や europe とは一致しません。
|
news. | news.europe と一致しますが、 news.europe.sport とは一致しません。
|
news.*.sport | news.europe.sport と news.usa.sport とは一致しますが、news.europe.politics とは一致しません。
|
14.1.18. メッセージの有効期限の設定
HornetQ Core API を使用すると失効時間を直接メッセージに設定できます。例は次の通りです。
// message will expire in 5000ms from now message.setExpiration(System.currentTimeMillis() + 5000);
MessageProducer
JMS MessageProducer
には送信するメッセージの有効期限を制御する TimeToLive
パラメーターが含まれています。
// messages sent by this producer will be retained for 5s (5000ms) before expiration producer.setTimeToLive(5000);
- _HQ_ORIG_ADDRESS
- _HQ_ACTUAL_EXPIRY