AMQ Core Protocol JMS クライアントの使用

Red Hat AMQ 7.7

AMQ Clients 2.7 向け

概要

本ガイドでは、クライアントをインストールして設定する方法、実例を実行し、他の AMQ コンポーネントでクライアントを使用する方法を説明します。

第1章 概要

AMQ Core Protocol JMS は、Artemis Core Protocol メッセージを送受信するメッセージングアプリケーションで使用する Java Message Service (JMS) 2.0 クライアントです。

AMQ Core Protocol JMS は AMQ Clients (複数の言語やプラットフォームをサポートするメッセージングライブラリースイート) に含まれています。クライアントの概要は、AMQ Clients の概要 を参照してください。本リリースに関する詳細は、AMQ Clients 2.7 リリースノート を参照してください。

AMQ JMS は、Apache Qpid からの JMS 実装に基づいています。JMS API の詳細は、JMS API reference および JMS tutorial を参照してください。

1.1. 主な特長

  • JMS 1.1 および 2.0 との互換性
  • セキュアな通信用の SSL/TLS
  • 自動再接続およびフェイルオーバー
  • 分散トランザクション (XA)
  • Pure-Java 実装

1.2. サポートされる標準およびプロトコル

AMQ Core Protocol JMS は、以下の業界標準およびネットワークプロトコルをサポートします。

1.3. サポートされる構成

AMQ コアプロトコル JMS は、以下に示す OS と言語のバージョンをサポートしています。詳細は、Red Hat AMQ 7 Supported Configurations を参照してください。

  • 以下の JDK を使用する Red Hat Enterprise Linux 7 および 8:

    • OpenJDK 8 および 11
    • Oracle JDK 8
    • IBM JDK 8
  • 以下の JDK を使用する Red Hat Enterprise Linux 6:

    • OpenJDK 8
    • Oracle JDK 8
  • IBM AIX 7.1 と IBM JDK 8
  • Oracle JDK 8 を搭載した Microsoft Windows 10 Pro
  • Oracle JDK 8 を搭載した Microsoft Windows Server 2012 R2 および 2016
  • Oracle JDK 8 を使用する Oracle Solaris 10 および 11

AMQ Core Protocol JMS は、最新バージョンの AMQ Broker との組み合わせでサポートされています。

1.4. 用語および概念

本セクションでは、コア API エンティティーを紹介し、コア API が連携する方法を説明します。

表1.1 API の用語

エンティティー説明

ConnectionFactory

接続を作成するエントリーポイント。

接続

ネットワーク上の 2 つのピア間の通信チャネル。これにはセッションが含まれます。

Session

メッセージを生成および消費するためのコンテキスト。メッセージプロデューサーとコンシューマーが含まれます。

MessageProducer

メッセージを宛先に送信するためのチャネル。ターゲットの宛先があります。

MessageConsumer

宛先からメッセージを受信するためのチャネル。ソースの宛先があります。

宛先

メッセージの名前付きの場所 (キューまたはトピックのいずれか)。

Queue

メッセージの保存されたシーケンス。

トピック

マルチキャスト配布用のメッセージの保存されたシーケンス。

メッセージ

情報のアプリケーション固有の部分。

AMQ Core Protocol JMS は メッセージ を送受信します。メッセージは、メッセージプロデューサーコンシューマー を使用して接続されたピア間で転送されます。プロデューサーとコンシューマーは セッション 上で確立されます。セッションは接続上で確立されます。接続は 接続ファクトリー によって作成されます。

送信ピアは、メッセージ送信用のプロデューサーを作成します。プロデューサーには、リモートピアでターゲットキューまたはトピックを識別する 宛先 があります。受信ピアは、メッセージ受信用のコンシューマーを作成します。プロデューサーと同様に、コンシューマーにはリモートピアでソースキューまたはトピックを識別する宛先があります。

宛先は、キュー または トピック のいずれかです。JMS では、キューとトピックはメッセージを保持する名前付きブローカーエンティティーのクライアント側表現です。

キューは、ポイントツーポイントセマンティクスを実装します。各メッセージは 1 つのコンシューマーによってのみ認識され、メッセージは読み取り後にキューから削除されます。トピックはパブリッシュ/サブスクライブセマンティクスを実装します。各メッセージは複数のコンシューマーによって認識され、メッセージは読み取り後も他のコンシューマーで利用できるままになります。

詳細は、JMS tutorial を参照してください。

1.5. 本書の表記慣例

sudo コマンド

本書では、root 権限を必要とするすべてのコマンドに対して sudo が使用されています。すべての変更がシステム全体に影響する可能性があるため、sudo を使用する場合は注意が必要です。sudo の詳細は、sudo コマンドの使用を参照してください。

ファイルパス

本書では、すべてのファイルパスが Linux、UNIX、および同様のオペレーティングシステムで有効です (例: /home/andrea)。Microsoft Windows では、同等の Windows パスを使用する必要があります (例: C:\Users\andrea)。

変数テキスト

本書では、変数を含むコードブロックが紹介されていますが、これは、お客様の環境に固有の値に置き換える必要があります。可変テキストは矢印の中括弧で囲まれ、斜体の等幅フォントとしてスタイル設定されます。たとえば、以下のコマンドでは <project-dir> は実際の環境の値に置き換えます。

$ cd <project-dir>

第2章 インストール

本章では、環境に AMQ Core Protocol JMS をインストールする手順を説明します。

2.1. 前提条件

  • AMQ リリースファイルおよびリポジトリーにアクセスするには、サブスクリプション が必要です。
  • AMQ Core Protocol JMS でプログラムを構築するには、Apache Maven をインストールする必要があります。
  • AMQ Core Protocol JMS を使用するには、Java をインストールする必要があります。

2.2. Red Hat Maven リポジトリーの使用

Red Hat Maven リポジトリーからクライアントライブラリーをダウンロードするように Maven 環境を設定します。

