第38章 コンポーネントの実装

概要

本章では、Apache Camel コンポーネントを実装するための概要を示します。

38.1. コンポーネントのアーキテクチャー

38.1.1. コンポーネントのファクトリーパターン

概要

Apache Camel コンポーネントは、ファクトリーパターンを介して相互に関連するクラスセットで設定されます。主なエントリーポイントは Component コンポーネントオブジェクト自体です (org.apache.camel.Component タイプのインスタンス)。Coponnet オブジェクトは、Endpoint オブジェクトを作成するためのファクトリーとして使用することができます。エンドポイントオブジェクトは、ConsumerProducer、および Exchange オブジェクトを作成するためのファクトリーとして機能します。これらの関係の概要は、図38.1「コンポーネントファクトリーパターン」 にまとめられています。

図38.1 コンポーネントファクトリーパターン

コンポーネントファクトリーパターン

コンポーネント

コンポーネント実装はエンドポイントファクトリーです。コンポーネント実装の主なタスクは、Component.createEndpoint() メソッドを実装することです。このメソッドは、オンデマンドで新しいエンドポイントを作成します。

各種類のコンポーネントは、エンドポイント URI に表示される コンポーネント接頭辞 に関連付ける必要があります。たとえば、ファイルコンポーネントは、通常 file://tmp/messages/input などのエンドポイント URI で使用できる file 接頭辞に関連付けられます。Apache Camel に新しいコンポーネントをインストールする場合、特定のコンポーネント接頭辞とコンポーネントを実装するクラス名の関連付けを定義する必要があります。

エンドポイント

各エンドポイントインスタンスは特定のエンドポイント URI をカプセル化します。Apache Camel が新しいエンドポイント URI に遭遇するたびに、新しいエンドポイントインスタンスが作成されます。エンドポイントオブジェクトは、コンシューマーエンドポイントおよびプロデューサーエンドポイントを作成するためのファクトリーでもあります。

エンドポイントは org.apache.camel.Endpoint インターフェイスを実装する必要があります。Endpoint インターフェイスは、以下のファクトリーメソッドを定義します。

  • createConsumer() および createPollingConsumer() コンシューマーエンドポイントを作成します。コンシューマーエンドポイントは、ルートの最初のソースエンドポイントを表します。
  • createProducer(): ルートの最後にターゲットエンドポイントを表すプロデューサーエンドポイントを作成します。
  • createExchange(): エクスチェンジオブジェクトを作成します。これにより、ルート上のメッセージをカプセル化します。

コンシューマー

コンシューマーエンドポイントはリクエストを 消費 します。これらはルートの先頭に現れ、受信したリクエストおよび応答のディスパッチを行うコードをカプセル化します。サービス指向の考え方から、コンシューマーが サービス を表します。

コンシューマーは org.apache.camel.Consumer インターフェイスを実装する必要があります。コンシューマーの実装時には、フォローできるさまざまなパターンがあります。これらのパターンは、「コンシューマーパターンおよびスレッド」 で説明されています。

プロデューサー

プロデューサーエンドポイントはリクエストを 生成 します。これらはルートの最後に常に表示され、リクエスト送信のディスパッチおよび応答を受信するコードをカプセル化します。サービス指向の考え方から、プロデューサーはサービスコンシューマーを表します。

プロデューサーは、org.apache.camel.Producer インターフェイスを実装する必要があります。オプションでプロデューサーを実装し、非同期処理をサポートすることができます。詳細は、「非同期処理」 を参照してください。

エクスチェンジ

エクスチェンジオブジェクトは関連するメッセージセットをカプセル化します。たとえば、メッセージエクスチェンジの 1 つは、要求メッセージとその関連する応答で設定される同期呼び出しです。

エクスチェンジは org.apache.camel.Exchange インターフェイスを実装する必要があります。デフォルトの実装である DefaultExchange は、多くのコンポーネントの実装には十分です。ただし、エクスチェンジと追加のデータを関連付ける場合や、エクスチェンジに追加の処理を行う場合、エクスチェンジの実装をカスタマイズすると便利です。

