42.2. Producer インターフェイスの実装

プロデューサーを実装する代替方法

プロデューサーは以下のいずれかの方法で実装できます。

同期プロデューサーの実装方法

例42.4「DefaultProducer 実装」 は、同期プロデューサーの実装方法の概要を説明しています。この場合、Producer.process() の呼び出しは応答を受け取るまでブロックします。

例42.4 DefaultProducer 実装

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultProducer;

public class CustomProducer extends DefaultProducer { 1

    public CustomProducer(Endpoint endpoint) { 2
        super(endpoint);
        // Perform other initialization tasks...
    }

    public void process(Exchange exchange) throws Exception { 3
        // Process exchange synchronously.
        // ...
    }
}
1
org.apache.camel.impl.DefaultProducer クラスを拡張して、カスタム同期プロデューサークラス CustomProducer を実装します。
2
親エンドポイントへの参照を取得するコンストラクターを実装します。
3
process() メソッド実装は、プロデューサーコードの中心となります。process() メソッドの実装は、実装するコンポーネントのタイプに完全に依存します。

概説では、process() メソッドは通常以下のように実装されます。

  • エクスチェンジに In メッセージが含まれ、指定した交換パターンと一致する場合は、In メッセージを指定のエンドポイントに送信します。
  • 交換パターンが Out メッセージの受信を予測する場合は、 Out メッセージが受信されるまで待ちます。これにより、通常、process() メソッドは長時間ブロックします。
  • 返信が受信されたら、エクスチェンジオブジェクトに返信を添付するために exchange.setOut() を呼び出します。応答に障害メッセージが含まれる場合は、Out を使用して Message.setFault(true) メッセージに fault フラグを設定します。

非同期プロデューサーの実装方法

例42.5「CollectionProducer 実装」 に非同期プロデューサーを実装する方法を概説します。この場合、同期の process() メソッドと非同期の process() メソッド (追加の AsyncCallback 引数を取る) の両方を実装する必要があります。

例42.5 CollectionProducer 実装

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultProducer;

public class _CustomProducer_ extends DefaultProducer implements AsyncProcessor { 1

    public _CustomProducer_(Endpoint endpoint) { 2
        super(endpoint);
        // ...
    }

    public void process(Exchange exchange) throws Exception { 3
        // Process exchange synchronously.
        // ...
    }

    public boolean process(Exchange exchange, AsyncCallback callback) { 4
        // Process exchange asynchronously.
        CustomProducerTask task = new CustomProducerTask(exchange, callback);
        // Process 'task' in a separate thread...
        // ...
        return false; 5
    }
}

public class CustomProducerTask implements Runnable { 6
    private Exchange exchange;
    private AsyncCallback callback;

    public CustomProducerTask(Exchange exchange, AsyncCallback callback) {
        this.exchange = exchange;
        this.callback = callback;
    }

    public void run() { 7
        // Process exchange.
        // ...
        callback.done(false);
    }
}
1
org.apache.camel.impl.DefaultProducer クラスを拡張し、AsyncProcessor インターフェイスを実装してカスタム非同期プロデューサークラス CustomProducer を実装します。
2
親エンドポイントへの参照を取得するコンストラクターを実装します。
3
同期 process() メソッドを実装します。
4
非同期 process() メソッドを実装します。非同期メソッドは複数の方法で実装できます。ここでは、サブスレッドで実行されるコードを表す java.lang.Runnable インスタンスである task を作成する方法を示します。次に、Java スレッド API を使用してサブスレッドでタスクを実行します。たとえば、新しいスレッドを作成したり、既存のスレッドプールにタスクを割り当てたりして、これを行います。
5
通常、エクスチェンジが非同期的に処理されたことを示すために非同期 process() メソッドから false が返されます。
6
CustomProducerTask クラスは、サブスレッドで実行される処理コードをカプセル化します。このクラスは、Exchange オブジェクト (exchange) および AsyncCallback オブジェクト (callback) のコピーをプライベートメンバー変数として保存する必要があります。
7
run() メソッドには、In メッセージをプロデューサーエンドポイントに送信し (ある場合)、応答を受信するのを待つコードが含まれます。応答 (Out メッセージまたは Fault メッセージ) を受信し、エクスチェンジオブジェクトに挿入した後、呼び出し元に処理の完了を通知するために callback.done() を呼び出す必要があります。