272.7. 请求向 Camel 转型

某些 Camel DSL 中定义的路由可以在被动流框架中使用,以执行特定的转换(相同机制也可用于将数据发送到 http 端点并继续)。

下列代码片段演示了 RxJava 功能代码如何能够请求加载和汇总文件到 Camel 的任务。

CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);

// Process files starting from their names
Flowable.just(new File("file1.txt"), new File("file2.txt"))
    .flatMap(file -> camel.toStream("readAndMarshal", String.class))
    // Camel output will be converted to String
    // other steps
    .subscribe();

要达到此目的,在 Camel 上下文中应定义类似以下的路由:

from("reactive-streams:readAndMarshal")
.marshal() // ... other details

272.7.1. 使用直接 API 请求向 Camel 转型

另一种方法是直接在被动流中使用 URI 端点:

CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);

// Process files starting from their names
Flowable.just(new File("file1.txt"), new File("file2.txt"))
    .flatMap(file -> camel.to("direct:process", String.class))
    // Camel output will be converted to String
    // other steps
    .subscribe();

使用 to () 方法而不是 toStream 时,不需要使用 "reactive-streams:" 端点(尽管在功能下使用它们)定义路由。

在这种情况下,Camel 转型只是:

from("direct:process")
.marshal() // ... other details