Chapter 14. Intercepting Messages

With AMQ Broker you can intercept packets entering or exiting the broker, allowing you to audit packets or filter messages. Interceptors can change the packets they intercept, which makes them powerful, but also potentially dangerous.

You can develop interceptors to meet your business requirements. Interceptors are protocol specific and must implement the appropriate interface.

Interceptors must implement the intercept() method, which returns a boolean value. If the value is true, the message packet continues onward. If false, the process is aborted, no other interceptors are called, and the message packet is not processed further.

14.1. Creating Interceptors

You can create your own incoming and outgoing interceptors. All interceptors are protocol specific and are called for any packet entering or exiting the server respectively. This allows you to create interceptors to meet business requirements such as auditing packets. Interceptors can change the packets they intercept. This makes them powerful as well as potentially dangerous, so be sure to use them with caution.

Interceptors and their dependencies must be placed in the Java classpath of the broker. You can use the BROKER_INSTANCE_DIR/lib directory since it is part of the classpath by default.

Procedure

The following examples demonstrate how to create an interceptor that checks the size of each packet passed to it. Note that the examples implement a specific interface for each protocol.

  • Implement the appropriate interface and override its intercept() method.

    • If you are using the AMQP protocol, implement the org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor interface.

      package com.example;
      
      import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
      import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
      import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
      
      public class MyInterceptor implements AmqpInterceptor
      {
        private final int ACCEPTABLE_SIZE = 1024;
      
        @Override
        public boolean intercept(final AMQPMessage message, RemotingConnection connection)
        {
          int size = message.getEncodeSize();
          if (size <= ACCEPTABLE_SIZE) {
            System.out.println("This AMQPMessage has an acceptable size.");
            return true;
          }
          return false;
        }
      }
    • If you are using the Core protocol, your interceptor must implement the org.apache.artemis.activemq.api.core.Interceptor interface.

      package com.example;
      
      import org.apache.artemis.activemq.api.core.Interceptor;
      import org.apache.activemq.artemis.core.protocol.core.Packet;
      import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
      
      public class MyInterceptor implements Interceptor
      {
        private final int ACCEPTABLE_SIZE = 1024;
      
        @Override
        boolean intercept(Packet packet, RemotingConnection connection)
        throws ActiveMQException
        {
          int size = packet.getPacketSize();
          if (size <= ACCEPTABLE_SIZE) {
            System.out.println("This Packet has an acceptable size.");
            return true;
          }
          return false;
        }
      }
    • If you are using the MQTT protocol, implement the org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor interface.

      package com.example;
      
      import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
      import io.netty.handler.codec.mqtt.MqttMessage;
      import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
      
      public class MyInterceptor implements Interceptor
      {
        private final int ACCEPTABLE_SIZE = 1024;
      
        @Override
        boolean intercept(MqttMessage mqttMessage, RemotingConnection connection)
        throws ActiveMQException
        {
          byte[] msg = (mqttMessage.toString()).getBytes();
          int size = msg.length;
          if (size <= ACCEPTABLE_SIZE) {
            System.out.println("This MqttMessage has an acceptable size.");
            return true;
          }
          return false;
        }
      }
    • If you are using the Stomp protocol, implement the org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor interface.

      package com.example;
      
      import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
      import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
      import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
      
      public class MyInterceptor implements Interceptor
      {
        private final int ACCEPTABLE_SIZE = 1024;
      
        @Override
        boolean intercept(StompFrame stompFrame, RemotingConnection connection)
        throws ActiveMQException
        {
          int size = stompFrame.getEncodedSize();
          if (size <= ACCEPTABLE_SIZE) {
            System.out.println("This StompFrame has an acceptable size.");
            return true;
          }
          return false;
        }
      }

14.2. Configuring the Broker to Use Interceptors

Once you have created an interceptor, you must configure the broker to use it.

Prerequisites

You must create an interceptor class and add it (and its dependencies) to the Java classpath of the broker before you can configure it for use by the broker. You can use the BROKER_INSTANCE_DIR/lib directory since it is part of the classpath by default.

Procedure

  • Configure the broker to use an interceptor by adding configuration to BROKER_INSTANCE_DIR/etc/broker.xml

    • If your interceptor is intended for incoming messages, add its class-name to the list of remoting-incoming-interceptors.

      <configuration>
        <core>
          ...
          <remoting-incoming-interceptors>
             <class-name>org.example.MyIncomingInterceptor</class-name>
          </remoting-incoming-interceptors>
          ...
        </core>
      </configuration>
    • If your interceptor is intended for outgoing messages, add its class-name to the list of remoting-outgoing-interceptors.

      <configuration>
        <core>
          ...
          <remoting-outgoing-interceptors>
             <class-name>org.example.MyOutgoingInterceptor</class-name>
          </remoting-outgoing-interceptors>
        </core>
      </configuration>

14.3. Interceptors on the Client Side

Clients can use interceptors to intercept packets either sent by the client to the server or by the server to the client. As in the case of a broker-side interceptor, if it returns false, no other interceptors are called and the client does not process the packet further. This process happens transparently to the client except when an outgoing packet is sent in a blocking fashion. In those cases, an ActiveMQException is thrown to the caller because blocking sends provides reliability. The ActiveMQException thrown contains the name of the interceptor that returned false.

As on the server, the client interceptor classes and their dependencies must be added to the Java classpath of the client to be properly instantiated and invoked.