手順

  1. Red Hat リポジトリーを Maven 設定または POM ファイルに追加します。設定ファイルの例は、「オンラインリポジトリーの使用」 を参照してください。

    <repository>
      <id>red-hat-ga</id>
      <url>https://maven.repository.redhat.com/ga</url>
    </repository>
  2. ライブラリーの依存関係を POM ファイルに追加します。

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>artemis-jms-client</artifactId>
      <version>2.11.0.redhat-00005</version>
    </dependency>

これで、Maven プロジェクトでクライアントを使用できるようになります。

2.3. ローカル Maven リポジトリーのインストール

オンラインリポジトリーの代わりに、AMQ Core Protocol JMS をファイルベースの Maven リポジトリーとしてローカルファイルシステムにインストールできます。

手順

  1. サブスクリプションを使用してAMQ Broker 7.7.0 Maven リポジトリー の .zip ファイルをダウンロードします。
  2. 選択したディレクトリーにファイルの内容を抽出します。

    Linux または UNIX では、unzip コマンドを使用してファイルの内容を抽出します。

    $ unzip amq-broker-7.7.0-maven-repository.zip

    Windows では、.zip ファイルを右クリックして、Extract All を選択します。

  3. 抽出されたインストールディレクトリー内の maven-repository ディレクトリーにあるリポジトリーを使用するように Maven を設定します。詳細は、「ローカルリポジトリーの使用」 を参照してください。

2.4. サンプルのインストール

手順

  1. サブスクリプションを使用して AMQ Broker 7.7.0 .zip ファイルをダウンロードします。
  2. 選択したディレクトリーにファイルの内容を抽出します。

    Linux または UNIX では、unzip コマンドを使用してファイルの内容を抽出します。

    $ unzip amq-broker-7.7.0.zip

    Windows では、.zip ファイルを右クリックして、Extract All を選択します。

    .zip ファイルの内容を抽出すると、amq-broker-7.7.0 という名前のディレクトリーが作成されます。これはインストールの最上位ディレクトリーであり、本書では <install-dir> と呼びます。

第3章 スタートガイド

本章では、環境を設定して簡単なメッセージングプログラムを実行する手順を説明します。

3.1. 前提条件

3.2. 最初のサンプルの実行

この例では、exampleQueue という名前のキューにコンシューマーおよびプロデューサーを作成します。テキストメッセージを送信してから受信し、受信したメッセージをコンソールに出力します。

手順

  1. <install-dir>/examples/features/standard/queue ディレクトリーで以下のコマンドを実行し、Maven を使用してサンプルを構築します。

    $ mvn clean package dependency:copy-dependencies -DincludeScope=runtime -DskipTests

    dependency:copy-dependencies を追加すると、依存関係が target/dependency ディレクトリーにコピーされます。

  2. java コマンドを使用して例を実行します。

    Linux または UNIX の場合:

    $ java -cp "target/classes:target/dependency/*" org.apache.activemq.artemis.jms.example.QueueExample

    Windows の場合:

    > java -cp "target\classes;target\dependency\*" org.apache.activemq.artemis.jms.example.QueueExample

たとえば、Linux で実行すると、以下のような出力になります。

$ java -cp "target/classes:target/dependency/*" org.apache.activemq.artemis.jms.example.QueueExample
Sent message: This is a text message
Received message: This is a text message

この例のソースコードは <install-dir>/examples/features/standard/queue/src ディレクトリーにあります。追加の例は、<install-dir>/examples/features/standard ディレクトリーにあります。

第4章 設定

本章では、AMQ Core Protocol JMS 実装を JMS アプリケーションにバインドし、設定オプションを設定するプロセスについて説明します。

JMS は Java Naming Directory Interface (JNDI) を使用して、API 実装およびその他のリソースを登録し、検索します。これにより、特定の実装に固有のコードを作成せずに JMS API にコードを作成できます。

設定オプションは、接続 URI でクエリーパラメーターとして公開されます。一部のオプションは、ConnectionFactory 実装オブジェクトの対応する set および get メソッドとしても公開されます。

4.1. 初期コンテキストファクトリーの設定

JMS アプリケーションは InitialContextFactory から取得した JNDI InitialContext オブジェクトを使用して、接続ファクトリーなどの JMS オブジェクトを検索します。AMQ Core Protocol JMS は、org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory クラスで InitialContextFactory の実装を提供します。

InitialContextFactory の実装は、InitialContext オブジェクトがインスタンス化されると検出されます。

javax.naming.Context context = new javax.naming.InitialContext();

実装を見つけるには、お使いの環境で JNDI を設定する必要があります。これを実現するには、jndi.properties ファイルを使用する方法とシステムプロパティーを使用する方法の 2 つの主な方法があります。

jndi.properties ファイルの使用

jndi.properties という名前のファイルを作成し、Java クラスパスに配置します。java.naming.factory.initial キーでプロパティーを追加します。

例: jndi.properties ファイルを使用した JNDI 初期コンテキストファクトリーの設定

java.naming.factory.initial = org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory

Maven ベースのプロジェクトでは、jndi.properties ファイルは <project-dir>/src/main/resources ディレクトリーに配置されます。

システムプロパティーの使用

java.naming.factory.initial システムプロパティーを設定します。

例: システムプロパティーを使用した JNDI 初期コンテキストファクトリーの設定

$ java -Djava.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory ...

4.2. 接続ファクトリーの設定

JMS 接続ファクトリーは、接続を作成するためのエントリーポイントです。これは、アプリケーション固有の設定をエンコードする接続 URI を使用します。

ファクトリー名と接続 URI を設定するには、以下の形式でプロパティーを作成します。この設定は、jndi.properties ファイルに保存するか、対応するシステムプロパティーを設定できます。

接続ファクトリーの JNDI プロパティー形式

connectionFactory.<factory-name> = <connection-uri>

