Red Hat Training

A Red Hat training course is available for Red Hat Fuse

286.5. 高度な使用上の注意

286.5.1. Plugable 接続リソース管理

SJMS2 は、組み込みの接続プールを通じて JMS Connection リソース管理を提供します。これにより、サードパーティーの API プーリングロジックに依存する必要がなくなります。ただし、J2EE または OSGi コンテナーによって提供されるものなど、外部接続リソースマネージャーを使用する必要がある場合があります。このため、SJMS2 は、内部 SJMS2 接続プーリング機能をオーバーライドするために使用できるインターフェイスを提供します。これは、ConnectionResource インターフェイスを通じて実現されます。

ConnectionResource は、SJMS2 コンポーネントに Connection プールを提供するために使用されるコントラクトであり、必要に応じて接続を借用および返却するためのメソッドを提供します。ユーザーは、SJMS2 を外部接続プーリングマネージャーと統合する必要がある場合に使用する必要があります。

ただし、標準の ConnectionFactory プロバイダーの場合は、SJMS2 で提供される ConnectionFactoryResource 実装をそのまま使用するか、このコンポーネント用に最適化して拡張することをお勧めします。

以下は、ActiveMQ PooledConnectionFactory でプラグ可能な ConnectionResource を使用する例です。

public class AMQConnectionResource implements ConnectionResource {
    private PooledConnectionFactory pcf;

    public AMQConnectionResource(String connectString, int maxConnections) {
        super();
        pcf = new PooledConnectionFactory(connectString);
        pcf.setMaxConnections(maxConnections);
        pcf.start();
    }

    public void stop() {
        pcf.stop();
    }

    @Override
    public Connection borrowConnection() throws Exception {
        Connection answer = pcf.createConnection();
        answer.start();
        return answer;
    }

    @Override
    public Connection borrowConnection(long timeout) throws Exception {
        // SNIPPED...
    }

    @Override
    public void returnConnection(Connection connection) throws Exception {
        // Do nothing since there isn't a way to return a Connection
        // to the instance of PooledConnectionFactory
        log.info("Connection returned");
    }
}

次に、ConnectionResourceSjms2Component に渡します。

CamelContext camelContext = new DefaultCamelContext();
AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1);
Sjms2Component component = new Sjms2Component();
component.setConnectionResource(pool);
camelContext.addComponent("sjms2", component);

完全な使用例を確認するには、ConnectionResourceIT を参照してください。

286.5.2. セッション、コンシューマー、およびプロデューサーのプーリングとキャッシング管理

近日公開 …

286.5.3. バッチメッセージのサポート

Sjms2Producer は、List をカプセル化する Exchange を作成することにより、メッセージのコレクションのパブリッシュをサポートします。この Sjms2Producer は、List の内容を繰り返し、各メッセージを個別にパブリッシュします。

メッセージのバッチを生成するときに、各メッセージに固有のヘッダーを設定する必要がある場合は、SJMS2 BatchMessage クラスを使用できます。Sjms2Producer が BatchMessage リストに遭遇すると、各 BatchMessage を繰り返し、含まれているペイロードとヘッダーを公開します。

以下は、BatchMessage クラスの使用例です。まず、BatchMessage のリストを作成します:

List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}

次に、リストを公開します。

template.sendBody("sjms2:queue:batch.queue", messages);

286.5.4. カスタマイズ可能なトランザクションコミットストラテジー (ローカル JMS トランザクションのみ)

SJMS2 は、TransactionCommitStrategy インターフェイスを使用して、カスタムでプラグ可能なトランザクションストラテジーを作成する手段を開発者に提供します。これにより、ユーザーは、SessionTransactionSynchronization が セッションをいつコミットするかを決定するために使用する一連の固有の状況を定義できます。その使用例は、次のセクションで詳しく説明する BatchTransactionCommitStrategy です。

286.5.5. トランザクションバッチのコンシューマーとプロデューサー

SJMS2 コンポーネントは、プロデューサーエンドポイントとコンシューマーエンドポイントの両方でローカル JMS トランザクションのバッチ処理をサポートするように設計されています。ただし、それらがそれぞれでどのように処理されるかは非常に異なります。

SJMS2 コンシューマーエンドポイントは、X メッセージを関連するセッションでコミットする前に処理する単純な実装です。コンシューマーでバッチ処理されたトランザクションを有効にするには、まず、transacted パラメーターを true に設定してトランザクションを有効にし、次に transactionBatchCount を追加して 0 より大きい任意の値に設定します。たとえば、次の設定では、10 メッセージごとにセッションがコミットされます。

sjms2:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10

コンシューマーエンドポイントでのバッチの処理中に例外が発生した場合、セッションロールバックが呼び出され、メッセージが次に使用可能なコンシューマーに再配信されます。関連するセッションの BatchTransactionCommitStrategy のカウンターも 0 にリセットされます。JMSRedelivered ヘッダーが true に設定されたメッセージを監視するために、バッチメッセージのプロセッサーにフックを配置することは、ユーザーの責任です。これは、メッセージがある時点でロールバックされたこと、および処理が成功したことを確認する必要があることを示しています。

トランザクション処理されたバッチコンシューマーには、セッションで開いているトランザクションをコミットする前にメッセージ間で既定の時間 (5000 ミリ秒) 待機する内部タイマーのインスタンスも含まれます。デフォルト値の 5000 ミリ秒 (最小 1000 ミリ秒) は、ほとんどのユースケースに適していますが、さらに調整が必要な場合は、単に transactionBatchTimeout パラメーターを設定してください。

sjms2:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10&transactionBatchTimeout=2000

受け入れられる最小値は 1000 ミリ秒です。これは、コンテキスト切り替えの量が不要なパフォーマンスへの影響をもたらし、メリットが得られない可能性があるためです。

ただし、プロデューサーエンドポイントの処理方法は大きく異なります。プロデューサでは、各メッセージが宛先に配信された後、Exchange が閉じられ、そのメッセージへの参照がなくなります。再配信可能なすべてのメッセージを利用できるようにするには、BatchMessage をパブリッシュしているプロデューサーエンドポイントでトランザクションを有効にするだけです。トランザクションは、バッチリスト内のすべてのメッセージを含む交換の最後にコミットされます。追加の設定は必要ありません。以下に例を示します。

List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}

トランザクションを有効にしてリストを公開します。

template.sendBody("sjms2:queue:batch.queue?transacted=true", messages);