メッセージ

Exchange オブジェクトには、2 つの異なるメッセージスロットがあります。

  • in メッセージ: 現在のメッセージを保持します。
  • out メッセージ: リプライメッセージを一時的に保持します。

すべてのメッセージタイプは同じ Java オブジェクト org.apache.camel.Message によって表されます。デフォルト実装のメッセージ DefaultMessage は通常、カスタマイズする必要はありません。

38.1.2. ルートでのコンポーネントの使用

概要

Apache Camel ルートは基本的に org.apache.camel.Processor タイプのプロセッサーのパイプラインです。メッセージは、process() メソッドを呼び出してノードからノードに渡されるエクスチェンジオブジェクト process() でカプセル化されます。プロセッサーパイプラインのアーキテクチャーは、図38.2「ルートのコンシューマーおよびプロデューサーインスタンス」 に示されています。

図38.2 ルートのコンシューマーおよびプロデューサーインスタンス

ルートのコンシューマーおよびプロデューサーインスタンス

ソースエンドポイント

ルートの最初には、org.apache.camel.Consumer オブジェクトで表現されるソースエンドポイントがあります。ソースエンドポイントは、受信したリクエストメッセージを受け入れ、応答をディスパッチします。ルートを構築する際、Apache Camel は、「コンポーネントのファクトリーパターン」 で説明されているように、エンドポイント URI のコンポーネント接頭辞に基づいて、適切な Consumer タイプを作成します。

プロセッサー

パイプラインの各中間ノードは、プロセッサーオブジェクトによって表されます (org.apache.camel.Processor インターフェイスの実装) 。標準のプロセッサー (たとえば filterthrottler、または delayer) を挿入したり、独自のカスタムプロセッサー実装を挿入したりできます。

ターゲットエンドポイント

ルートの最後には、org.apache.camel.Producer オブジェクトで表現されるターゲットエンドポイントがあります。プロセッサーパイプラインの最後にあるため、プロデューサーはプロセッサーオブジェクトでもあります (org.apache.camel.Processor インターフェイスを実装します)。ターゲットエンドポイントは、リクエストメッセージを送信し、応答を受信します。ルートを作成するとき、Apache Camel はエンドポイント URI からコンポーネント接頭辞に基づいて適切な Producer タイプを作成します。

38.1.3. コンシューマーパターンおよびスレッド

概要

コンシューマーの実装に使用されるパターンは、受信エクスチェンジの処理に使用されるスレッドモデルを決定します。コンシューマーは、以下のパターンのいずれかを使用して実装できます。

イベント駆動型のパターン

イベント駆動型のパターンでは、アプリケーションの別の部分 (通常はサードパーティーライブラリー) がコンシューマーによって実装されたメソッドを呼び出すと、受信したリクエストの処理が開始されます。イベント駆動型のコンシューマーの適切な例として、イベントが JMX ライブラリーによって開始される Apache Camel JMX コンポーネントがあります。JMX ライブラリーは、handleNotification() メソッドを呼び出してリクエスト処理を開始します。詳細は 例41.4「JMXConsumer 実装」 を参照してください。

図38.3「イベント駆動型コンシューマー」 イベント駆動型のコンシューマーパターンの概要を示します。この例では、notify() メソッドへの呼び出しによって処理がトリガーされることを前提としています。

図38.3 イベント駆動型コンシューマー

イベント駆動型のコンシューマーを使用したメッセージチェーン