たとえば、以下のように app1 という名前のファクトリーを設定します。

例: jndi.properties ファイルでの接続ファクトリーの設定

connectionFactory.app1 = tcp://example.net:61616?clientID=backend

その後、JNDI コンテキストを使用して、app1 の名前を使用して設定済みの接続ファクトリーを検索できます。

ConnectionFactory factory = (ConnectionFactory) context.lookup("app1");

4.3. 接続 URI

接続ファクトリーは、次の形式の接続 URI を使用して設定されます。

接続 URI 形式

tcp://<host>:<port>[?<option>=<value>[&<option>=<value>...]]

たとえば、以下はポート 61616 でホスト example.net に接続する接続 URI で、クライアント ID を backend に設定します。

例: 接続 URI

tcp://example.net:61616?clientID=backend

フェイルオーバー URI は次の形式を取ります。

フェイルオーバー URI 形式

(<connection-uri>[,<connection-uri>])[?<option>=<value>[&<option>=<value>...]]

使用可能な接続オプションについては、次のセクションで説明します。

4.4. JMS オプション

user
接続の認証に使用されるユーザー名。
password
接続の認証に使用されるパスワード。
clientID
接続に適用されるクライアント ID。
groupID
生成されたすべてのメッセージに適用されるグループ ID。
autoGroup
有効な場合は、ランダムなグループ ID を生成し、生成されたすべてのメッセージに適用します。
cacheDestinations
有効にすると、宛先検索をキャッシュします。これはデフォルトでは無効にされます。
blockOnDurableSend
有効な場合、非トランザクションの永続メッセージを送信すると、リモートピアが受信を確認するまでブロックします。これは、デフォルトで有効になっています。
blockOnNonDurableSend
有効な場合、非トランザクションの非永続メッセージを送信すると、リモートピアが受信を確認するまでブロックします。これはデフォルトでは無効にされます。
blockOnAcknowledge
有効な場合、トランザクション以外の受信したメッセージを承認すると、リモートピアが確認を確認するまでブロックします。これはデフォルトでは無効にされます。
dupsOkBatchSize
DUPS_OK_ACKNOWLEDGE 確認モードを使用する場合、確認バッチのバイト単位のサイズ。デフォルトは 1048576 (1 MiB) です。
preAcknowledge
有効にすると、メッセージが送信されると同時に、配信が完了する前にメッセージを承認します。最大 1 回の配信これはデフォルトでは無効にされます。

4.5. TCP オプション

tcpNoDelay
有効な場合、TCP 送信の遅延やバッファーを行いません。これは、デフォルトで有効になっています。
tcpSendBufferSize
送信バッファーサイズ (バイト単位)。デフォルトは 32768 (32 KiB) です。
tcpReceiveBufferSize
受信バッファーサイズ (バイト単位)。デフォルトは 32768 (32 KiB) です。
writeBufferLowWaterMark
この制限 (バイト単位) では、書き込みバッファーが書き込み可能になります。デフォルトは 32768 (32 KiB) です。
writeBufferHighWaterMark
この制限 (バイト単位) では、書き込みバッファーが書き込み不可になります。デフォルトは 131072 (128 KiB) です。

4.6. SSL/TLS オプション

sslEnabled
有効にすると、SSL/TLS を使用して接続を認証および暗号化します。これはデフォルトでは無効にされます。
keyStorePath
SSL/TLS キーストアへのパス。キーストアは相互 SSL/TLS 認証に必要です。設定しないと、javax.net.ssl.keyStore システムプロパティーの値が使用されます。
keyStorePassword
SSL/TLS キーストアのパスワード。設定しないと、javax.net.ssl.keyStorePassword システムプロパティーの値が使用されます。
trustStorePath
SSL/TLS トラストストアへのパス。設定しないと、javax.net.ssl.trustStore システムプロパティーの値が使用されます。
trustStorePassword
SSL/TLS トラストストアのパスワード。設定しないと、javax.net.ssl.trustStorePassword システムプロパティーの値が使用されます。
trustAll
有効にすると、設定されたトラストストアに関係なく、提供されたサーバー証明書を暗黙的に信頼します。これはデフォルトでは無効にされます。
verifyHost
有効な場合は、接続ホスト名が、提供されたサーバー証明書と一致することを確認します。これはデフォルトでは無効にされます。
enabledCipherSuites
有効にする暗号スイートのコンマ区切りリスト。未設定の場合は、context-default 暗号が使用されます。
enabledProtocols
有効にする SSL/TLS プロトコルのコンマ区切りリスト。未設定の場合は、JVM デフォルトプロトコルが使用されます。

4.7. コアプロトコルオプション

clientFailureCheckPeriod
期限切れの接続を定期的にチェックする間隔 (ミリ秒単位)。デフォルトは 30000 (30 秒) です。-1 はチェックを無効にします。
connectionTTL
サーバーが ping パケットを送信しない場合に接続に失敗するまでの時間 (ミリ秒単位)。デフォルトは 60000 (1 分) です。-1 はタイムアウトを無効にします。
consumerWindowSize
コンシューマーごとのメッセージの事前フェッチバッファーのサイズ (バイト単位)。デフォルトは 1048576 (1 MiB) で、1 は制限がないことを意味します。0 は事前フェッチを無効にします。
consumerMaxRate
1 秒あたりに消費するメッセージの最大数。デフォルトは -1 で、無制限を意味します。
producerWindowSize
より多くのメッセージを生成するためにクレジットに要求されたサイズ (バイト単位)。これにより、1 度にインフライトデータの合計量が制限されます。デフォルトは 1048576 (1 MiB) で、1 は制限がないことを意味します。
producerMaxRate
1 秒あたりに生成するメッセージの最大数。デフォルトは -1 で、無制限を意味します。
transactionBatchSize
トランザクションでメッセージを受信するとき、確認バッチのバイト単位のサイズ。デフォルトは 1048576 (1 MiB) です。

