272.5. Camel에서 데이터 가져오기

Camel 경로에서 데이터 흐름을 구독하려면 다음 스니펫과 같이 교환이 지정된 스트림으로 리디렉션되어야 합니다.

from("timer:clock")
.setBody().header(Exchange.TIMER_COUNTER)
.to("reactive-streams:numbers");

또한 경로는 XML DSL을 사용하여 작성할 수 있습니다.

이 예에서 바인딩되지 않은 숫자 스트림은 이름 번호와 연결 됩니다. 이 스트림은 CamelReactiveStreams 유틸리티 클래스를 사용하여 액세스할 수 있습니다.

CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);

// Getting a stream of exchanges
Publisher<Exchange> exchanges = camel.fromStream("numbers");

// Getting a stream of Integers (using Camel standard conversion system)
Publisher<Integer> numbers = camel.fromStream("numbers", Integer.class);

스트림은 모든 반응형 스트림 호환 라이브러리와 쉽게 사용할 수 있습니다. 다음은 RxJava 2 와 함께 사용하는 방법의 예입니다(모든 반응 프레임워크를 사용하여 이벤트를 처리할 수 있음).

Flowable.fromPublisher(integers)
    .doOnNext(System.out::println)
    .subscribe();

이 예제에서는 Camel에서 생성한 모든 숫자를 System.out 로 출력합니다.

272.5.1. 직접 API를 사용하여 Camel에서 데이터 가져오기

Camel 경로 및 Camel DSL을 전혀 사용하지 않고 동적 프레임워크의 기능 구성을 사용하여 전체 처리 흐름을 정의하는 것을 선호하는 사용자의 경우 Camel URI를 사용하여 스트림을 정의할 수도 있습니다.

CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);

// Get a stream from all the files in a directory
Publisher<String> files = camel.from("file:folder", String.class);

// Use the stream in RxJava2
Flowable.fromPublisher(files)
    .doOnNext(System.out::println)
    .subscribe();