第10章 Message Transformation

概要

Message Transformation パターンは、さまざまな目的のためにメッセージの内容を変更する方法を表しています。

10.1. Content Enricher

概要

Content Enricher パターンは、メッセージの宛先が元のメッセージに存在する以上のデータを必要とするシナリオを表しています。この場合、メッセージ変換、ルーティングロジック内の任意のプロセッサー、または Content Enricher メソッドを使用して外部リソースから追加データをプルします。

図10.1 Content Enricher パターン

Content Enricher パターン

コンテンツ補完の代替方法

Apache Camel は、コンテンツを補完するための複数の方法をサポートします。

  • ルーティングロジックの任意のプロセッサーを使用したメッセージ変換
  • enrich() メソッドは、現在のエクスチェンジのコピーを プロデューサー エンドポイントに送信し、生成された応答のデータを使用して、リソースから追加のデータを取得します。Enricher によって作成されるエクスチェンジは、常に InOut エクスチェンジです。
  • pollEnrich() メソッドは、データの コンシューマー エンドポイントをポーリングして追加のデータを取得します。実質的に、メインルートからのコンシューマーエンドポイントと pollEnrich() 操作中のコンシューマーエンドポイントは結合されます。つまり、ルートの初期コンシューマーの受信メッセージが、ポーリングするコンシューマーの pollEnrich() をトリガーします。
注記

enrich() および pollEnrich() メソッドは動的エンドポイント URI をサポートします。URI を取得するには、現在のエクスチェンジから値を取得できる式を指定します。たとえば、データエクスチェンジから計算される名前でファイルをポーリングできます。この動作は Camel 2.16 で導入されました。この変更により XML DSL を使わず、容易に移行ができます。Java DSL は後方互換性を維持します。

メッセージ変換およびプロセッサーを使用したコンテンツの補完

Camel は、IDE 上でタイプセーフなコード補完をしながら、ルーティングおよび仲介ルールを作成できる Fluent Builder (流れるようなビルダー) を提供します。これにより、スマートな補完を提供し、リファクタリングを安全に行うことができます。分散システムをテストする場合、特定システムが利用可能または書き込みされるまで、システムの他の部分をテストするために、システムの特定部分のスタブを作成しなければならないような要件が一般的です。これを実行する 1 つの方法として、何らかの テンプレート システムを使用して、ほとんどの静的なボディーを持つ動的メッセージを生成することで、要求への応答を生成します。テンプレートを使用する別の方法として、ある宛先からメッセージを受信し、VelocityXQuery などを使用して変換してから、別の宛先に送信する方法もあります。以下の例は、InOnly (一方向) メッセージに対する例になります。

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm").
  to("activemq:Another.Queue");

InOut (request-reply) メッセージングを使用して ActiveMQ の My.Queue キューでリクエストを処理するとします。JMSReplyTo の宛先に送信されるテンプレートが生成した応答が必要です。以下の例は、これらを行う方法を示しています。

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm");

以下の例は、DSL を使用してメッセージのボディーを変換する方法を示しています。

from("direct:start").setBody(body().append(" World!")).to("mock:result");

以下の例は、明示的な Java コードを使用してプロセッサーを追加します。

from("direct:start").process(new Processor() {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        in.setBody(in.getBody(String.class) + " World!");
    }
}).to("mock:result");

次の例では、変換機能として機能する Bean を有効化するために Bean 統合を使用しています。

from("activemq:My.Queue").
  beanRef("myBeanName", "myMethodName").
  to("activemq:Another.Queue");

以下の例は Spring XML 実装を示しています。

<route>
  <from uri="activemq:Input"/>
  <bean ref="myBeanName" method="doTransform"/>
  <to uri="activemq:Output"/>
</route>/>

Enrich() メソッドを使用したコンテンツの補完

AggregationStrategy aggregationStrategy = ...

from("direct:start")
  .enrich("direct:resource", aggregationStrategy)
  .to("direct:result");

from("direct:resource")
...

