1.5. プロセッサー

概要

ルーターが単にコンシューマーエンドポイントをプロデューサーエンドポイントに接続するだけでなく、より複雑なことを実行できるようにするため、プロセッサー をルートに追加することができます。プロセッサーは、ルーティングルールに挿入して、ルールを通過するメッセージの任意処理を実行するコマンドです。Apache Camel は、表1.1「Apache Camel プロセッサー」 に示されるように、さまざまなプロセッサーを提供します。

表1.1 Apache Camel プロセッサー

Java DSLXML DSL説明

aggregate()

aggregate

「Aggregator」: 複数の受信エクスチェンジを単一のエクスチェンジに組み合わせるアグリゲーターを作成します。

aop()

aop

アスペクト指向プログラミング (AOP) を使用して、指定されたサブルートの前後で作業を行います。

bean(), beanRef()

bean

Java オブジェクト (または Bean) でメソッドを呼び出して、現在のエクスチェンジを処理します。「Bean インテグレーション」を参照してください。

choice()

choice

「Content-Based Router」: when および otherwise 句を使い、エクスチェンジの内容に基づいて特定のサブルートを選択します。

convertBodyTo()

convertBodyTo

In メッセージボディーを、指定された型に変換します。

delay()

delay

「Delayer」: ルートの後続へエクスチェンジを伝搬するのを遅延します。

doTry()

doTry

doCatchdoFinally、および end 句を使い、例外を処理するための try/catch ブロックを作成します。

end()

該当なし

現在のコマンドブロックを終了します。

enrich(),enrichRef()

enrich

「Content Enricher」: 現在のエクスチェンジと、指定された プロデューサー エンドポイント URI からリクエストされたデータを統合します。

filter()

filter

「Message Filter」: 述語式を使用して受信エクスチェンジをフィルタリングします。

idempotentConsumer()

idempotentConsumer

「Idempotent Consumer」: 重複メッセージを抑制するストラテジーを実装します。

inheritErrorHandler()

@inheritErrorHandler

特定のルートノードで継承されたエラーハンドラーを無効にするために使用できるブール値オプション (Java DSL でサブ句として定義され、XML DSL の属性として定義) 。

inOnly()

inOnly

現在のエクスチェンジの MEP を InOnly (引数がない場合) に設定するか、指定されたエンドポイントへエクスチェンジを InOnly として送信します。

inOut()

inOut

現在のエクスチェンジの MEP を InOut (引数がない場合) に設定するか、指定されたエンドポイントへエクスチェンジを InOut として送信します。

loadBalance()

loadBalance

「Load Balancer」: エンドポイントのコレクションに対する負荷分散を実装します。

log()

log

コンソールにメッセージを記録します。

loop()

loop

「Loop」: 各エクスチェンジをルートの後続に繰り返し再送信します。

markRollbackOnly()

@markRollbackOnly

(トランザクション) 現在のトランザクションをロールバックオンリーにマークします (例外は発生しません)。XML DSL では、このオプションは rollback 要素にブール値属性として設定されます。Apache Karaf トランザクションガイド を参照してください。

markRollbackOnlyLast()

@markRollbackOnlyLast

(トランザクション) 1 つ以上のトランザクションがこのスレッドに関連付けられてから一時停止されている場合、このコマンドは最新のトランザクションをロールバックオンリーにマークします (例外は発生しません)。XML DSL では、このオプションは rollback 要素にブール値属性として設定されます。Apache Karaf トランザクションガイド を参照してください。

marshal()

marshal

指定されたデータフォーマットを使用して低レベルまたはバイナリーフォーマットに変換し、特定のトランスポートプロトコルで送信できるようにします。

multicast()

multicast

「Multicast」: 現在のエクスチェンジを複数の宛先にマルチキャストし、各宛先がエクスチェンジの独自のコピーを取得します。

onCompletion()

onCompletion

メインルートの完了後に実行されるサブルート (Java DSL end() で終了) を定義します。「OnCompletion」も併せて参照してください。

onException()

onException

指定された例外が発生するたびに実行されるサブルート (Java DSL end() で終了) を定義します。通常、ルート内ではなく、専用の行で定義されます。

pipeline()

パイプライン

「パイプとフィルター」: あるエンドポイントの出力が次のエンドポイントの入力となるように、一連のエンドポイントにエクスチェンジを送ります。「パイプライン処理」も併せて参照してください。

policy()

policy

現在のルートにポリシーを適用します (現時点ではトランザクションポリシーにのみ使用されます)。Apache Karaf Transaction Guide を参照してください。

