272.6. 将数据发送到 Camel

当外部库需要将事件推送到 Camel 路由时,Reactive Streams 端点必须设置为使用者。

from("reactive-streams:elements")
.to("log:INFO");

元素 流的句柄可以从 CamelReactiveStreams 实用程序类获取。

CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);

Subscriber<String> elements = camel.streamSubscriber("elements", String.class);

订阅者可用于将事件推送到从 元素 流中消耗的 Camel 路由。

以下是如何将它与 RxJava 2 搭配使用的示例(尽管任何被动框架可用于发布事件)。

Flowable.interval(1, TimeUnit.SECONDS)
    .map(i -> "Item " + i)
    .subscribe(elements);

字符串项由示例中 RxJava 生成,它们被推送到以上定义的 Camel 路由中。

272.6.1. 使用直接 API 将数据发送到 Camel

在这种情况下,可以使用直接 API 从端点 URI 获取 Camel 订阅者。

CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);

// Send two strings to the "seda:queue" endpoint
Flowable.just("hello", "world")
    .subscribe(camel.subscriber("seda:queue", String.class));