Content Enricher (enrich) は、(元のエクスチェンジ に含まれる) 受信メッセージを補完するために、リソースエンドポイント から追加のデータを取得します。集約ストラテジーは、元のエクスチェンジと リソースエクスチェンジ を組み合わせたものです。AggregationStrategy.aggregate(Exchange, Exchange) メソッドの最初のパラメーターは元のエクスチェンジに対応し、2 番目のパラメーターはリソースエクスチェンジに対応します。リソースエンドポイントの結果は、リソースエクスチェンジの Out メッセージに保存されます。以下は、独自の集約ストラテジークラスを実装するためのテンプレートの例です。

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getOut().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

このテンプレートを使用すると、元のエクスチェンジがあらゆる交換パターンを持つことができます。Enricher によって作成されるリソースエクスチェンジは、常に InOut エクスチェンジです。

Spring XML 補完の例

上記の例は Spring XML にも実装できます。

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>
 <bean id="aggregationStrategy" class="..." />

コンテンツの補完時のデフォルト集約ストラテジー

集約ストラテジーは任意です。提供しない場合、Apache Camel はデフォルトでリソースから取得したボディーを使用します。以下に例を示します。

from("direct:start")
  .enrich("direct:resource")
  .to("direct:result");

上記のルートでは、direct:result エンドポイントに送信されたメッセージには direct:resource からの出力が含まれます。これは、この例ではカスタムの集約を使用していないためです。

XML DSL では、以下のように strategyRef 属性を省略します。

<route>
    <from uri="direct:start"/>
    <enrich uri="direct:resource"/>
    <to uri="direct:result"/>
</route>

Enrich() メソッドでサポートされるオプション

enrich DSL コマンドは、以下のオプションをサポートします。

名前

デフォルト値

説明

expression

なし

Camel 2.16 以降では、このオプションは必須です。Enrich from を使用するよう外部サービスの URI を設定する式を指定します。Simple 式言語、Constant 式言語、または現在のエクスチェンジの値から動的に URI を計算できるその他の言語を使用できます。

uri

 

これらのオプションは削除されています。代わりに expression オプションを指定してください。Camel 2.15 以前では、uri オプションまたは ref オプションを指定する必要がありました。各オプションは、外部サービスの Enrich からのエンドポイント URI を指定します。

ref

 

補完のための外部サービスのエンドポイントを参照します。uri または ref のいずれかを使用する必要があります。

strategyRef

 

外部サービスからの応答を 1 つの送信メッセージにマージするために使用される AggregationStrategy を参照します。デフォルトでは、Camel は外部からの応答を送信メッセージとして使用します。POJO を AggregationStrategy として使用することができます。詳細は、Aggregate パターンのドキュメントを参照してください。

strategyMethodName

 

POJO を AggregationStrategy として使用している場合は、このオプションを指定して、集約メソッドの名前を明示的に宣言します。詳細は、Aggregate パターンを参照してください。

strategyMethodAllowNull

false

デフォルトの動作では、補完するデータがない場合、集約メソッドは使用されません。このオプションが true の場合、補完のデータがなく、POJO を AggregationStrategy に使用していると、null 値は oldExchange として使用されます。詳細は、Aggregate パターンを参照してください。

aggregateOnException

false

デフォルトの動作では、リソースから補完するデータの取得中に例外が発生すると、集約メソッドは 使用されません。このオプションを true に設定すると、aggregate メソッドに例外がある場合に、エンドユーザーがアクションを制御できるようになります。たとえば、例外を非表示にしたり、カスタムメッセージのボディーを設定したりすることができます。

shareUntOfWork

false

Camel 2.16 以降、補完操作は親エクスチェンジとリソースエクスチェンジの間で Unit of Work を共有しないのがデフォルトの動作になります。これは、リソースエクスチェンジに個別の Unit of Work があることを意味します。詳細は、Splitter パターンのドキュメントを参照してください。

cacheSize

1000

Camel 2.16 以降では、このオプションを指定し、補完の操作でプロデューサーを再利用するためにプロデューサーをキャッシュすることができる ProducerCache のキャッシュサイズを指定します。このキャッシュを無効にするには、cacheSize オプションを -1 に設定します。

ignoreInvalidEndpoint

false

