270.4. Camel からのデータの取得

Camel ルートから流れるデータをサブスクライブするには、次のスニペットのように、エクスチェンジを名前付きストリームにリダイレクトする必要があります。

from("timer:clock")
.setBody().header(Exchange.TIMER_COUNTER)
.to("reactive-streams:numbers");

ルートは、XML DSL を使用して記述することもできます。

この例では、数字の無制限のストリームが名前 numbers に関連付けられています。ストリームには、CamelReactiveStreams ユーティリティークラスを使用してアクセスできます。

CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);

// Getting a stream of exchanges
Publisher<Exchange> exchanges = camel.fromStream("numbers");

// Getting a stream of Integers (using Camel standard conversion system)
Publisher<Integer> numbers = camel.fromStream("numbers", Integer.class);

ストリームは、リアクティブストリームと互換性のあるライブラリーで簡単に使用できます。これを RxJava 2 で使用する方法の例を次に示します (ただし、イベントの処理には任意のリアクティブフレームワークを使用できます)。

Flowable.fromPublisher(integers)
    .doOnNext(System.out::println)
    .subscribe();

この例では、Camel によって生成されたすべての数値を System.out に出力します。

270.4.1. ダイレクト API を使用した Camel からのデータの取得

短い Camel ルートの場合、および (Camel DSL をまったく使用せずに) リアクティブフレームワークの関数構造を使用して処理フロー全体を定義することを好むユーザーの場合、Camel URI を使用してストリームを定義することもできます。

CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);

// Get a stream from all the files in a directory
Publisher<String> files = camel.from("file:folder", String.class);

// Use the stream in RxJava2
Flowable.fromPublisher(files)
    .doOnNext(System.out::println)
    .subscribe();