第12章 大きなメッセージの処理

JBoss EAP メッセージングは、クライアントまたはサーバーでメモリーのサイズが制限されていても、大きなメッセージに対応できます。大きなメッセージは、そのままにストリーミングしたり、効率的に転送するためにさらに圧縮したりできます。ユーザーはメッセージのボディーに InputStream を設定すると、大きなメッセージを送信できます。メッセージが送信されると、JBoss EAP メッセージングがこの InputStream を読み取り、データを断片化してサーバーへ送信します。

クライアントもサーバーも、大きなメッセージのボディー部分を完全な形でメモリーに保存しません。コンシューマーは、最初にボディーが空の大きなメッセージを受け取った後、メッセージに OutputStream を設定して、断片的にディスクファイルへストリーミングします。

警告

大きなメッセージを処理する場合、サーバーはメッセージのボディーと同じ方法でメッセージプロパティーを処理しません。たとえば、journal-buffer-size より大きい文字列に設定されたプロパティーを持つメッセージは、ジャーナルバッファーが満杯になるため、サーバーで処理できません。

12.1. 大きなメッセージのストリーミング

標準的な方法で大きなメッセージを送信する場合、メッセージを送信するのに必要なヒープサイズはメッセージのサイズの 4 倍以上になる可能性があります (つまり 1 GB のメッセージでは 4 GB のヒープメモリーが必要になる可能性があります)。このため、JBoss EAP メッセージングでは、java.io.InputStreamjava.io.OutputStream クラスを使用してメッセージのボディーの設定をサポートしていますが、これに必要なメモリーはそれほど多くありません。入力ストリームがメッセージと出力ストリームの送信にそのまま使用され、メッセージの受信に使用されます。

メッセージの受信時に、出力ストリームを処理する方法は 2 つあります。

  • ClientMessage.saveToOutputStream(OutputStream out) メソッドを使用して出力ストリームを復元している間はブロックできます。
  • ClientMessage.setOutputstream (OutputStream out) メソッドを使用してメッセージをストリームに非同期に書き込むことができます。この方式では、メッセージが完全に受信されるまでコンシューマーをキープアライブする必要があります。

メッセージの送信に java.io.InputStream を実装し、メッセージの受信に java.io.OutputStream を実装している限り、ストリームの種類に関係なく (ファイル、JDBC Blob、または SocketInputStream など) 使用できます。

コア API を使用した大きなメッセージのストリーミング

以下の表に、オブジェクトプロパティーを使用して JMS で利用可能な ClientMessage クラスで使用できるメソッドを示します。

ClientMessage メソッド説明JMS 等価プロパティー

setBodyInputStream(InputStream)

送信時にメッセージボディーを読み取るために使用される InputStream を設定します。

JMS_AMQ_InputStream

setOutputStream(OutputStream)

メッセージのボディーを受信する OutputStream を設定します。この方法ではブロックしません。

JMS_AMQ_OutputStream

saveOutputStream(OutputStream)

メッセージのボディーを OutputStream に保存します。これは、コンテンツ全体が OutputStream に転送されるまでブロックされます。

JMS_AMQ_SaveStream

以下のコード例では、コアメッセージを受信する際に出力ストリームを設定します。

ClientMessage firstMessage = consumer.receive(...);

// Block until the stream is transferred
firstMessage.saveOutputStream(firstOutputStream);

ClientMessage secondMessage = consumer.receive(...);

// Do not wait for the transfer to finish
secondMessage.setOutputStream(secondOutputStream);

以下のコード例では、コアメッセージを送信する際に入力ストリームを設定します。

ClientMessage clientMessage = session.createMessage();
clientMessage.setInputStream(dataInputStream);
注記

2GiB を超えるメッセージの場合は、_AMQ_LARGE_SIZE メッセージプロパティーを使用する必要があります。getBodySize() メソッドが最大整数値に制限されているために無効な値を返すためです。

JMS での大きなメッセージのストリーミング

JMS を使用する場合、JBoss EAP メッセージングはオブジェクトプロパティーを設定して、コア API ストリーミングメソッドをマッピングします。Message.setObjectProperty(String name, Object value) メソッドを使用して入力ストリームと出力ストリームを設定します。

InputStream は、送信メッセージで JMS_AMQ_InputStream プロパティーを使用して設定されます。

BytesMessage bytesMessage = session.createBytesMessage();
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
bytesMessage.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
someProducer.send(bytesMessage);

OutputStream は、ブロックの方法で受信されたメッセージの JMS_AMQ_SaveStream プロパティーを使用して設定されます。

BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(120000);
File outputFile = new File("huge_message_received.dat");
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);

// This will block until the entire content is saved on disk
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);

JMS_AMQ_OutputStream プロパティーを使用すると、OutputStream をブロック以外の方法で設定することもできます。

// This does not wait for the stream to finish. You must keep the consumer active.
messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
注記

JMS を使用して大きなメッセージをストリーミングする場合、StreamMessage オブジェクトと BytesMessage オブジェクトのみがサポートされます。