pollEnrich(),pollEnrichRef()

pollEnrich

「Content Enricher」: 現在のエクスチェンジと、指定された コンシューマー エンドポイント URI からポーリングされたデータを統合します。

process(),processRef

process

現在のエクスチェンジでカスタムプロセッサーを実行します。「カスタムプロセッサー」 および パートIII「高度な Camel プログラミング」 を参照してください。

recipientList()

recipientList

「受信者リスト」: エクスチェンジを、実行時に算出される (たとえば、ヘッダーの内容に基づいて) 受信者のリストに送信します。

removeHeader()

removeHeader

指定したヘッダーをエクスチェンジの In メッセージから削除します。

removeHeaders()

removeHeaders

指定したパターンに一致するヘッダーをエクスチェンジの In メッセージから削除します。パターンにはフォームを持たせることができます。prefix\* を使用した場合、接頭辞で始まるすべての名前と一致します。それ以外の場合、正規表現として解釈されます。

removeProperty()

removeProperty

エクスチェンジから、指定したエクスチェンジプロパティーを削除します。

removeProperties()

removeProperties

指定したパターンに一致するプロパティーをエクスチェンジから削除します。コンマで区切られた 1 つ以上の文字列のリストを引数として取ります。最初の文字列はパターンです (上記の removeHeaders() を参照)。後続の文字列は例外を指定します。これらのプロパティーは削除されません。

resequence()

resequence

「Resequencer」: 指定されたコンパレータの動作に基づき、受信エクスチェンジの順序を変更します。バッチ モードと ストリーム モードをサポートします。

rollback()

rollback

(トランザクション) 現在のトランザクションをロールバックオンリーにマークします (デフォルトでは例外も発生)。Apache Karaf トランザクションガイド を参照してください。

routingSlip()

routingSlip

「Routing Slip」: 任意のヘッダーから抽出したエンドポイント URI のリストに基づいて動的に構築されたパイプラインで、エクスチェンジをルーティングします。

sample()

sample

サンプリングスロットラーを作成し、ルート上のトラフィックからエクスチェンジのサンプルを抽出できるようにします。

setBody()

setBody

エクスチェンジの In メッセージのメッセージボディーを設定します。

setExchangePattern()

setExchangePattern

現在のエクスチェンジの MEP を指定された値に設定します。「メッセージ交換パターン」を参照してください。

setHeader()

setHeader

エクスチェンジの In メッセージに指定したヘッダーを設定します。

setOutHeader()

setOutHeader

エクスチェンジの Out メッセージに指定したヘッダーを設定します。

setProperty()

setProperty()

指定したエクスチェンジプロパティーを設定します。

sort()

sort

In メッセージボディーの内容を並べ替えます (カスタムコンパレーターをオプションで指定できます) 。

split()

split

「Splitter」: 現在のエクスチェンジを一連のエクスチェンジに分割します。分割された各エクスチェンジには元のメッセージボディーの断片が含まれます。

stop()

stop

現在のエクスチェンジのルーティングを停止し、完了したとマークします。

threads()

threads

ルートの後続部分を並列に処理するためにスレッドプールを作成します。

throttle()

throttle

「Throttler」: フローレートを指定レベルに制限します (1 秒あたりのエクスチェンジ数) 。

throwException()

throwException

指定された Java 例外を出力します。

to()

上記を以下のように変更します。

エクスチェンジを 1 つ以上のエンドポイントに送信します。「パイプライン処理」を参照してください。

toF()

該当なし

文字列フォーマットを使用して、エクスチェンジをエンドポイントに送信します。つまり、エンドポイント URI 文字列は C 言語の printf() 関数の形式に置換し、埋め込むことができます。

transacted()

transacted

ルートの後続部分を囲む Spring トランザクションスコープを作成します。Apache Karaf トランザクションガイド を参照してください。

transform()

transform

「メッセージトランスレーター」: In メッセージヘッダーを Out メッセージヘッダーにコピーし、Out メッセージボディーを指定された値に設定します。

unmarshal()

unmarshal

指定したデータフォーマットを使用して、In メッセージボディーを低レベルまたはバイナリー形式から高レベル形式に変換します。

validate()

validate

述語式を取り、現在のメッセージが有効かどうかを検証します。述語が false を返す場合は、PredicateValidationException 例外を出力します。

wireTap()

wireTap

「Wire Tap」: ExchangePattern.InOnly MEP を使用して、現在のエクスチェンジのコピーを指定された Wire tap URI に送信します。

サンプルプロセッサー

