3.3. パケットをインターセプトしてメッセージの監査

ブローカーの出入力または終了パケットのインターセプトを行い、パケットの監査やメッセージのフィルターを行います。インターセプターは、インターセプトするパケットを変更します。これにより、インターセプターは強力になりますが、危険にさらされる可能性もあります。

ビジネス要件を満たすためのインターセプターを開発します。インターセプターはプロトコル固有であるため、適切なインターフェースを実装する必要があります。

インターセプターは、ブール値を返す intercept() メソッドを実装する必要があります。値が true の場合、メッセージパケットは続行されます。false の場合、プロセスは中止され、他のインターセプターは呼び出されず、メッセージパケットはこれ以上処理されません。

3.3.1. インターセプターの作成

インターセプターは、インターセプトするパケットを変更できます。独自の受信インターセプターおよび発信インターセプターを作成できます。すべてのインターセプターはプロトコル固有で、サーバーに出入りするパケットに対して呼び出されます。これにより、監査パケットなどのビジネス要件を満たすインターセプターを作成できます。

インターセプターとその依存関係は、ブローカーの Java クラスに配置する必要があります。<broker_instance_dir>/lib ディレクトリは、デフォルトでクラスパスの一部となっているため、使用することができます。

以下の例は、渡された各パケットのサイズをチェックするインターセプターを作成する方法を示しています。

注記

この例では、プロトコルごとに特定のインターフェースを実装します。

手順

  1. 適切なインターフェースを実装し、そのintercept()メソッドをオーバーライドします。

    1. AMQPプロトコルを使用している場合は、org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptorインターフェースを実装してください。

      package com.example;
      
      import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
      import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
      import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
      
      public class MyInterceptor implements AmqpInterceptor
      {
        private final int ACCEPTABLE_SIZE = 1024;
      
        @Override
        public boolean intercept(final AMQPMessage message, RemotingConnection connection)
        {
          int size = message.getEncodeSize();
          if (size <= ACCEPTABLE_SIZE) {
            System.out.println("This AMQPMessage has an acceptable size.");
            return true;
          }
          return false;
        }
      }
    2. Core Protocolを使用している場合、インターセプターは、org.apache.artemis.activemq.api.core.Interceptorインターフェイスを実装する必要があります。

      package com.example;
      
      import org.apache.artemis.activemq.api.core.Interceptor;
      import org.apache.activemq.artemis.core.protocol.core.Packet;
      import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
      
      public class MyInterceptor implements Interceptor
      {
        private final int ACCEPTABLE_SIZE = 1024;
      
        @Override
        boolean intercept(Packet packet, RemotingConnection connection)
        throws ActiveMQException
        {
          int size = packet.getPacketSize();
          if (size <= ACCEPTABLE_SIZE) {
            System.out.println("This Packet has an acceptable size.");
            return true;
          }
          return false;
        }
      }
    3. MQTTプロトコルを使用している場合は、org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptorインターフェースを実装してください。

      package com.example;
      
      import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
      import io.netty.handler.codec.mqtt.MqttMessage;
      import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
      
      public class MyInterceptor implements Interceptor
      {
        private final int ACCEPTABLE_SIZE = 1024;
      
        @Override
        boolean intercept(MqttMessage mqttMessage, RemotingConnection connection)
        throws ActiveMQException
        {
          byte[] msg = (mqttMessage.toString()).getBytes();
          int size = msg.length;
          if (size <= ACCEPTABLE_SIZE) {
            System.out.println("This MqttMessage has an acceptable size.");
            return true;
          }
          return false;
        }
      }
    4. STOMPプロトコルを使用している場合は、org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptorインターフェースを実装してください。

      package com.example;
      
      import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
      import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
      import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
      
      public class MyInterceptor implements Interceptor
      {
        private final int ACCEPTABLE_SIZE = 1024;
      
        @Override
        boolean intercept(StompFrame stompFrame, RemotingConnection connection)
        throws ActiveMQException
        {
          int size = stompFrame.getEncodedSize();
          if (size <= ACCEPTABLE_SIZE) {
            System.out.println("This StompFrame has an acceptable size.");
            return true;
          }
          return false;
        }
      }