4.8. フェイルオーバーオプション

reconnnectAttempts
接続が失敗したと報告するまでに許可される再接続試行回数。デフォルトは -1 で、無制限を意味します。
initialConnectAttempts
最初に接続に成功する前に、クライアントがブローカートポロジーを検出するまでに許可される再接続試行回数。デフォルトは 0 で、試行は 1 回のみが許可されます。
failoverOnInitialConnection
有効な場合は、最初の接続に失敗した場合にバックアップサーバーへの接続を試みます。これはデフォルトでは無効にされます。

4.9. 検出オプション

ha
有効な場合は、HA ブローカーのトポロジーの変更を追跡します。URI からのホストおよびポートは、初期接続にのみ使用されます。クライアントは、最初の接続後に現在のフェイルオーバーエンドポイントとトポロジーの変更から生じた更新を受け取ります。これはデフォルトでは無効にされます。
useTopologyForLoadBalancing
有効な場合は、接続負荷分散にクラスタートポロジーを使用します。これは、デフォルトで有効になっています。

4.10. 大型メッセージのオプション

クライアントは、プロパティー minLargeMessageSize の値を設定することで、大きなメッセージサポートを有効にすることができます。minLargeMessageSize を超えるメッセージは、大きなメッセージとみなされます。

minLargeMessageSize
メッセージが大きなメッセージとして扱われる最小サイズ (バイト単位)。デフォルトは 102400 (100 KiB) 日です。
compressLargeMessages

有効な場合は、minLargeMessageSize で定義されているように大きなメッセージを圧縮します。これはデフォルトでは無効にされます。

注記

大きなメッセージの圧縮サイズが minLargeMessageSize の値より小さい場合、メッセージは通常のメッセージとして送信されます。そのため、ブローカーの大型メッセージデータディレクトリーには書き込まれません。

4.11. JNDI リソースの設定

4.11.1. キューおよびトピック名の設定

JMS は、JNDI を使用してデプロイメント固有のキューとトピックリソースを検索するオプションを提供します。

JNDI でキューおよびトピック名を設定するには、以下の形式でプロパティーを作成します。この設定を jndi.properties ファイルに置くか、対応するシステムプロパティーを設定します。

キューおよびトピックの JNDI プロパティー形式

queue.<queue-lookup-name> = <queue-name>
topic.<topic-lookup-name> = <topic-name>

たとえば、以下のプロパティーは、2 つのデプロイメント固有のリソースの名前、jobs および notifications を定義します。

例: jndi.properties ファイルでのキューおよびトピック名の設定

queue.jobs = app1/work-items
topic.notifications = app1/updates

その後、JNDI 名でリソースを検索できます。

Queue queue = (Queue) context.lookup("jobs");
Topic topic = (Topic) context.lookup("notifications");

4.11.2. プログラムによる JNDI プロパティーの設定

jndi.properties ファイルまたはシステムプロパティーを使用して JNDI を設定する代わりに、JNDI 初期コンテキスト API を使用してプログラムでプロパティーを定義できます。

例: プログラムでの JNDI プロパティーの設定

Hashtable<Object, Object> env = new Hashtable<>();

