37.3. コンシューマーテンプレートの使用

概要

コンシューマーテンプレートは、受信メッセージを受信するためにコンシューマーエンドポイントをポーリングする方法を提供します。受信メッセージをエクスチェンジオブジェクトの形式またはメッセージボディーの形式で受信するかを選択できます (メッセージボディーは組み込み型コンバーターを使用して特定の型にキャストできます) 。

エクスチェンジのポーリングの例

コンシューマーテンプレートを使用し、ブロッキングの receive()、タイムアウトのある receive()、もしくは即座に返される receiveNoWait() ポーリングメソッドのいずれかを使用して、エクスチェンジのコンシューマーエンドポイントをポーリングできます。コンシューマーエンドポイントはサービスを表すため、エクスチェンジのポーリングを試みる前に、start() 呼び出しを行ってサービススレッドを開始する必要があります。

以下の例は、ブロッキング receive() メソッドを使用して seda:foo コンシューマーエンドポイントからエクスチェンジをポーリングする方法を示しています。

import org.apache.camel.ProducerTemplate;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
...
ProducerTemplate template = context.createProducerTemplate();
ConsumerTemplate consumer = context.createConsumerTemplate();

// Start the consumer service
consumer.start();
...
template.sendBody("seda:foo", "Hello");
Exchange out = consumer.receive("seda:foo");
...
// Stop the consumer service
consumer.stop();

コンシューマーテンプレートインスタンス consumer は、CamelContext.createConsumerTemplate() メソッドを使用してインスタンス化され、コンシューマーサービススレッドは ConsumerTemplate.start() 呼び出しによって開始されます。

メッセージボディーをポーリングする例

また、ブロッキング receiveBody()、タイムアウトのある receiveBody()、または即座に返される receiveBodyNoWait() メソッドのいずれかを使用して、受信メッセージボディーのコンシューマーエンドポイントをポーリングすることもできます。上記の例のように、エクスチェンジをポーリングする前に start() を呼び出してサービススレッドを起動する必要もあります。

以下の例は、ブロッキング receiveBody() メソッドを使用して seda:foo コンシューマーエンドポイントから受信メッセージボディーをポーリングする方法を示しています。

import org.apache.camel.ProducerTemplate;
import org.apache.camel.ConsumerTemplate;
...
ProducerTemplate template = context.createProducerTemplate();
ConsumerTemplate consumer = context.createConsumerTemplate();

// Start the consumer service
consumer.start();
...
template.sendBody("seda:foo", "Hello");
Object body = consumer.receiveBody("seda:foo");
...
// Stop the consumer service
consumer.stop();

エクスチェンジをポーリングする方法

コンシューマーエンドポイントから エクスチェンジ をポーリングするための基本的な方法には、タイムアウトブロックが無期限にない receive()、指定されたミリ秒の期間にタイムアウトブロックが設定された receive()、およびブロックされない receiveNoWait() の 3 つがあります。エンドポイント URI または Endpoint インスタンスとして、コンシューマーエンドポイントを指定できます。

Exchange receive(String endpointUri);
Exchange receive(String endpointUri, long timeout);
Exchange receiveNoWait(String endpointUri);

Exchange receive(Endpoint endpoint);
Exchange receive(Endpoint endpoint, long timeout);
Exchange receiveNoWait(Endpoint endpoint);

メッセージボディーのポーリング方法

コンシューマーエンドポイントから メッセージボディ をポーリングするための基本的な方法には、タイムアウトブロックが無期限にない receive()、指定されたミリ秒の期間にタイムアウトブロックが設定された receive()、およびブロックされない receiveNoWait() の 3 つがあります。エンドポイント URI または Endpoint インスタンスとして、コンシューマーエンドポイントを指定できます。さらに、これらのメソッドのテンプレート形式を呼び出すことで、組み込み型コンバーターを使用して、返されるボディーを特定のタイプ T に変換できます。

Object receiveBody(String endpointUri);
Object receiveBody(String endpointUri, long timeout);
Object receiveBodyNoWait(String endpointUri);

Object receiveBody(Endpoint endpoint);
Object receiveBody(Endpoint endpoint, long timeout);
Object receiveBodyNoWait(Endpoint endpoint);

<T> T receiveBody(String endpointUri, Class<T> type);
<T> T receiveBody(String endpointUri, long timeout, Class<T> type);
<T> T receiveBodyNoWait(String endpointUri, Class<T> type);

<T> T receiveBody(Endpoint endpoint, Class<T> type);
<T> T receiveBody(Endpoint endpoint, long timeout, Class<T> type);
<T> T receiveBodyNoWait(Endpoint endpoint, Class<T> type);