Camel 2.16 以降、このオプションは解決できないエンドポイント URI を無視するかどうかを示します。デフォルトの動作では、無効なエンドポイント URI を特定する例外が Camel によって出力されます。

Enrich() メソッド使用時の集約ストラテジーの指定

enrich() メソッドは、リソースエンドポイントから追加のデータを取得し、元のエクスチェンジに含まれる受信メッセージを強化します。集約ストラテジーを使用して、元のエクスチェンジとリソースエクスチェンジを組み合わせることができます。AggregationStrategy.aggregate(Exchange, Exchange) メソッドの最初のパラメーターは、元のエクスチェンジに対応します。2 番目のパラメーターは、リソースのエクスチェンジに対応します。リソースエンドポイントの結果は、リソースエクスチェンジの Out メッセージに保存されます。以下に例を示します。

AggregationStrategy aggregationStrategy = ...

   from("direct:start")
   .enrich("direct:resource", aggregationStrategy)
   .to("direct:result");

   from("direct:resource")
...

以下のコードは、集約ストラテジーを実装するためのテンプレートです。このテンプレートを使用する実装では、元のエクスチェンジはメッセージエクスチェンジパターンになります。Enricher によって作成されるリソースエクスチェンジは、常に InOut メッセージエクスチェンジパターンです。

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getIn().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

以下の例は、Spring XML DSL を使用して集約ストラテジーを実装する方法を示しています。

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>

<bean id="aggregationStrategy" class="..." />

Enrich() での動的 URI の使用

Camel 2.16 以降、enrich() および pollEnrich() メソッドは、現在のエクスチェンジからの情報に基づいて計算される動的 URI の使用をサポートします。たとえば、orderId キーのあるヘッダーが HTTP URL のコンテンツパスの一部として使用される HTTP エンドポイントから補完するには、以下のような操作を行うことができます。

from("direct:start")
  .enrich().simple("http:myserver/${header.orderId}/order")
  .to("direct:result");

以下は、XML DSL と同じ例です。

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
   <from uri="direct:start"/>
   <enrich>
      <simple>http:myserver/${header.orderId}/order</simple>
   </enrich>
   <to uri="direct:result"/>
</route>

pollEnrich() メソッドを使用したコンテンツの補完

この pollEnrich コマンドは、リソースのエンドポイントを コンシューマー として扱います。エクスチェンジをリソースエンドポイントに送信する代わりに、エンドポイントを ポーリング します。デフォルトでは、リソースエンドポイントからエクスチェンジがない場合は、ポーリングはすぐに返します。たとえば、以下のルートは、受信 JMS メッセージのヘッダーから抽出される名前のファイルを読み取ります。

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId")
   .to("bean:processOrder");

ファイルが準備できるまで待機する時間を制限できます。以下の例は、20 秒の最大待機時間を示しています。

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
   .to("bean:processOrder");

pollEnrich() の集計ストラテジーを指定することもできます。以下に例を示します。

   .pollEnrich("file://order/data/additional?fileName=orderId", 20000, aggregationStrategy)

pollEnrich() メソッドは、consumer.bridgeErrorHandler=true で設定されたコンシューマーをサポートします。これにより、ポーリングからの例外がルートエラーハンドラーに伝播され、たとえばポーリングを再試行できます。

注記

consumer.bridgeErrorHandler=true のサポートが Camel 2.18 で新たに追加されました。この動作は Camel 2.17 ではサポートされません。

集計ストラテジーの aggregate() メソッドに渡されるリソースエクスチェンジは、エクスチェンジを受け取る前にポーリングがタイムアウトした場合は null の可能性があります。

pollEnrich() によって使用されるポーリングメソッド

pollEnrich() メソッドは、以下のポーリングメソッドのいずれかを呼び出して、コンシューマーエンドポイントをポーリングします。

  • receiveNoWait() (これがデフォルトです。)
  • receive()
  • receive(long timeout)

pollEnrich() コマンドのタイムアウト引数 (ミリ秒単位) は、以下のように呼び出すメソッドを決定します。

  • タイムアウトが 0 または指定されていない場合、pollEnrich()receiveNoWait を呼び出します。
  • タイムアウトが負の値の場合、pollEnrich()receive を呼び出します。
  • それ以外の場合は、pollEnrich()receive(timeout) を呼び出します。

