第2章 ActiveMQ

ActiveMQ コンポーネント

ActiveMQ コンポーネントを使用すると、メッセージを JMS キューまたはトピックに送信するか、Apache ActiveMQ を使用して JMS キューまたはトピックからメッセージを消費できます。

このコンポーネントは、178章JMS コンポーネント をベースにしており、Spring の JMS サポートを宣言型トランザクションに、また Spring の JmsTemplate を送信に、MessageListenerContainer を消費に使用します。178章JMS コンポーネント のコンポーネントのオプションはすべて、このコンポーネントに該当します。

このコンポーネントを使用するには、クラスパスに activemq.jar または activemq-core.jar があり、camel-core.jarcamel-spring.jarcamel-jms.jar などの Apache Camel 依存関係があることを確認してください。

トランザクションとキャッシング

JMS でトランザクションを使用している場合は、パフォーマンスに影響を与える可能性があるため、JMS ページの下の トランザクションとキャッシュレベル セクションを参照してください。

URI 形式

activemq:[queue:|topic:]destinationName

ここでは、destinationName は ActiveMQ キューまたはトピック名に置き換えます。デフォルトでは、destinationName はキュー名として解釈されます。たとえば、キューに接続するには、FOO.BAR を次のように使用します。

activemq:FOO.BAR

必要に応じて、オプションの queue: 接頭辞を含めることができます。

activemq:queue:FOO.BAR

トピックに接続するには、topic: 接頭辞を含める必要があります。たとえば、トピック Stocks.Prices に接続するには、次を使用します。

activemq:topic:Stocks.Prices

オプション

これらのオプションはすべてこのオプションに該当するので、178章JMS コンポーネント コンポーネントのオプションを参照してください。

Camel on EAP デプロイメント

このコンポーネントは、Red Hat JBoss Enterprise Application Platform (JBoss EAP) コンテナー上で簡素化されたデプロイメントモデルを提供する Camel on EAP (Wildfly Camel) フレームワークによってサポートされます。

組み込みブローカーまたは外部ブローカーのいずれかで動作するように ActiveMQ Camel コンポーネントを設定できます。JBoss EAP コンテナーにブローカーを埋め込むには、EAP コンテナー設定ファイルで ActiveMQ リソースアダプターを設定します。詳細は、ActiveMQ リソースアダプターの設定 を参照してください。

接続ファクトリーの設定

次の テストケース は、ActiveMQ への接続に使用される brokerURL を指定しながら、activeMQComponent () メソッド を使用して ActiveMQComponent を CamelContext に追加する方法を示しています。

camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));

Spring XML を使用した接続ファクトリーの設定

次のように、ActiveMQComponent で ActiveMQ ブローカー URL を設定できます。

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
  </camelContext>

  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://somehost:61616"/>
  </bean>

</beans>

接続プーリングの使用

Camel を使用して ActiveMQ ブローカーに送信する場合は、プールされた接続ファクトリーを使用して、JMS 接続、セッション、およびプロデューサーを効率的にプールするように処理することを推奨します。これは ActiveMQ Spring Support ページに記載されています。

Maven を使用して、Jencks AMQ プールを取得できます。

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.3.2</version>
    </dependency>

次に、activemq コンポーネントを次のように設定します。

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory"    class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>
注記

プールされた接続ファクトリーの init メソッドと destroy メソッドに注目してください。これは、接続プールが適切に開始および終了されるようにするために重要です。

PooledConnectionFactory は、同時に使用される最大 8 つの接続を含む接続プールを作成します。各接続は、多くのセッションで共有できます。接続ごとのセッションの最大数を設定するために使用できる maxActive という名前のオプションがあります。デフォルト値は 500 です。ActiveMQ 5.7 以降、このオプションはその目的をさらに反映するように名前が変更され、maxActiveSessionPerConnection という名前になりました。concurrentConsumersmaxConnections よりも高い値に設定されていることに注意してください。各コンシューマーがセッションを使用していて、セッションが同じ接続を共有できるので、これは問題ありません。この例では、同時に 8 * 500 = 4000 のアクティブなセッションを指定できます。

ルートでの MessageListener POJO の呼び出し

ActiveMQ コンポーネントは、JMS MessageListener から Processor へのヘルパー Type Converter も提供します。これは、43章Bean コンポーネント コンポーネントは、任意のルート内で任意の JMS MessageListener Bean を直接呼び出すことができます。

たとえば、次のように JMS で MessageListener を作成できます。

public class MyListener implements MessageListener {
   public void onMessage(Message jmsMessage) {
       // ...
   }
}

次に、以下のようにルートで使用します。

from("file://foo/bar").
  bean(MyListener.class);

つまり、任意の Apache Camel コンポーネントを再利用して、それらを JMS MessageListener POJO に簡単に統合できます。

ActiveMQ 宛先オプションの使用

ActiveMQ 5.6 以降で利用可能

"destination." の接頭辞を使用して、エンドポイント uri で 宛先オプション を設定できます。たとえば、コンシューマーを排他的とマークし、そのプリフェッチサイズを 50 に設定するには、次のようにします。

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="file://src/test/data?noop=true"/>
    <to uri="activemq:queue:foo"/>
  </route>
  <route>
    <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. -->
    <from uri="activemq:foo?destination.consumer.exclusive=true&amp;destination.consumer.prefetchSize=50"/>
    <to uri="mock:results"/>
  </route>
</camelContext>

アドバイザリーメッセージの使用

ActiveMQ は、消費可能なトピックに配置される アドバイザリーメッセージ を生成できます。このようなメッセージは、遅いコンシューマーを検出した場合にアラートを送信したり、統計 (1 日に生成されるメッセージの数など) を作成したりするのに役立ちます。 次の Spring DSL の例は、トピックからメッセージを読み取る方法を示しています。

<route>
	<from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" />
	<convertBodyTo type="java.lang.String"/>
	<transform>
	     <simple>${in.body}&#13;</simple>
	</transform>
	<to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" />
</route>

キューでメッセージを消費すると、data/activemq フォルダーの下に次のファイルが表示されます。

advisoryConnection-20100312.txt advisoryProducer-20100312.txt

および含まれる文字列:

      ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140
      -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles-
      3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null,
      expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468,
      correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null,
      groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null,
      marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo
      {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50,
      clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****,
      brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true},
      redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles-
      3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true,
      droppable = false}

コンポーネント JAR の取得

以下の依存関係が必要です。

  • activemq-camel

ActiveMQ は、ActiveMQ プロジェクト でリリースされた 178章JMS コンポーネント コンポーネントの拡張です。

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-camel</artifactId>
  <version>5.6.0</version>
</dependency>