env.put("java.naming.factory.initial", "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
env.put("connectionFactory.app1", "tcp://example.net:61616?clientID=backend");
env.put("queue.jobs", "app1/work-items");
env.put("topic.notifications", "app1/updates");

InitialContext context = new InitialContext(env);

第5章 ネットワーク接続

5.1. 自動フェイルオーバー

クライアントは、接続障害が発生した場合にスレーブブローカーに再接続できるように、すべてのマスターおよびスレーブブローカーに関する情報を受け取ることができます。その後、スレーブブローカーは、フェイルオーバー前に各接続に存在したセッションおよびコンシューマーを自動的に再作成します。この機能により、アプリケーションで手動で再接続ロジックをコーディングする必要がなくなります。

セッションがスレーブで再作成されると、送信または確認済みのメッセージを認識しません。フェイルオーバー時の in-flight の送信と確認応答も失われる可能性があります。ただし、透過的なフェイルオーバーがなくても、トランザクションの重複検出と再試行の組み合わせを使用することで、障害が発生した場合でも、一度だけ の配信を保証するのは簡単です。

クライアントは、設定可能な期間内にブローカーからパケットを受信しないと接続の障害を検出します。詳細は、「デッド接続の検出」 を参照してください。

マスターおよびスレーブについての情報を受信するようにクライアントを設定する方法は複数あります。1 つのオプションとして、特定のブローカーに接続し、クラスター内の他のブローカーに関する情報を受信するようにクライアントを設定します。詳細は、「静的検出の設定」 を参照してください。ただし、最も一般的な方法は、ブローカー検出 を使用することです。ブローカー検出の設定方法の詳細は、「動的検出の設定」 を参照してください。

また、以下の例のように、ブローカーへの接続に使用される URI のクエリー文字列にパラメーターを追加して、クライアントを設定することもできます。

connectionFactory.ConnectionFactory=tcp://localhost:61616?ha=true&reconnectAttempts=3

手順

クエリー文字列を使用してフェイルオーバーを行うようにクライアントを設定するには、URI の以下のコンポーネントが適切に設定されていることを確認します。

  1. URI の host:port の部分は、バックアップで適切に設定されたマスターブローカーを参照する必要があります。このホストおよびポートは最初の接続にのみ使用されます。host:port の値は、ライブサーバーとバックアップサーバー間の実際の接続フェイルオーバーとは関係ありません。上記の例では、localhost:61616host:port に使用されます。
  2. (オプション) 複数のブローカーを可能な初期接続として使用するには、以下の例のように host:port エントリーをグループ化します。

    connectionFactory.ConnectionFactory=(tcp://host1:port,tcp://host2:port)?ha=true&reconnectAttempts=3
  3. 名前/値のペア ha=true をクエリー文字列の一部として追加し、クライアントがクラスター内の各マスターおよびスレーブブローカーに関する情報を受け取るようにします。
  4. 名前と値のペア reconnectAttempts=n を含めます。n は、0 より大きい整数です。このパラメーターは、クライアントがブローカーへの再接続を試行する回数を設定します。
注記

フェイルオーバーは ha=truereconnectAttempts0 よりも大きい場合にのみ発生します。また、他のブローカーに関する情報を取得するために、クライアントはマスターブローカーへ最初の接続を確立する必要があります。最初の接続に失敗すると、クライアントは再試行して接続を確立できます。詳細は、「初期接続時のフェイルオーバー」 を参照してください。

5.1.1. 初期接続時のフェイルオーバー

クライアントは HA クラスターへの最初の接続が確立されるまですべてのブローカーに関する情報を受け取らないため、クライアントが接続 URI に含まれるブローカーにのみ接続できる期間があります。したがって、この最初の接続で障害が発生した場合、クライアントは他のマスターブローカーにフェイルオーバーできず、最初の接続の再確立のみを試すことができます。クライアントには、再接続試行回数を設定できます。試行がその回数行われると、例外が出力されます。

再接続試行回数の設定

手順

以下の例は、AMQ Core Protocol JMS クライアントを使用して再接続試行回数を 3 に設定する方法を示しています。デフォルト値は 0 で、つまり、1 回のみ試行します。

  • ServerLocator.setInitialConnectAttempts() に値を渡して、再接続試行の数を設定します。

    ConnectionFactory cf =  ActiveMQJMSClient.createConnectionFactory(...)
    cf.setInitialConnectAttempts(3);
再接続試行回数のグローバル設定

または、ブローカーの設定内で再接続試行回数のグローバルな値を適用できます。最大値はすべてのクライアント接続に適用されます。

手順

  • 以下の例のように、<broker-instance-dir>/etc/broker.xml を編集して initial-connect-attempts 設定要素を追加し、time-to-live の値を指定します。

    <configuration>
     <core>
      ...
      <initial-connect-attempts>3</initial-connect-attempts> 1
      ...
     </core>
    </configuration>
    1
    ブローカーに接続するすべてのクライアントは、最大 3 回の再接続の試行が許可されます。デフォルトは -1 で、クライアントは無制限の試行を許可します。

5.1.2. フェイルオーバー中のブロック呼び出しの処理

フェイルオーバーが発生し、クライアントが実行を継続するためにブローカーからの応答を待機している場合、新たに作成されたセッションには、進行中の呼び出しに関する情報がありません。初期呼び出しは、それ以外では、決して受け取ることのない応答を待機してハングする可能性があります。これを防ぐため、ブローカーは例外を出力することで、フェイルオーバー時に進行中のブロッキング呼び出しのブロックを解除するように設計されています。クライアントコードはこれらの例外をキャッチし、必要に応じてすべての操作を再試行できます。

AMQ Core Protocol JMS クライアントを使用する場合、ブロックされていないメソッドが commit() または prepare() の呼び出しである場合、トランザクションは自動的にロールバックされ、ブローカーによって例外が発生します。

5.1.3. トランザクションによるフェイルオーバーの処理

AMQ Core Protocol JMS クライアントを使用すると、セッションがトランザクションであり、現在のトランザクションでメッセージがすでに送信または確認応答されている場合、ブローカーはフェイルオーバー中にメッセージまたはその確認応答が失われたかどうかがわかりません。そのため、トランザクションはロールバック用にのみマークされます。その後、コミットしようとすると javax.jms.TransactionRolledBackException が出力されます。

警告

このルールに関する注意点は、XA を使用する場合です。2 フェーズコミットが使用され、prepare() がすでに呼び出されている場合、ロールバックによって HeuristicMixedException が発生する可能性があります。このため、コミットによって XAException.XA_RETRY 例外が発生し、トランザクションマネージャーは後でコミットを再試行することを通知します。元のコミットが発生していない場合、それはまだ存在し、コミットされます。存在しない場合、コミットされたと見なされますが、トランザクションマネージャーが警告を記録する可能性があります。この例外の副次的な影響は、非永続的なメッセージが失われることです。このような損失を回避するために、XA を使用する際には常に永続メッセージを使用してください。prepare() が呼び出される前にサーバーにフラッシュされるため、これは確認応答の問題ではありません。

AMQ Core Protocol JMS クライアントコードは、例外をキャッチし、必要なクライアント側のロールバックを実行する必要があります。ただし、セッションはロールバックされているため、セッションをロールバックする必要はありません。ユーザーは、同じセッションに対してトランザクション操作を再試行できます。

コミット呼び出しの実行時にフェイルオーバーが発生すると、ブローカーは呼び出しをブロックしず、AMQ Core Protocol JMS クライアントが応答を無期限に待機しないようにします。この場合、障害が発生する前にトランザクションのコミットがライブサーバーで実際に処理されたかどうかをクライアントが判断するのは容易ではありません。

これを修正するには、AMQ Core Protocol JMS クライアントはトランザクションで重複検出を有効にし、呼び出しがブロック解除された後にトランザクション操作を再試行します。フェイルオーバー前にトランザクションが正常にマスターブローカーでコミットされた場合、重複検出により、再試行時にトランザクションに存在する永続メッセージがブローカー側で無視されるようにします。これにより、メッセージが複数回送信されなくなります。

セッションがトランザクションではない場合は、フェイルオーバー時にメッセージまたは確認応答が失われる可能性があります。トランザクション以外のセッションに 一度だけ 配信する保証を提供する場合は、重複検出を有効にし、ブロック解除例外をキャッチします。

5.1.4. 接続失敗の通知

JMS は、接続障害の非同期に通知するための標準メカニズムである java.jms.ExceptionListener を提供します。

ExceptionListener または SessionFailureListener インスタンスは、接続が正常にフェイルオーバー、再接続、または再割り当てされたかどうかに関係なく、接続障害が発生した場合にブローカーによって常に呼び出されます。SessionFailureListenerconnectionFailed で渡された failedOver フラグを調べると、再接続または再割り当てが発生したかどうかを確認できます。または、javax.jms.JMSException のエラーコードを確認できます。これは以下のいずれかになります。

表5.1 JMSException エラーコード

エラーコード説明

FAILOVER

フェイルオーバーが発生し、ブローカーが正常に再割り当てまたは再接続しました。

DISCONNECT

フェイルオーバーは発生せず、切断されています。

5.2. アプリケーションレベルのフェイルオーバー

場合によっては、自動クライアントのフェイルオーバーは望ましくないかもしれませんが、障害ハンドラーで独自の再接続ロジックをコーディングしたい場合もあります。フェイルオーバーはユーザーアプリケーションレベルで処理されるため、これは、アプリケーションレベル のフェイルオーバーとして定義します。

JMS を使用する場合にアプリケーションレベルのフェイルオーバーを実装するには、JMS 接続で ExceptionListener クラスを設定します。ExceptionListener は、接続障害が検出された場合に JBoss EAP メッセージングによって呼び出されます。ExceptionListener では、古い JMS 接続を閉じる必要があります。JNDI から新しい接続ファクトリーインスタンスを検索し、新しい接続を作成することも可能です。

5.3. デッド接続の検出

ブローカーからデータを受信する限り、クライアントは接続がアライブ状態であると判断します。client-failure-check-period プロパティーの値を指定して、接続の失敗を確認するようにクライアントを設定します。ネットワーク接続のデフォルトのチェック期間は 30000 ミリ秒 (30 秒) で、仮想マシン間の接続のデフォルト値は -1 です。これは、データが受信されなかった場合、クライアントがその側から接続を失敗させないことを意味します。

通常、チェック期間はブローカーの接続の Time-to-live に使用される値よりもはるかに低い値に設定し、一時的な障害が発生した場合にクライアントが再接続できるようにします。

次の例は、チェック期間を 10000 ミリ秒または 10 秒に設定する方法を示しています。

手順

  • デッド接続を検出するためのチェック期間を設定します。

    • JNDI を使用している場合は、以下のように JNDI コンテキスト環境 (jndi.properties) 内のチェック期間を設定します。

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      connectionFactory.myConnectionFactory=tcp://localhost:61616?clientFailureCheckPeriod=10000
    • JNDI を使用していない場合は、値を ActiveMQConnectionFactory.setClientFailureCheckPeriod() に渡してチェック期間を直接設定します。

      ConnectionFactory cf =  ActiveMQJMSClient.createConnectionFactory(...)
      cf.setClientFailureCheckPeriod(10000);

5.4. time-to-live の設定

デフォルトでは、クライアントは独自の接続に TTL (time-to-live) を設定できます。以下の例は、TTL を設定する方法を示しています。

手順

  • クライアント接続に time-to-live を設定します。

    • JNDI を使用して接続ファクトリーをインスタンス化する場合は、connectionTtl パラメーターを使用して xml 設定に指定できます。

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      connectionFactory.myConnectionFactory=tcp://localhost:61616?connectionTtl=30000
    • JNDI を使用していない場合は、接続 TTL は ActiveMQConnectionFactory インスタンスの ConnectionTTL 属性によって定義されます。

      ConnectionFactory cf =  ActiveMQJMSClient.createConnectionFactory(...)
      cf.setConnectionTTL(30000);

5.5. 接続を閉じる

クライアントアプリケーションは、デッド接続が発生しないように、終了する前に制御された方法でリソースを閉じる必要があります。Java では、finally ブロック内で接続を閉じることが推奨されます。

Connection jmsConnection = null;
try {
   ConnectionFactory jmsConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(...);
   jmsConnection = jmsConnectionFactory.createConnection();
   ...use the connection...
}
finally {
   if (jmsConnection != null) {
      jmsConnection.close();
   }
}

5.6. 動的検出の設定

接続の確立を試みる際に、AMQ Core Protocol JMS を設定してブローカーのリストを検出できます。

クライアント上で JNDI を使用して JMS 接続ファクトリーインスタンスを検索する場合は、これらのパラメーターを JNDI コンテキスト環境で指定できます。通常、パラメーターは jndi.properties という名前のファイルで定義されます。接続ファクトリーの URI のホストおよび部分は、ブローカーの broker.xml 設定ファイル内の対応する broadcast-groupgroup-addressgroup-port と一致する必要があります。以下は、ブローカーの検出グループに接続するように設定されている jndi.properties ファイルの例です。

java.naming.factory.initial = ActiveMQInitialContextFactory
connectionFactory.myConnectionFactory=udp://231.7.7.7:9876

接続ファクトリーがクライアントアプリケーションによって JNDI からダウンロードされ、JMS 接続が作成される場合、これらの接続は、ブローカーの検出グループ設定に指定されたマルチキャストアドレスでリッスンすることで、ディスカバリーグループが維持するサーバーのリスト全体で負荷分散されます。

JNDI を使用する代わりに、JMS 接続ファクトリーの作成時に、ディスカバリーグループパラメーターを直接 Java コードに指定できます。以下のコードは、その方法の例になります。

final String groupAddress = "231.7.7.7";
final int groupPort = 9876;

DiscoveryGroupConfiguration discoveryGroupConfiguration = new DiscoveryGroupConfiguration();
UDPBroadcastEndpointFactory udpBroadcastEndpointFactory = new UDPBroadcastEndpointFactory();
udpBroadcastEndpointFactory.setGroupAddress(groupAddress).setGroupPort(groupPort);
discoveryGroupConfiguration.setBroadcastEndpointFactory(udpBroadcastEndpointFactory);

ConnectionFactory jmsConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithHA
    (discoveryGroupConfiguration, JMSFactoryType.CF);

Connection jmsConnection1 = jmsConnectionFactory.createConnection();
Connection jmsConnection2 = jmsConnectionFactory.createConnection();

setter メソッド setRefreshTimeout() を使用して、更新タイムアウトを DiscoveryGroupConfiguration に直接設定できます。デフォルト値は 10000 ミリ秒です。

最初の使用時では、接続ファクトリーは、最初の接続を作成する前に、作成してからこの期間待機します。デフォルトの待機時間は 10000 ミリ秒ですが、新しい値を DiscoveryGroupConfiguration.setDis.coveryInitialWaitTimeout() に渡すことで変更できます。

5.7. 静的検出の設定

使用しているネットワークで UDP を使用できないことがあります。この場合は、使用可能なサーバーの初期リストで接続を設定できます。リストは、常に利用できることがわかっている 1 つのブローカーだけになるか、少なくとも 1 つのブローカーが利用可能であるブローカーのリストにすることができます。

これは、すべてのサーバーがホストされる場所を、認識しなければならないという意味ではありません。信頼できるサーバーを使用して接続するように、これらのサーバーを設定できます。接続後、接続の詳細はサーバーからクライアントに伝播されます。

クライアント上で JNDI を使用して JMS 接続ファクトリーインスタンスを検索する場合は、これらのパラメーターを JNDI コンテキスト環境で指定できます。通常、パラメーターは jndi.properties という名前のファイルで定義されます。以下は、動的検出を使用する代わりに、ブローカーの静的リストを提供する jndi.properties ファイルの例です。

java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.myConnectionFactory=(tcp://myhost:61616,tcp://myhost2:61616)

上記の接続ファクトリーがクライアントによって使用される場合、接続は括弧 () 内で定義されるブローカーのリスト全体で負荷分散されます。

JMS 接続ファクトリーを直接インスタンス化する場合は、以下の例のように JMS 接続ファクトリーの作成時にコネクターリストを明示的に指定できます。

HashMap<String, Object> map = new HashMap<String, Object>();
map.put("host", "myhost");
map.put("port", "61616");
TransportConfiguration broker1 = new TransportConfiguration
    (NettyConnectorFactory.class.getName(), map);

HashMap<String, Object> map2 = new HashMap<String, Object>();
map2.put("host", "myhost2");
map2.put("port", "61617");
TransportConfiguration broker2 = new TransportConfiguration
    (NettyConnectorFactory.class.getName(), map2);

ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithHA
    (JMSFactoryType.CF, broker1, broker2);

第6章 メッセージ配信

6.1. ストリーミングされた大きなメッセージへの書き込み

大きなメッセージに書き込むには、BytesMessage.writeBytes() メソッドを使用します。以下の例では、ファイルからバイトを読み取り、メッセージに書き込みます。

例: ストリーミングされた大きなメッセージへの書き込み

BytesMessage message = session.createBytesMessage();
File inputFile = new File(inputFilePath);
InputStream inputStream = new FileInputStream(inputFile);

int numRead;
byte[] buffer = new byte[1024];

while ((numRead = inputStream.read(buffer, 0, buffer.length)) != -1) {
    message.writeBytes(buffer, 0, numRead);
}

6.2. ストリームされた大きなメッセージからの読み取り

大きなメッセージから読み取るには、BytesMessage.readBytes() メソッドを使用します。以下の例では、メッセージからバイトを読み取り、ファイルに書き込みます。

例: ストリームされた大きなメッセージからの読み取り

BytesMessage message = (BytesMessage) consumer.receive();
File outputFile = new File(outputFilePath);
OutputStream outputStream = new FileOutputStream(outputFile);

int numRead;
byte buffer[] = new byte[1024];

for (int pos = 0; pos < message.getBodyLength(); pos += buffer.length) {
    numRead = message.readBytes(buffer);
    outputStream.write(buffer, 0, numRead);
}

付録A サブスクリプションの使用

AMQ は、ソフトウェアサブスクリプションから提供されます。サブスクリプションを管理するには、Red Hat カスタマーポータルでアカウントにアクセスします。

A.1. アカウントへのアクセス

手順

  1. access.redhat.com に移動します。
  2. アカウントがない場合は、作成します。
  3. アカウントにログインします。

A.2. サブスクリプションのアクティベート

手順

  1. access.redhat.com に移動します。
  2. サブスクリプション に移動します。
  3. Activate a subscription に移動し、16 桁のアクティベーション番号を入力します。

A.3. リリースファイルのダウンロード

.zip、.tar.gz およびその他のリリースファイルにアクセスするには、カスタマーポータルを使用してダウンロードする関連ファイルを検索します。RPM パッケージまたは Red Hat Maven リポジトリーを使用している場合は、この手順は必要ありません。

手順

  1. ブラウザーを開き、access.redhat.com/downloads で Red Hat カスタマーポータルの Product Downloads ページにログインします。
  2. JBOSS INTEGRATION AND AUTOMATION カテゴリーの Red Hat AMQ エントリーを見つけます。
  3. 必要な AMQ 製品を選択します。Software Downloads ページが開きます。
  4. コンポーネントの Download リンクをクリックします。

A.4. パッケージ用のシステムの登録

RPM パッケージを Red Hat Enterprise Linux にインストールするには、システムが登録されている必要があります。ダウンロードしたリリースファイルを使用している場合は、この手順は必要ありません。

手順

  1. access.redhat.com に移動します。
  2. Registration Assistant に移動します。
  3. ご使用の OS バージョンを選択し、次のページに進みます。
  4. システムの端末に一覧表示されたコマンドを使用して、登録を完了します。

詳細は、How to Register and Subscribe a System to the Red Hat Customer Portal を参照してください。

付録B Red Hat Maven リポジトリーの追加

このセクションでは、Red Hat が提供する Maven リポジトリーをソフトウェアで使用する方法を説明します。

B.1. オンラインリポジトリーの使用

Red Hat は、Maven ベースのプロジェクトで使用する中央の Maven リポジトリーを維持しています。詳細は、リポジトリーのウェルカムページ を参照してください。

Red Hat リポジトリーを使用するように Maven を設定する方法は 2 つあります。

Maven 設定へのリポジトリーの追加

この設定方法は、POM ファイルがリポジトリー設定をオーバーライドせず、含まれているプロファイルが有効になっている限り、ユーザーが所有するすべての Maven プロジェクトに適用されます。

手順

  1. Maven の settings.xml ファイルを見つけます。これは通常、ユーザーのホームディレクトリーの .m2 ディレクトリー内にあります。ファイルが存在しない場合は、テキストエディターを使用して作成します。

    Linux または UNIX の場合:

    /home/<username>/.m2/settings.xml

    Windows の場合:

    C:\Users\<username>\.m2\settings.xml
  2. 次の例のように、Red Hat リポジトリーを含む新しいプロファイルを settings.xml ファイルの profiles 要素に追加します。

    例: Red Hat リポジトリーを含む Maven settings.xml ファイル

    <settings>
      <profiles>
        <profile>
          <id>red-hat</id>
          <repositories>
            <repository>
              <id>red-hat-ga</id>
              <url>https://maven.repository.redhat.com/ga</url>
            </repository>
          </repositories>
          <pluginRepositories>
            <pluginRepository>
              <id>red-hat-ga</id>
              <url>https://maven.repository.redhat.com/ga</url>
              <releases>
                <enabled>true</enabled>
              </releases>
              <snapshots>
                <enabled>false</enabled>
              </snapshots>
            </pluginRepository>
          </pluginRepositories>
        </profile>
      </profiles>
      <activeProfiles>
        <activeProfile>red-hat</activeProfile>
      </activeProfiles>
    </settings>

Maven 設定の詳細は、Maven 設定リファレンス を参照してください。

POM ファイルへのリポジトリーの追加

プロジェクトで直接リポジトリーを設定するには、次の例のように、POM ファイルの repositories 要素に新しいエントリーを追加します。

例: Red Hat リポジトリーを含む Maven pom.xml ファイル

<project>
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>example-app</artifactId>
  <version>1.0.0</version>

  <repositories>
    <repository>
      <id>red-hat-ga</id>
      <url>https://maven.repository.redhat.com/ga</url>
    </repository>
  </repositories>
</project>

POM ファイル設定の詳細は、Maven POM リファレンス を参照してください。

B.2. ローカルリポジトリーの使用

Red Hat は、そのコンポーネントの一部にファイルベースの Maven リポジトリーを提供します。これらは、ローカルファイルシステムに抽出できるダウンロード可能なアーカイブとして提供されます。

ローカルに抽出されたリポジトリーを使用するように Maven を設定するには、Maven 設定または POM ファイルに次の XML を適用します。

<repository>
  <id>red-hat-local</id>
  <url>${repository-url}</url>
</repository>

${repository-url} は、抽出されたリポジトリーのローカルファイルシステムパスを含むファイル URL である必要があります。

表B.1 ローカル Maven リポジトリーの URL の例

オペレーティングシステムファイルシステムパスURL

Linux または UNIX

/home/alice/maven-repository

file:/home/alice/maven-repository

Windows

C:\repos\red-hat

file:C:\repos\red-hat

付録C 例で AMQ ブローカーの使用

AMQ Core Protocol JMS の例では、exampleQueue という名前のキューを持つ実行中のメッセージブローカーが必要です。以下の手順に従って、ブローカーをインストールして起動し、キューを定義します。

C.1. ブローカーのインストール

Getting Started with AMQ Broker の手順に従って、ブローカーをインストール して、ブローカーインスタンスを作成 します。匿名アクセスを有効にします。

以下の手順では、ブローカーインスタンスの場所を <broker-instance-dir> と呼びます。

C.2. ブローカーの起動

手順

  1. artemis run コマンドを使用してブローカーを起動します。

    $ <broker-instance-dir>/bin/artemis run
  2. 起動時にログに記録された重大なエラーがないか、コンソールの出力を確認してください。ブローカーでは、準備が整うと Server is now live とログが記録されます。

    $ example-broker/bin/artemis run
               __  __  ____    ____            _
         /\   |  \/  |/ __ \  |  _ \          | |
        /  \  | \  / | |  | | | |_) |_ __ ___ | | _____ _ __
       / /\ \ | |\/| | |  | | |  _ <| '__/ _ \| |/ / _ \ '__|
      / ____ \| |  | | |__| | | |_) | | | (_) |   <  __/ |
     /_/    \_\_|  |_|\___\_\ |____/|_|  \___/|_|\_\___|_|
    
     Red Hat AMQ <version>
    
    2020-06-03 12:12:11,807 INFO  [org.apache.activemq.artemis.integration.bootstrap] AMQ101000: Starting ActiveMQ Artemis Server
    ...
    2020-06-03 12:12:12,336 INFO  [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
    ...

C.3. キューの作成

新しいターミナルで、artemis queue コマンドを使用して exampleQueue という名前のキューを作成します。

$ <broker-instance-dir>/bin/artemis queue create --name exampleQueue --address exampleQueue --auto-create-address --anycast

プロンプトで質問に Yes または No で回答するように求められます。すべての質問に N (いいえ) と回答します。

キューが作成されると、ブローカーはサンプルプログラムで使用できるようになります。

C.4. ブローカーの停止

サンプルの実行が終了したら、artemis stop コマンドを使用してブローカーを停止します。

$ <broker-instance-dir>/bin/artemis stop

改訂日時:2023-01-28 12:23:00 +1000