データがない場合、集約ストラテジーの newExchange は null になります。

pollEnrich() メソッドの使用例

以下の例は、inbox/data.txt ファイルからコンテンツを読み込むことによるメッセージの補完を示しています。

 from("direct:start")
   .pollEnrich("file:inbox?fileName=data.txt")
   .to("direct:result");

以下は、XML DSL と同じ例です。

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

指定したファイルが存在しない場合は、メッセージは空になります。ファイルが存在するまで待機するタイムアウトを指定するか、特定の時間まで待機できます。以下の例では、コマンドは 5 秒未満待機します。

<route>
   <from uri="direct:start"/>
   <pollEnrich timeout="5000">
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

pollEnrich() での動的 URI の使用

Camel 2.16 以降、enrich() および pollEnrich() メソッドは、現在のエクスチェンジからの情報に基づいて計算される動的 URI の使用をサポートします。たとえば、ヘッダーを使用して SEDA キュー名を示すエンドポイントから Enrich をポーリングするには、以下を行います。

from("direct:start")
  .pollEnrich().simple("seda:${header.name}")
  .to("direct:result");

以下は、XML DSL と同じ例です。

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <simple>seda${header.name}</simple>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

pollEnrich() メソッドでサポートされるオプション

pollEnrich DSL コマンドは、以下のオプションをサポートします。

名前

デフォルト値

説明

expression

なし

Camel 2.16 以降では、このオプションは必須です。Enrich from を使用するよう外部サービスの URI を設定する式を指定します。Simple 式言語、Constant 式言語、または現在のエクスチェンジの値から動的に URI を計算できるその他の言語を使用できます。

uri

 

これらのオプションは削除されています。代わりに expression オプションを指定してください。Camel 2.15 以前では、uri オプションまたは ref オプションを指定する必要がありました。各オプションは、外部サービスの Enrich からのエンドポイント URI を指定します。

ref

 

補完のための外部サービスのエンドポイントを参照します。uri または ref のいずれかを使用する必要があります。

strategyRef

 

外部サービスからの応答を 1 つの送信メッセージにマージするために使用される AggregationStrategy を参照します。デフォルトでは、Camel は外部からの応答を送信メッセージとして使用します。POJO を AggregationStrategy として使用することができます。詳細は、Aggregate パターンのドキュメントを参照してください。

strategyMethodName

 

POJO を AggregationStrategy として使用している場合は、このオプションを指定して、集約メソッドの名前を明示的に宣言します。詳細は、Aggregate パターンを参照してください。

strategyMethodAllowNull

false

デフォルトの動作では、補完するデータがない場合、集約メソッドは使用されません。このオプションが true の場合、補完のデータがなく、POJO を AggregationStrategy に使用していると、null 値は oldExchange として使用されます。詳細は、Aggregate パターンを参照してください。

timeout

-1

外部サービスからのポーリング時に応答を待つ最大時間 (ミリ秒単位)。デフォルトの動作では、pollEnrich() メソッドが receive() メソッドを呼び出します。メッセージが利用可能になるまで receive() はブロックできるので、常にタイムアウトを指定することが推奨されます。

aggregateOnException

false

デフォルトの動作では、リソースから補完するデータの取得中に例外が発生すると、集約メソッドは 使用されません。このオプションを true に設定すると、aggregate メソッドに例外がある場合に、エンドユーザーがアクションを制御できるようになります。たとえば、例外を非表示にしたり、カスタムメッセージのボディーを設定したりすることができます。

cacheSize

1000

このオプションを指定し、pollEnrich() 操作で再利用するためにコンシューマーをキャッシュする ConsumerCache のキャッシュサイズを設定します。このキャッシュを無効にするには、cacheSize オプションを -1 に設定します。

ignoreInvalidEndpoint

false

解決できないエンドポイント URI を無視するかどうかを示します。デフォルトの動作では、無効なエンドポイント URI を特定する例外が Camel によって出力されます。