イベント駆動型のコンシューマーは、以下のように受信リクエストを処理します。

  1. コンシューマーは受信イベントを受信するメソッドを実装する必要があります (図38.3「イベント駆動型コンシューマー」 では notify() メソッドで表されます)。通常、notify() を呼び出すスレッドはアプリケーションの別の部分であるため、コンシューマーのスレッドポリシーは外部で実行されます。

    たとえば、JMX コンシューマーの実装では、コンシューマーは JMX から通知を受け取る NotificationListener.handleNotification() メソッドを実装します。コンシューマー処理を駆動するスレッドは JMX レイヤー内に作成されます。

  2. notify() メソッドの本文では、コンシューマーが最初に受信イベントをエクスチェンジオブジェクト E に変換し、ルートの次のプロセッサーを process() で呼び出しを行い、エクスチェンジオブジェクトを引数として渡します。

スケジュールされたポーリングパターン

スケジュールされたポーリングパターンでは、リクエストが到達したかどうかを定期的にチェックして、コンシューマーは受信リクエストを取得します。リクエストの確認は、java.util.concurrent ライブラリーによって提供される標準パターンで スケジュールされたエグゼキューターサービス である組み込みタイマークラスによって自動的にスケジュールされます。スケジュールされたエグゼキューターサービスは、特定のタスクを時間間隔で実行し、タスクインスタンスの実行に使用されるスレッドのプールも管理します。

図38.4「スケジュールされたポーリングコンシューマー」 は、スケジュールされたポーリングコンシューマーパターンの概要を示しています。

図38.4 スケジュールされたポーリングコンシューマー

スケジュールされたポーリングコンシューマー

スケジュールされたポーリングコンシューマーは、以下のようにリクエストを処理します。

  1. スケジュールされたエグゼキューターサービスには、利用できるスレッドプールがあり、コンシューマーの処理を開始できます。スケジュール設定した時間間隔が経過すると、スケジュール済みエグゼキューターサービスはプールから空きスレッドの取得を試みます (デフォルトではプールには 5 つのスレッドがあります)。空きスレッドが利用可能な場合、そのスレッドを使用してコンシューマーで poll() メソッドを呼び出します。
  2. コンシューマーの poll() メソッドは、受信したリクエストの処理をトリガーすることを目的としています。poll() メソッドの本文では、コンシューマーは受信メッセージの取得を試行します。リクエストがない場合、poll() メソッドを即座に返します。
  3. 要求メッセージが利用可能な場合、コンシューマーはこれをエクスチェンジオブジェクトに挿入し、ルート内の次のプロセッサーで process() の呼び出しを行い、エクスチェンジオブジェクトを引数として渡します。

ポーリングパターン

ポーリングパターンでは、サードパーティーがコンシューマーのポーリングメソッドの 1 つを呼び出すと、受信したリクエストの処理が開始されます。

  • receive()
  • receiveNoWait()
  • receive(long timeout)

ポーリングメソッドの呼び出しを開始するための正確なメカニズムを定義することは、コンポーネントの実装の責務です。このメカニズムはポーリングパターンで指定されません。

図38.5「Polling Consumer」 は、ポーリングコンシューマーパターンの概要を示しています。

図38.5 Polling Consumer

Polling Consumer

ポーリングを行うコンシューマーは、以下のようにリクエストを処理します。

  1. 受信リクエストの処理は、コンシューマーのポーリングメソッドの 1 つが呼び出されるたびに開始されます。これらのポーリングメソッドを呼び出すメカニズムは、コンポーネントの実装によって定義されます。
  2. receive() メソッドのボディーでは、コンシューマーは受信したリクエストメッセージの取得を試みます。現在利用できるメッセージがない場合、動作は、呼び出された受信メソッドによって異なります。

    • receiveNoWait() が即座に返されます。
    • receive(long timeout) は、指定されたタイムアウト間隔で待機します。[2] 返す前
    • receive() は、メッセージが受信されるまで待機します。
  3. 要求メッセージが利用可能な場合、コンシューマーはこれをエクスチェンジオブジェクトに挿入し、ルート内の次のプロセッサーで process() の呼び出しを行い、エクスチェンジオブジェクトを引数として渡します。

38.1.4. 非同期処理

概要