ルートでプロセッサーを使用する方法をある程度理解するには、以下の例を参照してください。

Choice

choice() プロセッサーは、受信メッセージを別のプロデューサーエンドポイントにルーティングするために使用される条件文です。代替となる各プロデューサーエンドポイントの前には、述語の引数を取る when() メソッドがあります。述語が true の場合、後続のターゲットが選択されます。そうでない場合、ルール内の次の when() メソッドに処理が進みます。たとえば、以下の choice() プロセッサーは Predicate1 および Predicate2 の値に応じて、受信メッセージを Target1Target2、または Target3 のいずれかに転送します。

from("SourceURL")
    .choice()
        .when(Predicate1).to("Target1")
        .when(Predicate2).to("Target2")
        .otherwise().to("Target3");

または、Spring XML で同等のものを記述すると、このようになります。

<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <choice>
      <when>
        <!-- First predicate -->
        <simple>header.foo = 'bar'</simple>
        <to uri="Target1"/>
      </when>
      <when>
        <!-- Second predicate -->
        <simple>header.foo = 'manchu'</simple>
        <to uri="Target2"/>
      </when>
      <otherwise>
        <to uri="Target3"/>
      </otherwise>
    </choice>
  </route>
</camelContext>

Java DSL には、endChoice() コマンドを使用する必要がある可能性がある特殊なケースがあります。標準の Apache Camel プロセッサーの中には、特殊なサブ句を使用して追加のパラメーターを指定でき、通常は end() コマンドで終了する追加レベルのネストを効果的に展開します。たとえば、ロードバランサー句を loadBalance().roundRobin().to("mock:foo").to("mock:bar").end() として指定できます。これにより、mock:foomock:bar エンドポイント間でメッセージの負荷分散が行われます。ただし、ロードバランサー句が choice の条件に組み込まれている場合は、以下のように endChoice() コマンドを使用して句を終了する必要があります。

from("direct:start")
    .choice()
        .when(bodyAs(String.class).contains("Camel"))
            .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice()
        .otherwise()
            .to("mock:result");

フィルター

filter() プロセッサーを使用すると、必要のないメッセージがプロデューサーエンドポイントに到達しないようにすることができます。述語の引数を 1 つ取ります。述語が true の場合、メッセージエクスチェンジはプロデューサーに対して許可されます。述語が false の場合、メッセージエクスチェンジはブロックされます。たとえば、以下のフィルターは、受信メッセージに bar の値を持つヘッダー foo が含まれない限り、メッセージエクスチェンジをブロックします。

from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");

または、Spring XML で同等のものを記述すると、このようになります。

<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <filter>
      <simple>header.foo = 'bar'</simple>
      <to uri="TargetURL"/>
    </filter>
  </route>
</camelContext>

Throttler

throttle() プロセッサーは、プロデューサーエンドポイントがオーバーロードしないようにします。スロットラーは、1 秒間に通過できるメッセージの数を制限することで機能します。受信メッセージが指定されたレートを超える場合、スロットラーは超過したメッセージをバッファーに蓄積し、ゆっくりとプロデューサーエンドポイントに送信します。たとえば、スループットのレートを毎秒 100 メッセージに制限するには、以下のルールを定義します。

from("SourceURL").throttle(100).to("TargetURL");

または、Spring XML で同等のものを記述すると、このようになります。

<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000">
      <to uri="TargetURL"/>
    </throttle>
  </route>
</camelContext>

カスタムプロセッサー

ここに記載されている標準プロセッサーがいずれも必要な機能を提供しない場合は、いつでも独自のカスタムプロセッサーを定義できます。カスタムプロセッサーを作成するには、org.apache.camel.Processor インターフェイスを実装し、process() メソッドを上書きするクラスを定義します。以下のカスタムプロセッサー MyProcessor は、受信メッセージから、ヘッダー foo を削除します。

例1.3 カスタムプロセッサークラスの実装

public class MyProcessor implements org.apache.camel.Processor {
public void process(org.apache.camel.Exchange exchange) {
  inMessage = exchange.getIn();
  if (inMessage != null) {
      inMessage.removeHeader("foo");
  }
}
};

カスタムプロセッサーをルータールールに挿入するには、process() メソッドを呼び出します。このメソッドは、ルールにプロセッサーを挿入するための一般的なメカニズムを提供します。たとえば、以下のルールは、例1.3「カスタムプロセッサークラスの実装」 で定義されたプロセッサーを呼び出します。

org.apache.camel.Processor myProc = new MyProcessor();

from("SourceURL").process(myProc).to("TargetURL");