第41章 Consumer インターフェイス

概要

本章では、Apache Camel コンポーネントの実装における必須ステップである Consumer インターフェイスを実装する方法を説明します。

41.1. Consumer インターフェイス

概要

org.apache.camel.Consumer タイプのインスタンスは、ルートのソースエンドポイントを表します。コンシューマーの実装方法がいくつかあります (「コンシューマーパターンおよびスレッド」 を参照)。このレベルの柔軟性は継承階層 (図41.1「コンシューマー継承階層」 を参照) に反映されます。これには、コンシューマーを実装するための複数の異なるベースクラスが含まれます。

図41.1 コンシューマー継承階層

コンシューマー継承階層

コンシューマーパラメーターの注入

Apache Camel は、スケジュールされたポーリングパターン (「スケジュールされたポーリングパターン」 を参照) を使用しているコンシューマーの場合、パラメーターをコンシューマーインスタンスに注入するサポートを提供します。たとえば、custom 接頭辞で識別されるコンポーネントの以下のエンドポイント URI について考えてみましょう。

custom:destination?consumer.myConsumerParam

Apache Camel は、フォームのクエリーオプション consumer.\* を自動的に注入するためのサポートを提供します。consumer.myConsumerParam パラメーターには、以下のように Consumer 実装クラスで対応する setter メソッドと getter メソッドを定義する必要があります。

public class CustomConsumer extends ScheduledPollConsumer {
    ...
    String getMyConsumerParam() { ... }
    void setMyConsumerParam(String s) { ... }
    ...
}

getter および setter メソッドは、通常の Java Bean の命名規則に従います (プロパティー名の最初の文字を大文字にすることも含む)。

Consumer 実装で Bean メソッドを定義する他に、必ず Endpoint.createConsumer() の実装で configureConsumer() メソッドを呼び出す必要があります (「スケジュールされたポーリングエンドポイントの実装」 を参照してください)。

例41.1「FileEndpoint createConsumer () 実装」 は、ファイルコンポーネントの FileEndpoint クラスから取得した createConsumer() メソッド実装の例を示しています。

例41.1 FileEndpoint createConsumer () 実装

...
public class FileEndpoint extends ScheduledPollEndpoint {
    ...
    public Consumer createConsumer(Processor processor) throws Exception {
        Consumer result = new FileConsumer(this, processor);
        configureConsumer(result);
        return result;
    }
    ...
    }

ランタイム時に、コンシューマーパラメーターの注入は以下のようになります。

  1. エンドポイントが作成されると、DefaultComponent.createEndpoint(String uri) のデフォルトの実装は URI を解析してコンシューマーパラメーターを抽出し、 ScheduledPollEndpoint.configureProperties() を呼び出してエンドポイントインスタンスに保存します。
  2. createConsumer() が呼び出されると、メソッド実装は configureConsumer() を呼び出し、コンシューマーパラメーターを注入します (例41.1「FileEndpoint createConsumer () 実装」 を参照)。
  3. configureConsumer() メソッドは Java のリフレクションを使用して、consumer. 接頭辞を削除にした後に関連するオプションと一致する名前を持つ setter メソッドを呼び出します。

スケジュールされたポーリングパラメーター

スケジュールされたポーリングパターンに続くコンシューマーは、表41.1「スケジュールされたポーリングパラメーター」 に記載されているコンシューマーパラメーターを自動的にサポートします (エンドポイント URI のクエリーオプションとして表示されます)。

表41.1 スケジュールされたポーリングパラメーター

名前デフォルト説明

initialDelay

1000

最初のポーリングの前の遅延 (ミリ秒単位)。

delay

500

useFixedDelay フラグの値によって異なります (時間単位はミリ秒)。

useFixedDelay

false

false の場合は delay パラメーターはポーリング期間として解釈されます。ポーリングは initialDelayinitialDelay+delayinitialDelay+2\*delay などで行われます。

true の場合は、delay パラメーターは前の実行から次の実行までに経過した時間として解釈されます。ポーリングは initialDelayinitialDelay+[ProcessingTime]+delay などで行われます。ProcessingTime は、現在のスレッドでエクスチェンジオブジェクトを処理するのにかかった時間です。

イベント駆動型のコンシューマーとポーリングコンシューマー間の変換

Apache Camel は、イベント駆動型のコンシューマーとポーリングコンシューマー間の変換に使用できる 2 つの特殊なコンシューマー実装を提供します。以下の変換クラスが提供されます。

  • org.apache.camel.impl.EventDrivenPollingConsumer: イベント駆動型コンシューマーをポーリングコンシューマーインスタンスに変換します。
  • org.apache.camel.impl.DefaultScheduledPollConsumer: ポーリングコンシューマーをイベント駆動型コンシューマーインスタンスに変換します。

実際には、これらのクラスを使用して Endpoint タイプの実装タスクを単純化します。Endpoint インターフェイスは、コンシューマーインスタンスを作成するための以下の 2 つのメソッドを定義します。

