270.4. Camel からのデータの取得
Camel ルートから流れるデータをサブスクライブするには、次のスニペットのように、エクスチェンジを名前付きストリームにリダイレクトする必要があります。
from("timer:clock") .setBody().header(Exchange.TIMER_COUNTER) .to("reactive-streams:numbers");
ルートは、XML DSL を使用して記述することもできます。
この例では、数字の無制限のストリームが名前 numbers
に関連付けられています。ストリームには、CamelReactiveStreams
ユーティリティークラスを使用してアクセスできます。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); // Getting a stream of exchanges Publisher<Exchange> exchanges = camel.fromStream("numbers"); // Getting a stream of Integers (using Camel standard conversion system) Publisher<Integer> numbers = camel.fromStream("numbers", Integer.class);
ストリームは、リアクティブストリームと互換性のあるライブラリーで簡単に使用できます。これを RxJava 2 で使用する方法の例を次に示します (ただし、イベントの処理には任意のリアクティブフレームワークを使用できます)。
Flowable.fromPublisher(integers) .doOnNext(System.out::println) .subscribe();
この例では、Camel によって生成されたすべての数値を System.out
に出力します。
270.4.1. ダイレクト API を使用した Camel からのデータの取得
短い Camel ルートの場合、および (Camel DSL をまったく使用せずに) リアクティブフレームワークの関数構造を使用して処理フロー全体を定義することを好むユーザーの場合、Camel URI を使用してストリームを定義することもできます。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); // Get a stream from all the files in a directory Publisher<String> files = camel.from("file:folder", String.class); // Use the stream in RxJava2 Flowable.fromPublisher(files) .doOnNext(System.out::println) .subscribe();