Menu Close

第6章 Camel コンポーネント

本章では、サポートされる camel コンポーネントについて説明します。

6.1. camel-activemq

Camel ActiveMQ インテグレーションは activemq コンポーネントによって提供されます。

コンポーネントは、組み込みまたは外部ブローカーと連携するように設定できます。Wildfly / EAP コンテナー管理接続プールおよび XA-Transaction サポートでは、ActiveMQ リソースアダプター をコンテナー設定ファイルに設定できます。

6.1.1. JBoss EAP ActiveMQ リソースアダプターの設定

ActiveMQ リソースアダプター rar ファイル をダウンロードします。以下の手順では、ActiveMQ リソースアダプターの設定方法を説明します。

  1. JBoss EAP インスタンスを停止します。
  2. リソースアダプターをダウンロードし、関連する JBoss EAP デプロイメントディレクトリーにコピーします。スタンドアロンモードの場合:

    cp activemq-rar-5.11.1.rar ${JBOSS_HOME}/standalone/deployments/activemq-rar.rar
  3. ActiveMQ アダプターの JBoss EAP リソースアダプターサブシステムを設定します。
<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
     <resource-adapters>
         <resource-adapter id="activemq-rar.rar">
             <archive>
                 activemq-rar.rar
             </archive>
             <transaction-support>XATransaction</transaction-support>
             <config-property name="UseInboundSession">
                 false
             </config-property>
             <config-property name="Password">
                 defaultPassword
             </config-property>
             <config-property name="UserName">
                 defaultUser
             </config-property>
             <config-property name="ServerUrl">
                 tcp://localhost:61616?jms.rmIdFromConnectionId=true
             </config-property>
             <connection-definitions>
                 <connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" jndi-name="java:/ActiveMQConnectionFactory" enabled="true" pool-name="ConnectionFactory">
                     <xa-pool>
                         <min-pool-size>1</min-pool-size>
                         <max-pool-size>20</max-pool-size>
                         <prefill>false</prefill>
                         <is-same-rm-override>false</is-same-rm-override>
                     </xa-pool>
                 </connection-definition>
             </connection-definitions>
             <admin-objects>
                 <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/HELLOWORLDMDBQueue" use-java-context="true" pool-name="HELLOWORLDMDBQueue">
                     <config-property name="PhysicalName">
                         HELLOWORLDMDBQueue
                     </config-property>
                 </admin-object>
                 <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:/topic/HELLOWORLDMDBTopic" use-java-context="true" pool-name="HELLOWORLDMDBTopic">
                     <config-property name="PhysicalName">
                         HELLOWORLDMDBTopic
                     </config-property>
                 </admin-object>
             </admin-objects>
         </resource-adapter>
     </resource-adapters>
 </subsystem>

リソースアダプタのアーカイブファイル名が activemq-rar.rar とは異なる場合、アーカイブファイルの名前と一致するように、前述の設定の archive 要素の内容を変更する必要があります。

UserName および Password 設定プロパティーの値は、外部ブローカーの有効なユーザーのクレデンシャルと一致するように、選択する必要があります。

外部ブローカーによって公開される実際のホスト名およびポートに一致するように、ServerUrl 設定プロパティーの値を変更する必要がある場合があります。

4) JBoss EAP を起動します。すべてが正しく設定されていれば、JBoss EAP server.log 内に以下のようなメッセージが表示されます。

