272.5. Camel에서 데이터 가져오기
Camel 경로에서 데이터 흐름을 구독하려면 다음 스니펫과 같이 교환이 지정된 스트림으로 리디렉션되어야 합니다.
from("timer:clock") .setBody().header(Exchange.TIMER_COUNTER) .to("reactive-streams:numbers");
또한 경로는 XML DSL을 사용하여 작성할 수 있습니다.
이 예에서 바인딩되지 않은 숫자 스트림은 이름 번호와 연결 됩니다
. 이 스트림은 CamelReactiveStreams
유틸리티 클래스를 사용하여 액세스할 수 있습니다.
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); // Getting a stream of exchanges Publisher<Exchange> exchanges = camel.fromStream("numbers"); // Getting a stream of Integers (using Camel standard conversion system) Publisher<Integer> numbers = camel.fromStream("numbers", Integer.class);
스트림은 모든 반응형 스트림 호환 라이브러리와 쉽게 사용할 수 있습니다. 다음은 RxJava 2 와 함께 사용하는 방법의 예입니다(모든 반응 프레임워크를 사용하여 이벤트를 처리할 수 있음).
Flowable.fromPublisher(integers) .doOnNext(System.out::println) .subscribe();
이 예제에서는 Camel에서 생성한 모든 숫자를 System.out
로 출력합니다.
272.5.1. 직접 API를 사용하여 Camel에서 데이터 가져오기
Camel 경로 및 Camel DSL을 전혀 사용하지 않고 동적 프레임워크의 기능 구성을 사용하여 전체 처리 흐름을 정의하는 것을 선호하는 사용자의 경우 Camel URI를 사용하여 스트림을 정의할 수도 있습니다.
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); // Get a stream from all the files in a directory Publisher<String> files = camel.from("file:folder", String.class); // Use the stream in RxJava2 Flowable.fromPublisher(files) .doOnNext(System.out::println) .subscribe();