第10章 フロー制御

フロー制御は、プロデューサーとコンシューマー間のデータのフローを制限することで、プロデューサーとコンシューマーの超過を防ぎます。AMQ Broker を使用すると、コンシューマーとプロデューサーの両方のフロー制御を設定できます。

10.1. コンシューマーフロー制御

コンシューマーフロー制御は、クライアントがブローカーからメッセージを消費する際に、ブローカーとクライアント間のデータフローを制御します。AMQ Broker クライアントは、デフォルトでメッセージをバッファーしてからコンシューマーに配信します。バッファーがない場合、クライアントはまず、消費する前にブローカーから各メッセージを要求する必要があります。このタイプのラウンドトリップ通信はコストがかかります。メモリー不足の問題によりコンシューマーがメッセージをすばやく処理できず、バッファーが受信メッセージでオーバーフローを開始するため、クライアント側のデータのフローを制限することが重要になります。

10.1.1. コンシューマーウィンドウサイズの設定

クライアント側のバッファーに保持されるメッセージの最大サイズは、その ウィンドウサイズ によって決定されます。AMQ Broker クライアントのウィンドウのデフォルトサイズは 1 MiB または 1024 * 1024 バイトです。ほとんどのユースケースでは、デフォルトでは問題ありません。その他のケースでは、ウィンドウサイズの最適な値を見つけるには、システムのベンチマークが必要になる場合があります。AMQ Broker では、デフォルトを変更する必要がある場合はバッファーウインドウサイズを設定できます。

ウィンドウサイズの設定

以下の例は、Core JMS クライアントを使用する場合にコンシューマーウインドウサイズパラメーターを設定する方法を示しています。それぞれの例では、コンシューマーウィンドウサイズを 300000 バイトに設定します。

手順

  • コンシューマーウィンドウサイズを設定します。

    • Core JMS Client が JNDI を使用して接続ファクトリーをインスタンス化する場合は、connection string URL の一部として consumerWindowSize パラメーターを含めます。JNDI コンテキスト環境内に URL を保存します。以下の例では、jndi.properties ファイルを使用して URL を保存します。

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=300000
    • Core JMS クライアントが JNDI を使用して接続ファクトリーをインスタンス化しない場合は、値を ActiveMQConnectionFactory.setConsumerWindowSize() に渡します。

      ConnectionFactory cf =  ActiveMQJMSClient.createConnectionFactory(...)
      cf.setConsumerWindowSize(300000);

10.1.2. 高速コンシューマーの処理

高速コンシューマーは、メッセージをコンシュームすると同時に処理できます。メッセージングシステムのコンシューマーが高速であると確信できる場合は、ウィンドウサイズを - 1 に設定することを検討してください。この設定により、クライアント側でバインドされていないメッセージバッファーリングが可能になります。ただし、この設定は注意して使用してください。コンシューマーが受信と同時にメッセージを処理できない場合、クライアント側のメモリーをオーバーフローさせることができます。

高速コンシューマーのウィンドウサイズの設定

手順

以下の例は、メッセージの高速コンシューマーである Core JMS クライアントを使用する場合に、ウィンドウサイズを - 1 に設定する方法を示しています。

  • コンシューマーウィンドウサイズを - 1 に設定します。

    • Core JMS Client が JNDI を使用して接続ファクトリーをインスタンス化する場合は、connection string URL の一部として consumerWindowSize パラメーターを含めます。JNDI コンテキスト環境内に URL を保存します。以下の例では、jndi.properties ファイルを使用して URL を保存します。

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=-1
    • Core JMS クライアントが JNDI を使用して接続ファクトリーをインスタンス化しない場合は、値を ActiveMQConnectionFactory.setConsumerWindowSize() に渡します。

      ConnectionFactory cf =  ActiveMQJMSClient.createConnectionFactory(...)
      cf.setConsumerWindowSize(-1);

10.1.3. 低速なコンシューマーの処理

低速なコンシューマーは、各メッセージを処理するのにかなり時間がかかります。このような場合は、クライアント側でメッセージをバッファーしないことが推奨されます。メッセージはブローカー側で残り、他のコンシューマーによって消費されます。バッファーをオフにする利点の 1 つは、キュー上の複数のコンシューマー間で確定的な分散を提供することです。クライアント側のバッファーを無効にして低速なコンシューマーを処理するには、ウィンドウサイズを 0 に設定します。

低速なコンシューマーのウィンドウサイズの設定

手順

以下の例は、メッセージの低速なコンシューマーである Core JMS クライアントを使用する場合に、ウィンドウサイズを 0 に設定する方法を示しています。

  • コンシューマーウィンドウサイズを 0 に設定します。

    • Core JMS Client が JNDI を使用して接続ファクトリーをインスタンス化する場合は、connection string URL の一部として consumerWindowSize パラメーターを含めます。JNDI コンテキスト環境内に URL を保存します。以下の例では、jndi.properties ファイルを使用して URL を保存します。

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=0
    • Core JMS クライアントが JNDI を使用して接続ファクトリーをインスタンス化しない場合は、値を ActiveMQConnectionFactory.setConsumerWindowSize() に渡します。

      ConnectionFactory cf =  ActiveMQJMSClient.createConnectionFactory(...)
      cf.setConsumerWindowSize(0);

関連情報

低速なコンシューマーを処理する場合にコンシューマーをバッファーしないようにブローカーを設定する方法を示す例は、INSTALL_DIR/examples/standardno-consumer-buffering の例を参照してください。

10.1.4. メッセージ消費率の設定

コンシューマーがメッセージを消費できるレートを調整できます。スロットリングとしても知られており、消費率は、コンシューマーが設定を許可するよりも高速にメッセージを消費しないようにします。

注記

レート制限のあるフロー制御は、ウィンドウベースのフロー制御と併用できます。レート制限のあるフロー制御は、クライアントが 1 秒以内に消費できるメッセージ数のみに影響し、バッファー内のメッセージ数には影響しません。レート制限が遅く、ウィンドウベースの制限が高いと、クライアントの内部バッファーがメッセージですぐに一杯になります。

この機能を有効にするには、レートは正の整数である必要があります。1 秒あたりのメッセージ単位で指定される必要なメッセージ消費率の最大値です。レートを -1 に設定すると、レート制限のあるフロー制御が無効になります。デフォルト値は -1 です。

メッセージ消費率の設定

手順

以下の例では、メッセージの消費速度を毎秒 10 メッセージに制限する Core JMS クライアントを使用しています。

  • コンシューマーレートを設定します。

    • Core JMS Client が JNDI を使用して接続ファクトリーをインスタンス化する場合は、connection string URL の一部として consumerMaxRate パラメーターを含めます。JNDI コンテキスト環境内に URL を保存します。以下の例では、jndi.properties ファイルを使用して URL を保存します。

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      java.naming.provider.url=tcp://localhost:61616?consumerMaxRate=10
    • Core JMS クライアントが JNDI を使用して接続ファクトリーをインスタンス化しない場合は、値を ActiveMQConnectionFactory.setConsumerMaxRate() に渡します。

      ConnectionFactory cf =  ActiveMQJMSClient.createConnectionFactory(...)
      cf.setConsumerMaxRate(10);

関連情報

コンシューマーレートの制限方法の作業例は、INSTALL_DIR/examples/standardconsumer-rate-limit の例を参照してください。