Chapter 13. Working with Large Messages

AMQ Broker supports sending and receiving of large messages even when the client and broker are running with limited memory. The only limit to the size of a message that is sent or consumed is the amount of disk space you have available.

Large messages are sent using an InputStream on a message body. For example, a FileInputStream could be used to send a large message from a large file on disk. As the InputStream is read, the data is sent to the broker in fragments. The broker persists the fragments to disk as it receives them.

When a broker later delivers a large message to a consumer, the fragments are read back from the disk and sent. At first, the consumer receives a large message with an empty body. The consumer can then set an OutputStream on the message to stream the large message body to a file on disk or elsewhere. At no time is the entire message body stored fully in memory, either on the client or on the broker.

You can store large messages to a physical disk or to a database table.

Note

For the best performance, the large messages directory should be stored on a different physical volume than the one used to store the message journal or paging directory.

13.1. Preparing Brokers to Store Large Messages

Large messages are stored on a file system location available to the broker. The following procedure shows you how to configure the location where large messages are stored.

Configuring the Large Messages Directory

Procedure

  • Add the configuration to BROKER_INSTANCE_DIR/etc/broker.xml that references the location chosen to store large messages.

    • If you are using journal persistence, add the large-messages-directory element and provide the file system path to the location used to store large messages.

      <configuration>
        <core>
          ...
          <large-messages-directory>/path/to/large-messages</large-messages-directory> 1
          ...
        </core>
      </configuration>
      1
      The default value for the large-messages-directory configuration element is BROKER_INSTANCE_DIR/data/largemessages
    • If you are using JDBC persistence, add the name of the database table used to persist large messages to your database-store.

      <store>
        <database-store>
          ...
          <large-message-table>MY_TABLE</large-message-table> 1
        </database-store>
      </store>
      1
      The default value for the large-message-table configuration element is LARGE_MESSAGE_TABLE.

Related Information

See the large-message example found under BROKER_INSTANCE_DIR/examples/standard/ for a working example showing how to work with large messages.

For more information about configuring a data-store see Configuring JDBC Persistence.

13.2. Preparing Clients to Send Large Messages

Clients prepare their connection for large messages by providing a value for the property minLargeMessageSize. The value can be provided as a parameter in the URL used to connect to a broker, or it can be set using a supported client API. Any message larger than minLargeMessageSize is considered a large message that is split up and sent in fragments.

Note

AMQ Broker messages are encoded using two bytes per character so if the message data is filled with ASCII characters (which are one byte) the size of the resulting AMQ Broker message would roughly double. This is important when calculating the size of a "large" message as it may appear to be less than the minLargeMessageSize before it is sent, but it then turns into a "large" message once it is encoded. The default value is 100KiB.

Configuring a Client to Send Large Messages

The following examples show how to prepare a JMS client to send large messages.

Procedure

  • Set the minimum size for large messages.

    • If you are using JNDI to instantiate your connection factory, you can specify the size in a jndi.properties, using the parameter minLargeMessageSize.

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      connectionFactory.myConnectionFactory=tcp://localhost:61616?minLargeMessageSize=250000
    • If you are not using JNDI, specify the minimum large message size using the method ActiveMQConnectionFactory.setMinLargeMessageSize().

      ConnectionFactory cf =  ActiveMQJMSClient.createConnectionFactory(...)
      cf.setMinLargeMessageSize(250000);

13.3. Sending Large Messages

AMQ Broker supports using Java-based InputStreams for sending large messages. The most common use case is to send files stored on your disk, but you could also send such data as JDBC Blobs or JSON objects recovered from HTTPRequests.

Note

When using JMS, streaming large messages is only supported when using StreamMessage and BytesMessage.

Procedure

  • Set the JMS_AMQ_InputStream property to mark the message as streamed, as in the example below.

    BytesMessage message = session.createBytesMessage();
    FileInputStream fileInputStream = new FileInputStream(fileInput);
    BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
    message.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
    ...

13.4. Receiving Large Messages

The AMQ Broker Core JMS API has a method for synchronously receiving a streamed message. The methods block further processing until the input stream is completely received.

Receiving a Large Message Synchronously

Procedure

  • Set the JMS_AMQ_SaveStream on the message object.

    BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);
    File outputFile = new File("large_message_received.dat");
    FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
    BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
    
    // This will block until the entire content is saved on disk
    messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);

Receiving a Large Message Asynchronously

The Core JMS API also has a method for asynchronously receiving a streamed message. The method does not block processing by a consumer while it receives the input stream.

Procedure

  • Set the JMS_AMQ_OutputStream parameter on the message object.

    BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);
    File outputFile = new File("large_message_received.dat");
    FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
    BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
    
    // This will not block until the entire content is saved on disk
    messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);

13.5. An Alternative to Streaming Messages

If you choose not to use the InputStream or OutputStream capability of AMQ Broker, you could still access the data directly by getting the bytes of the body as you normally would.

You can also stream a JMS BytesMessage or StreamMessage directly, as in the example below.

+

BytesMessage rm = (BytesMessage)cons.receive(10000);
byte data[] = new byte[1024];
for (int i = 0; i < rm.getBodyLength(); i += 1024)
{
   int numberOfBytes = rm.readBytes(data);
   // Do whatever you want with the data
}

13.6. Compressing Large Messages

Clients can also compress a large message before sending it. The ZIP algorithm is used to compress the message body as the message is sent to the broker.

Note

If the compressed size of a large message is below the value of minLargeMessageSize, the message is sent as a regular message. Therefore, it is not written to the broker’s large-message data directory.

Procedure

  • If you use a Core JMS client and JNDI, use the JNDI context environment to enable message compression, as in the example below.

    java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
    connectionFactory.myConnectionFactory=tcp://localhost:61616?compressLargeMessages=true
  • (Optional) Add the minLargeMessageSize parameter to the connection factory URL to set the minimum size needed before a message is compressed. In the example below, messages are compressed only if they exceed 250 kilobytes in size.

    connectionFactory.myConnectionFactory=tcp://localhost:61616?compressLargeMessages=true&minLargeMessageSize=250kb