プロデューサーエンドポイントは通常、エクスチェンジの処理時に 同期 パターンに従います。上記のプロセッサーがパイプラインでプロデューサーの process() を呼び出すと、応答を受け取るまで process() メソッドはブロックされます。この場合、プロセッサーのスレッドは、リクエストの送信および応答の受信サイクルが完了するまでブロックされます。

ただし、プロセッサーのスレッドがすぐにリリースされ、process() 呼び出しがブロックされ ない ようにするため、前のプロセッサーをプロデューサーから切り離す方が望ましい場合があります。この場合、asynchronous パターンを使用してプロデューサーを実装する必要があります。これにより、前のプロセッサーでは、非ブロッキングバージョンの process() メソッドを呼び出すオプションが提供されます。

異なる実装オプションの概要を示すために、本セクションでは、プロデューサーエンドポイントを実装する同期パターンと非同期パターンの両方を説明します。

同期プロデューサー

図38.6「同期プロデューサー」 は、プロデューサーがエクスチェンジの処理を完了するまで、前のプロセッサーがブロックする同期プロデューサーの概要を示しています。

図38.6 同期プロデューサー

同期プロデューサー

同期プロデューサーは以下のようにエクスチェンジを処理します。

  1. パイプラインの前のプロセッサーは、プロデューサー上の同期 process() メソッドを呼び出して、同期処理を開始します。同期 process() メソッドは単一のエクスチェンジ引数を取ります。
  2. process() メソッドのボディー部で、プロデューサーはリクエスト (In メッセージ) をエンドポイントに送信します。
  3. 交換パターンで必要な場合、プロデューサーは応答 (Out メッセージ) がエンドポイントから到達するまで待機します。このステップにより、process() メソッドが無限にブロックされる可能性があります。ただし、交換パターンが応答を強制しない場合、process() メソッドはリクエストの送信直後に返すことができます。
  4. process() メソッドが返されると、エクスチェンジオブジェクトには同期呼び出しからの応答が含まれます (Out メッセージメッセージ)。

非同期プロデューサー

図38.7「非同期プロデューサー」 は、プロデューサーがサブスレッドでのエクスチェンジを処理し、前のプロセッサーは長時間ブロックされない非同期プロデューサーの概要を示しています。

図38.7 非同期プロデューサー

非同期プロデューサー

非同期プロデューサーは以下のようにエクスチェンジを処理します。

  1. プロセッサーが非同期 process() メソッドを呼び出す前に、非同期コールバック オブジェクトを作成する必要があります。これはルートの戻り部分でエクスチェンジを処理するロールを果たします。非同期コールバックでは、プロセッサーは AsyncCallback インターフェイスから継承されるクラスを実装する必要があります。
  2. プロセッサーは、プロデューサー上の非同期 process() メソッドを呼び出して非同期処理を開始します。非同期 process() メソッドは、2 つの引数を取ります。

    • エクスチェンジオブジェクト
    • 同期コールバックオブジェクト
  3. process() メソッドのボディー部で、プロデューサーは処理コードをカプセル化する Runnable オブジェクトを作成します。その後、プロデューサーはこの Runnable オブジェクトの実行をサブスレッドに委任します。
  4. 非同期 process() メソッドが返されるため、プロセッサーのスレッドが解放されますエクスチェンジの処理は個別のサブスレッドで続行されます。
  5. Runnable オブジェクトは In メッセージをエンドポイントに送信します。
  6. 交換パターンで必要な場合、Runnable オブジェクトはエンドポイントから応答 (Out または Fault メッセージ) が到達するのを待機します。Runnable オブジェクトは応答を受け取るまでブロックされます。
  7. 応答が到達すると、Runnable オブジェクトは応答 (Out メッセージ) をエクスチェンジオブジェクトに挿入し、非同期コールバックオブジェクトの done() を呼び出します。次に、非同期コールバックはリプライメッセージを処理します (サブスレッドで実行されます)。


[2] 通常、タイムアウトの間隔はミリ秒単位で指定します。