Chapter 7. Message delivery

7.1. Writing to a streamed large message

To write to a large message, use the BytesMessage.writeBytes() method. The following example reads bytes from a file and writes them to a message:

Example: Writing to a streamed large message

BytesMessage message = session.createBytesMessage();
File inputFile = new File(inputFilePath);

// try-with-resources closes the stream even if a read or write fails
try (InputStream inputStream = new FileInputStream(inputFile)) {
    int numRead;
    byte[] buffer = new byte[1024];

    // Copy the file into the message body one buffer at a time
    while ((numRead = inputStream.read(buffer, 0, buffer.length)) != -1) {
        message.writeBytes(buffer, 0, numRead);
    }
}

7.2. Reading from a streamed large message

To read from a large message, use the BytesMessage.readBytes() method. The following example reads bytes from a message and writes them to a file:

Example: Reading from a streamed large message

BytesMessage message = (BytesMessage) consumer.receive();
File outputFile = new File(outputFilePath);

// try-with-resources flushes and closes the file even if a read fails
try (OutputStream outputStream = new FileOutputStream(outputFile)) {
    int numRead;
    byte[] buffer = new byte[1024];

    // readBytes() returns -1 once the end of the message body is reached
    while ((numRead = message.readBytes(buffer)) != -1) {
        outputStream.write(buffer, 0, numRead);
    }
}

7.3. Using message groups

Message groups are sets of messages that have the following characteristics:

  • Messages in a message group share the same group ID. That is, they have the same value for the group identifier property. For JMS messages, the property is JMSXGroupID.
  • Messages in a message group are always consumed by the same consumer, even if there are many consumers on a queue. Another consumer is chosen to receive a message group if the original consumer is closed.

Message groups are useful when you want all messages for a certain value of the property to be processed serially by the same consumer. For example, you may want orders for any particular stock purchase to be processed serially by the same consumer. To do this, you could create a pool of consumers and then set the stock name as the value of the message property. This ensures that all messages for a particular stock are always processed by the same consumer.
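
The pinning behavior described above can be pictured with a small sketch that runs without a broker. The class GroupPinningSketch, its method names, and the round-robin choice of consumer are hypothetical and exist only for illustration. They are not the algorithm that AMQ Broker uses to select a consumer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: models "one group, one consumer" pinning.
// It is not the broker's actual consumer-selection algorithm.
final class GroupPinningSketch {
    private final List<String> consumers = new ArrayList<>();
    private final Map<String, String> pinned = new HashMap<>();
    private int next = 0; // round-robin cursor for groups that are not pinned yet

    GroupPinningSketch(List<String> consumerNames) {
        consumers.addAll(consumerNames);
    }

    // Returns the consumer that handles every message carrying this group ID.
    String consumerFor(String groupId) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers available");
        }
        return pinned.computeIfAbsent(groupId, id -> {
            String chosen = consumers.get(next % consumers.size());
            next++;
            return chosen;
        });
    }

    // Closing a consumer releases its groups so that another consumer is chosen later.
    void close(String consumerName) {
        consumers.remove(consumerName);
        pinned.values().removeIf(consumerName::equals);
    }
}
```

In this sketch, repeated calls to consumerFor() with the same stock name return the same consumer until that consumer is closed, which mirrors the serial-processing guarantee that message groups provide.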

Setting the group ID

The examples below show how to use message groups with AMQ Core Protocol JMS.

Procedure

  • If you are using JNDI to establish a JMS connection factory for your JMS client, add the groupID parameter and supply a value. All messages sent using this connection factory have the property JMSXGroupID set to the specified value.

    java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
    connectionFactory.myConnectionFactory=tcp://localhost:61616?groupID=MyGroup

  • If you are not using JNDI, set the JMSXGroupID property using the setStringProperty() method.

    // TextMessage is an interface, so create the message from the session
    TextMessage message = session.createTextMessage();
    message.setStringProperty("JMSXGroupID", "MyGroup");
    producer.send(message);

Additional resources

See message-group and message-group2 under <install-dir>/examples/features/standard for working examples of how message groups are configured and used.

7.4. Using duplicate message detection

AMQ Broker includes automatic duplicate message detection, which filters out any duplicate messages it receives so you do not have to code your own duplicate detection logic.

To enable duplicate message detection, provide a unique value for the message property _AMQ_DUPL_ID. When a broker receives a message, it checks if _AMQ_DUPL_ID has a value. If it does, the broker then checks in its memory cache to see if it has already received a message with that value. If a message with the same value is found, the incoming message is ignored.

If you are sending messages in a transaction, you need to set _AMQ_DUPL_ID on only one message in the transaction, not on every message. If the broker detects a duplicate message for any message in the transaction, it ignores the entire transaction.
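
The check-then-ignore behavior described above can be sketched with a small in-memory cache. The class DuplicateIdCacheSketch, the accept() method, and the bounded, oldest-first eviction are assumptions made for illustration. The real broker cache may be sized, persisted, and evicted differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a bounded duplicate-ID cache.
// It is not the broker's implementation.
final class DuplicateIdCacheSketch {
    private final Map<String, Boolean> seen;

    DuplicateIdCacheSketch(final int maxEntries) {
        // Insertion-ordered map that drops its oldest entry once it grows past maxEntries.
        this.seen = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns true if the message should be accepted, false if it is a duplicate.
    boolean accept(String duplicateId) {
        if (duplicateId == null) {
            return true; // no duplicate ID set, so no detection applies
        }
        return seen.putIfAbsent(duplicateId, Boolean.TRUE) == null;
    }
}
```

A message without a duplicate ID always passes in this sketch, which matches the description that detection applies only when _AMQ_DUPL_ID has a value.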

Setting the duplicate ID message property

The following example shows how to set the duplicate detection property using AMQ Core Protocol JMS. Note that for convenience, the clients use the value of the constant org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID for the name of the duplicate ID property, _AMQ_DUPL_ID.

Procedure

  • Set the value for _AMQ_DUPL_ID to a unique string value.

// HDR_DUPLICATE_DETECTION_ID is a constant of org.apache.activemq.artemis.api.core.Message
Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id";
jmsMessage.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);
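
Duplicate detection can only match a resent message if the resend carries the same ID as the original. One possible way to get such a value, shown here as a sketch and not as a recommendation from this guide, is to derive the ID from a business key. The class DuplicateIdSketch, the method duplicateIdFor(), and the idea of an order number as the key are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper: not part of any AMQ client API.
final class DuplicateIdSketch {
    // Derives a repeatable ID from a business key (for example, an order number),
    // so that a retried send carries the same value as the original send.
    static String duplicateIdFor(String businessKey) {
        byte[] keyBytes = businessKey.getBytes(StandardCharsets.UTF_8);
        return UUID.nameUUIDFromBytes(keyBytes).toString();
    }
}
```

The returned string could then be passed as the property value in the call to setStringProperty(). A random value such as UUID.randomUUID() is also unique, but it changes on every call, so a retry that generates a fresh value would not be recognized as a duplicate.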

7.5. Using message interceptors

With AMQ Core Protocol JMS you can intercept packets entering or exiting the client, allowing you to audit packets or filter messages. Interceptors can change the packets that they intercept. This makes interceptors powerful, but also a feature that you should use with caution.

Interceptors must implement the intercept() method, which returns a boolean value. If the returned value is true, the message packet continues onward. If the returned value is false, the process is aborted, no other interceptors are called, and the message packet is not processed further.

Message interception occurs transparently to the main client code except when an outgoing packet is sent in blocking send mode. When an outgoing packet is sent with blocking enabled and that packet encounters an interceptor that returns false, an ActiveMQException is thrown to the caller. The thrown exception contains the name of the interceptor.
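
The return-value rules above can be sketched as a simple chain that stops at the first interceptor returning false. To keep the sketch runnable on its own, it uses a hypothetical PacketCheck interface and a byte array in place of the real Interceptor and Packet types. The class InterceptorChainSketch and its firstRejecting() method are likewise assumptions and do not reflect how the client is implemented internally.

```java
import java.util.List;

// Hypothetical stand-in for the interceptor contract, kept free of
// client library types so that the sketch runs on its own.
interface PacketCheck {
    boolean intercept(byte[] packet);
}

final class InterceptorChainSketch {
    private final List<PacketCheck> checks;

    InterceptorChainSketch(List<PacketCheck> checks) {
        this.checks = checks;
    }

    // Returns the index of the interceptor that rejected the packet,
    // or -1 if every interceptor returned true and the packet continues onward.
    int firstRejecting(byte[] packet) {
        for (int i = 0; i < checks.size(); i++) {
            if (!checks.get(i).intercept(packet)) {
                return i; // abort: later interceptors are never called
            }
        }
        return -1;
    }
}
```

Returning the position of the rejecting interceptor is a design choice of the sketch. It loosely corresponds to the way the exception thrown in blocking send mode identifies the interceptor by name.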

Your interceptor must implement the org.apache.activemq.artemis.api.core.Interceptor interface. To be properly instantiated and invoked, the client interceptor classes and their dependencies must be on the Java classpath of the client.

package com.example;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

public class MyInterceptor implements Interceptor {
    private static final int ACCEPTABLE_SIZE = 1024;

    // Interface methods are public, so the override must be public too
    @Override
    public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
        int size = packet.getPacketSize();
        if (size <= ACCEPTABLE_SIZE) {
            System.out.println("This Packet has an acceptable size.");
            return true;
        }
        // Returning false stops further processing of this packet
        return false;
    }
}