272.6. 将数据发送到 Camel
当外部库需要将事件推送到 Camel 路由时,Reactive Streams 端点必须设置为使用者。
from("reactive-streams:elements") .to("log:INFO");
对 元素
流的句柄可以从 CamelReactiveStreams
实用程序类获取。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); Subscriber<String> elements = camel.streamSubscriber("elements", String.class);
订阅者可用于将事件推送到从 元素
流中消耗的 Camel 路由。
以下是如何将它与 RxJava 2 搭配使用的示例(尽管任何被动框架可用于发布事件)。
Flowable.interval(1, TimeUnit.SECONDS) .map(i -> "Item " + i) .subscribe(elements);
字符串项由示例中 RxJava 生成,它们被推送到以上定义的 Camel 路由中。
272.6.1. 使用直接 API 将数据发送到 Camel
在这种情况下,可以使用直接 API 从端点 URI 获取 Camel 订阅者。
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); // Send two strings to the "seda:queue" endpoint Flowable.just("hello", "world") .subscribe(camel.subscriber("seda:queue", String.class));