package org.apache.camel;

public interface Endpoint {
    ...
    Consumer createConsumer(Processor processor) throws Exception;
    PollingConsumer createPollingConsumer() throws Exception;
}

createConsumer() はイベント駆動型のコンシューマーを返し、createPollingConsumer() はポーリングコンシューマーを返します。これらのメソッドは 1 つのみ実装します。たとえば、コンシューマーのイベント駆動パターンに従っている場合は、単に例外を発生させる createConsumer() のメソッド実装を提供するために createPollingConsumer() メソッドを実装します。ただし、変換クラスを利用して Apache Camel はより有用なデフォルト実装を提供できます。

たとえば、イベント駆動のパターンに従ってコンシューマーを実装する場合は、DefaultEndpoint を拡張し、createConsumer() メソッドを実装してエンドポイントを実装します。createPollingConsumer() の実装は、以下のように定義される DefaultEndpoint から継承されます。

public PollingConsumer<E> createPollingConsumer() throws Exception {
    return new EventDrivenPollingConsumer<E>(this);
}

EventDrivenPollingConsumer コンストラクターはイベント駆動のコンシューマーへの参照 this を取得し、効果的にラップし、ポーリングコンシューマーに変換します。変換を実装するには、EventDrivenPollingConsumer インスタンスは受信イベントをバッファーし、receive()receive(long timeout)、および receiveNoWait() メソッドを介してオンデマンドで利用できるようにします。

同様に、ポーリングパターンに従ってコンシューマーを実装する場合は、DefaultPollingEndpoint を拡張し、createPollingConsumer() メソッドを実装してエンドポイントを実装します。この場合、createConsumer() メソッドの実装は DefaultPollingEndpoint から継承され、デフォルトの実装は DefaultScheduledPollConsumer インスタンス (ポーリングコンシューマーをイベント駆動型のコンシューマーに変換) を返します。

ShutdownPrepared インターフェイス

コンシューマークラスはオプションで org.apache.camel.spi.ShutdownPrepared インターフェイスを実装できます。これにより、カスタムコンシューマーエンドポイントがシャットダウン通知を受け取ることができます。

例41.2「ShutdownPrepared インターフェイス」 は、ShutdownPrepared インターフェイスの定義を示しています。

例41.2 ShutdownPrepared インターフェイス

package org.apache.camel.spi;

public interface ShutdownPrepared {

    void prepareShutdown(boolean forced);

}

ShutdownPrepared インターフェイスは以下のメソッドを定義します。

prepareShutdown

以下のように、1 または 2 フェーズでコンシューマーエンドポイントをシャットダウンするための通知を受信します。

  1. 正常なシャットダウン: forced 引数に false の値がある場合。リソースを正常にクリーンアップしようとします。たとえば、スレッドを正常に停止することによりクリーンアップします。
  2. Forced shutdown- forced 引数には、値 true があります。これは、シャットダウンがタイムアウトしたことを意味するため、リソースをより積極的にクリーンアップする必要があります。これは、プロセスが終了する前にリソースをクリーンアップする最後の契機となります。

ShutdownAware インターフェイス

コンシューマークラスはオプションで org.apache.camel.spi.ShutdownAware インターフェイスを実装できます。このインターフェイスは、正常なシャットダウンメカニズムと対話し、コンシューマーがシャットダウンするための追加の時間を要求できるようにします。これは通常、内部キューに保留中のエクスチェンジを保存できる SEDA などのコンポーネントに必要です。通常、SEDA コンシューマーをシャットダウンする前にキューのすべてのエクスチェンジを処理します。

例41.3「ShutdownAware インターフェイス」 は、ShutdownAware インターフェイスの定義を示しています。

例41.3 ShutdownAware インターフェイス

// Java
package org.apache.camel.spi;

import org.apache.camel.ShutdownRunningTask;

public interface ShutdownAware extends ShutdownPrepared {

    boolean deferShutdown(ShutdownRunningTask shutdownRunningTask);

    int getPendingExchangesSize();
}

ShutdownAware インターフェイスは以下のメソッドを定義します。

deferShutdown

コンシューマーのシャットダウンを遅延させる場合は、このメソッドから true を返します。shutdownRunningTask 引数は enum で、以下のいずれかの値を取ることができます。

  • ShutdownRunningTask.CompleteCurrentTaskOnly: コンシューマーのスレッドプールによって現在処理されているエクスチェンジの処理を終了しますが、それ以上のエクスチェンジの処理は試みません。
  • ShutdownRunningTask.CompleteAllTasks- 保留中のエクスチェンジすべてを処理します。たとえば、SEDA コンポーネントの場合、コンシューマーは受信キューからすべてのエクスチェンジを処理します。
getPendingExchangesSize
コンシューマーによって処理されるエクスチェンジの数を示します。値をゼロにすると、処理が完了し、コンシューマーをシャットダウンできます。

ShutdownAware メソッドを定義する方法の例は、例41.7「カスタムスレッド実装」 を参照してください。