13:16:08,412 INFO  [org.jboss.as.connector.deployment] (MSC service thread 1-5) JBAS010406: Registered connection factory java:/AMQConnectionFactory`

6.1.2. Camel ルート設定

以下の ActiveMQ プロデューサーおよびコンシューマーの例は、ActiveMQ 組み込みブローカーと 'vm' トランスポートを使用します (よって、外部 ActiveMQ ブローカーが必要ないようにします)。

この例では、camel-cdi コンポーネントとともに CDI を使用します。JMS ConnectionFactory インスタンスは、JNDI ルックアップを介して Camel RouteBuilder に注入されます。

6.1.2.1. ActiveMQ プロデューサー

@Startup
@ApplicationScoped
@ContextName("activemq-camel-context")
public class ActiveMQRouteBuilder extends RouteBuilder {

  @Override
  public void configure() throws Exception {
    from("timer://sendJMSMessage?fixedRate=true&period=10000")
    .transform(constant("<?xml version='1.0><message><greeting>hello world</greeting></message>"))
    .to("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost")
    .log("JMS Message sent");
  }
}

メッセージが WildFlyCamelQueue 宛先に追加されるたびに、ログメッセージがコンソールに出力されます。メッセージをキューに配置されていることを確認するには、Camel on EAP サブシステムによって提供される ../features/hawtio.md[Hawtio console,window=_blank] を使用できます。

activemq queue browse

6.1.2.2. ActiveMQ コンシューマー

ActiveMQ メッセージを使用するために、Camel RouteBuilder 実装はプロデューサーの例と似ています。

ActiveMQ エンドポイントが WildFlyCamelQueue 宛先からメッセージを消費すると、コンテンツはコンソールに記録されます。

@Override
public void configure() throws Exception {
  from("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost")
  .to("log:jms?showAll=true");
}

6.1.2.3. ActiveMQ トランザクション

6.1.2.3.1. ActiveMQ リソースアダプターの設定

XA トランザクションサポートや接続プールなどを活用するには、ActiveMQ リソースアダプターが必要です。

以下の XML スニペットは、JBoss EAP サーバーの XML 設定内でリソースアダプターがどのように設定されているかを示しています。ServerURL は組み込みブローカーを使用するように設定されていることに注意してください。接続ファクトリーは JNDI 名 java:/ActiveMQConnectionFactory にバインドされます。これは、次の RouteBuilder の例で検索されます。

最後に、queue1 と queue2 という名前の 2 つのキューが設定されます。

<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
  <resource-adapters>
    <resource-adapter id="activemq-rar.rar">
      ...
      <admin-objects>
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue1" use-java-context="true" pool-name="queue1pool">
          <config-property name="PhysicalName">queue1</config-property>
        </admin-object>
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue2" use-java-context="true" pool-name="queue2pool">
          <config-property name="PhysicalName">queue2</config-property>
        </admin-object>
      </admin-objects>
    </resource-adapter>
  </resource-adapters>
</subsystem>

6.1.2.4. トランザクションマネージャー

camel-activemq コンポーネントには、org.springframework.transaction.PlatformTransactionManager タイプのトランザクションマネージャーが必要です。そのため、最初にこの要件を満たす JtaTransactionManager を拡張する Bean を作成します。Bean には @Named アノテーションが付けられており、Camel Bean レジストリー内に Bean を登録できることに注意してください。また、JBoss EAP トランザクションマネージャーとユーザートランザクションインスタンスは CDI を使用して注入されることに注意してください。

@Named("transactionManager")
public class CdiTransactionManager extends JtaTransactionManager {

  @Resource(mappedName = "java:/TransactionManager")
  private TransactionManager transactionManager;

  @Resource
  private UserTransaction userTransaction;

  @PostConstruct
  public void initTransactionManager() {
    setTransactionManager(transactionManager);
    setUserTransaction(userTransaction);
  }
}

6.1.2.5. トランザクションポリシー

次に、使用するトランザクションポリシーを宣言する必要があります。ここでも、@Named アノテーションを使用して Bean を Camel で利用可能にします。また、トランザクションマネージャーも、必要なトランザクションポリシーで TransactionTemplate を作成できるように注入されます。このインスタンスの PROPAGATION_REQUIRED

@Named("PROPAGATION_REQUIRED")
public class CdiRequiredPolicy extends SpringTransactionPolicy {
  @Inject
  public CdiRequiredPolicy(CdiTransactionManager cdiTransactionManager) {
    super(new TransactionTemplate(cdiTransactionManager,
      new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)));
  }
}

6.1.2.6. ルートビルダー

これで、Camel RouteBuilder クラスを設定し、Camel ActiveMQ コンポーネントに必要な依存関係を注入できるようになります。リソースアダプターに設定した ActiveMQ 接続ファクトリーは、以前設定したトランザクションマネージャーとともに注入されます。

この例では、queue1 からメッセージが消費されるたびに queue2 という名前の別の JMS キューにルーティングされます。queue2 から消費されるメッセージにより、JMS トランザクションは rollback () DSL メソッドを使用してロールバックされます。これにより、元のメッセージがデッドレターキュー (DLQ) に配置されます。

@Startup
@ApplicationScoped
@ContextName("activemq-camel-context")
public class ActiveMQRouteBuilder extends RouteBuilder {

  @Resource(mappedName = "java:/ActiveMQConnectionFactory")
  private ConnectionFactory connectionFactory;

  @Inject
  private CdiTransactionManager transactionManager;

  @Override
  public void configure() throws Exception {
    ActiveMQComponent activeMQComponent = ActiveMQComponent.activeMQComponent();
    activeMQComponent.setTransacted(false);
    activeMQComponent.setConnectionFactory(connectionFactory);
    activeMQComponent.setTransactionManager(transactionManager);

    getContext().addComponent("activemq", activeMQComponent);

      errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ")
      .useOriginalMessage()
      .maximumRedeliveries(0)
      .redeliveryDelay(1000));

    from("activemq:queue:queue1F")
      .transacted("PROPAGATION_REQUIRED")
      .to("activemq:queue:queue2");

    from("activemq:queue:queue2")
      .to("log:end")
      .rollback();
  }
}

6.1.3. セキュリティー

JMS セキュリティーのセクション を参照してください。

6.1.4. GitHub のコード例

camel-activemq アプリケーション の例は GitHub で利用できます。