Chapter 12. Working with Large Messages

JBoss EAP messaging supports large messages, even when the client or server has a limited amount of memory. Large messages can be streamed as they are, or they can be compressed for more efficient transfer. A user can send a large message by setting an InputStream as the body of the message. When the message is sent, JBoss EAP messaging reads this InputStream and transmits the data to the server in fragments.

Neither the client nor the server stores the complete body of a large message in memory. The consumer initially receives a large message with an empty body and thereafter sets an OutputStream on the message to stream it in fragments to a disk file.

Warning

When processing large messages, the server does not handle message properties in the same way as the message body. For example, a message with a property set to a string that is larger than journal-buffer-size cannot be processed by the server because it overfills the journal buffer.

12.1. Streaming Large Messages

If you send large messages the standard way, the heap size required to send them can be four or more times the size of the message, meaning a 1 GB message can require 4 GB in heap memory. For this reason, JBoss EAP messaging supports setting the body of messages using the java.io.InputStream and java.io.OutputStream classes, which require much less memory. Input streams are used directly for sending messages and output streams are used for receiving messages.
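
The reason stream-based bodies need so little memory can be sketched with plain java.io classes. The example below is an illustration only, not the JBoss EAP messaging implementation: the class name FragmentedCopy, the method copyInFragments, and the 8 KiB fragment size are assumptions chosen for the example. It moves a payload from an InputStream to an OutputStream through one small reusable buffer, so memory use is bounded by the fragment size rather than by the message size.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of any JBoss EAP or ActiveMQ Artemis API.
final class FragmentedCopy {

    /**
     * Copies everything from in to out through a single reusable buffer of
     * fragmentSize bytes and returns the total number of bytes copied.
     */
    static long copyInFragments(InputStream in, OutputStream out, int fragmentSize) {
        if (fragmentSize <= 0) {
            throw new IllegalArgumentException("fragmentSize must be positive");
        }
        byte[] fragment = new byte[fragmentSize];
        long total = 0;
        try {
            int read;
            // read() returns -1 once the source stream is exhausted
            while ((read = in.read(fragment)) != -1) {
                out.write(fragment, 0, read);
                total += read;
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[1_000_000]; // stand-in for a large message body
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copyInFragments(new ByteArrayInputStream(payload), sink, 8 * 1024);
        System.out.println("Copied " + copied + " bytes");
    }
}
```

In a real client the sink would typically be a FileOutputStream rather than an in-memory buffer; ByteArrayOutputStream is used here only to keep the example self-contained.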

When receiving messages, there are two ways to deal with the output stream:

  • You can block until the message body has been fully written to the output stream by using the ClientMessage.saveToOutputStream(OutputStream out) method.
  • You can use the ClientMessage.setOutputStream(OutputStream out) method to write the message to the stream asynchronously. This method requires that the consumer be kept alive until the message has been fully received.

You can use any kind of stream you like, for example files, JDBC Blobs, or SocketInputStream, as long as it implements java.io.InputStream for sending messages and java.io.OutputStream for receiving messages.

Streaming Large Messages Using the Core API

The following table lists the ClientMessage methods used for streaming and the object properties that expose them through JMS.

ClientMessage Method | Description | JMS Equivalent Property
---------------------|-------------|------------------------
setBodyInputStream(InputStream) | Set the InputStream used to read a message body when it is sent. | JMS_AMQ_InputStream
setOutputStream(OutputStream) | Set the OutputStream that will receive the body of a message. This method does not block. | JMS_AMQ_OutputStream
saveToOutputStream(OutputStream) | Save the body of the message to the OutputStream. It will block until the entire content is transferred to the OutputStream. | JMS_AMQ_SaveStream

The following code example sets the output stream when receiving a core message.

ClientMessage firstMessage = consumer.receive(...);

// Block until the stream is transferred
firstMessage.saveToOutputStream(firstOutputStream);

ClientMessage secondMessage = consumer.receive(...);

// Do not wait for the transfer to finish
secondMessage.setOutputStream(secondOutputStream);

The following code example sets the input stream when sending a core message:

ClientMessage clientMessage = session.createMessage();
clientMessage.setBodyInputStream(dataInputStream);

Note

For messages larger than 2 GiB, you must use the _AMQ_LARGE_SIZE message property to obtain the message size. The getBodySize() method returns an invalid value for such messages because its result is limited to the maximum integer value.
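
The integer limit mentioned in this note can be illustrated with plain Java arithmetic. This is a sketch of the numeric limit only; it makes no claim about how getBodySize() computes its result internally, and the class name BodySizeOverflow and the method narrowToInt are hypothetical.

```java
// Hypothetical illustration, not part of any JBoss EAP or ActiveMQ Artemis API.
final class BodySizeOverflow {

    /** Narrows a byte count to int, discarding the bits that do not fit. */
    static int narrowToInt(long sizeInBytes) {
        return (int) sizeInBytes;
    }

    public static void main(String[] args) {
        long threeGiB = 3L * 1024 * 1024 * 1024; // 3221225472 bytes

        // The largest size an int can represent is just under 2 GiB.
        System.out.println("Integer.MAX_VALUE = " + Integer.MAX_VALUE);

        // A long holds the size correctly; the narrowed int does not.
        System.out.println("as long = " + threeGiB);
        System.out.println("as int  = " + narrowToInt(threeGiB));
    }
}
```

Because 3 GiB does not fit in 32 signed bits, the narrowed value wraps around to a negative number, which is why a size carried in a wider type is needed for such messages.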

Streaming Large Messages Over JMS

When using JMS, JBoss EAP messaging maps the core API streaming methods by setting object properties. You use the Message.setObjectProperty(String name, Object value) method to set the input and output streams.

The InputStream is set using the JMS_AMQ_InputStream property on messages being sent.

BytesMessage bytesMessage = session.createBytesMessage();
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
bytesMessage.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
someProducer.send(bytesMessage);

The OutputStream is set using the JMS_AMQ_SaveStream property on messages being received in a blocking manner.

BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(120000);
File outputFile = new File("huge_message_received.dat");
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);

// This will block until the entire content is saved on disk
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);

The OutputStream can also be set in a non-blocking manner by using the JMS_AMQ_OutputStream property.

// This does not wait for the stream to finish. You must keep the consumer active.
messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);

Note

When streaming large messages using JMS, only StreamMessage and BytesMessage objects are supported.

12.2. Configuring Large Messages

12.2.1. Configure Large Message Location

You can read the configuration for the large messages directory by using the management CLI command below. The output is included to show the default configuration.

/subsystem=messaging-activemq/server=default/path=large-messages-directory:read-resource
{
    "outcome" => "success",
    "result" => {
        "path" => "activemq/largemessages",
        "relative-to" => "jboss.server.data.dir"
    }
}

Important

To achieve the best performance, it is recommended to store the large messages directory on a different physical volume from the message journal or the paging directory.

The large-messages-directory configuration element is used to specify a location on the filesystem to store the large messages. Note that by default the path is activemq/largemessages. You can change the location for path by using the following management CLI command.

/subsystem=messaging-activemq/server=default/path=large-messages-directory:write-attribute(name=path,value=PATH_LOCATION)

Also note the relative-to attribute in the output above. When relative-to is used, the value of the path attribute is treated as relative to the file path specified by relative-to. By default this value is the JBoss EAP property jboss.server.data.dir. For standalone servers, jboss.server.data.dir is located at EAP_HOME/standalone/data. For domains, each server will have its own serverX/data/activemq directory located under EAP_HOME/domain/servers. You can change the value of relative-to using the following management CLI command.

/subsystem=messaging-activemq/server=default/path=large-messages-directory:write-attribute(name=relative-to,value=RELATIVE_LOCATION)
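
The way the path and relative-to attributes combine into one directory can be sketched with java.nio.file. This is an illustration of the resolution rule described above, not server code: the class name LargeMessagesDirectory and the method resolve are hypothetical, and EAP_HOME is left as a literal placeholder.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical illustration, not part of any JBoss EAP API.
final class LargeMessagesDirectory {

    /** Treats path as relative to the directory named by relativeTo. */
    static Path resolve(String relativeTo, String path) {
        return Paths.get(relativeTo).resolve(path);
    }

    public static void main(String[] args) {
        // Location of jboss.server.data.dir on a standalone server, per the text above
        String dataDir = "EAP_HOME/standalone/data";

        // Default value of the path attribute
        String path = "activemq/largemessages";

        System.out.println(resolve(dataDir, path));
    }
}
```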

Configuring Large Message Size

Use the management CLI to view the current configuration for large messages. Note that this configuration is part of a connection-factory element. For example, to read the current configuration for the default RemoteConnectionFactory, use the following command:

/subsystem=messaging-activemq/server=default/connection-factory=RemoteConnectionFactory:read-attribute(name=min-large-message-size)

Set the attribute using a similar syntax.

/subsystem=messaging-activemq/server=default/connection-factory=RemoteConnectionFactory:write-attribute(name=min-large-message-size,value=NEW_MIN_SIZE)

Note

The value of the min-large-message-size attribute is specified in bytes.

Configuring Large Message Compression

You can choose to compress large messages for fast and efficient transfer. All compression and decompression operations are handled on the client side. If the compressed message is smaller than min-large-message-size, it is sent to the server as a regular message. Compress large messages by setting the Boolean property compress-large-messages to true using the management CLI.

/subsystem=messaging-activemq/server=default/connection-factory=RemoteConnectionFactory:write-attribute(name=compress-large-messages,value=true)
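
The size check described above can be sketched as follows. This is a minimal illustration under stated assumptions: java.util.zip deflate is used as a stand-in because this chapter does not state which compression algorithm the client uses, and the class name CompressionThreshold and its methods are hypothetical. For brevity the sketch compresses a whole byte array in memory, which a streaming client would avoid.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;

// Hypothetical illustration, not part of any JBoss EAP or ActiveMQ Artemis API.
final class CompressionThreshold {

    /** Compresses body with deflate (an assumed stand-in algorithm). */
    static byte[] compress(byte[] body) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the DeflaterOutputStream finishes the compressed stream
        try (DeflaterOutputStream deflater = new DeflaterOutputStream(buffer)) {
            deflater.write(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** True if the compressed body is smaller than the large-message threshold. */
    static boolean sentAsRegularMessage(byte[] body, int minLargeMessageSize) {
        return compress(body).length < minLargeMessageSize;
    }

    public static void main(String[] args) {
        int minLargeMessageSize = 100 * 1024;   // threshold in bytes
        byte[] repetitive = new byte[200 * 1024]; // all zeros, highly compressible

        System.out.println("compressed length: " + compress(repetitive).length);
        System.out.println("sent as regular message: "
                + sentAsRegularMessage(repetitive, minLargeMessageSize));
    }
}
```

A highly repetitive body shrinks well below the threshold, while data that is already compressed or random does not, so the benefit of compress-large-messages depends on the payload.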

12.2.2. Configuring Large Message Size Using the Core API

If you are using the core API on the client side, use the setMinLargeMessageSize method of the ServerLocator to specify the minimum size of large messages. The minimum size of large messages (min-large-message-size) is set to 100 KB by default.

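ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(
    new TransportConfiguration(InVMConnectorFactory.class.getName()));

// Messages larger than 25 KiB are treated as large messages
locator.setMinLargeMessageSize(25 * 1024);

// Create the session factory from the configured locator so that the setting applies
ClientSessionFactory factory = locator.createSessionFactory();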