313.4. Spring Boot Auto-Configuration
组件支持 4 个选项,如下所示。
| 名称 | 描述 | 默认 | 类型 |
|---|---|---|---|
| camel.component.spark.enabled | 启用 spark 组件 | true | 布尔值 |
| camel.component.spark.rdd | 用于计算的 RDD,选项是一个 org.apache.spark.api.java.JavaRDDLike 类型。 | 字符串 | |
| camel.component.spark.rdd-callback | 针对 RDD 执行操作的功能.选项是一个 org.apache.camel.component.spark.RddCallback 类型。 | 字符串 | |
| camel.component.spark.resolve-property-placeholders | 启动时,组件是否应自行解析属性占位符。只有 String 类型的属性才能使用属性占位符。 | true | 布尔值 |
# RDD job
要调用 RDD 作业,请使用以下 URI:
spark RDD producer
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
其中 rdd 选项是指 RDD 实例的名称(subclass of org.apache.spark.api.java.JavaRDDLike),而 rddCallback 指的是 org.apache.camel.component.spark.RddCallback 接口(也来自 registry)的实现。RDD 回调提供单一方法,用于针对给定的 RDD 应用传入的消息。回调计算结果保存为交换的正文。
spark RDD 回调
public interface RddCallback<T> {
T onRdd(JavaRDDLike rdd, Object... payloads);
}以下片段演示了如何将消息作为输入发送到作业并返回结果:
调用 spark 任务
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);上面注册为 Spring bean 的 RDD 回调可能如下所示:
spark RDD 回调
@Bean
RddCallback<Long> countLinesContaining() {
return new RddCallback<Long>() {
Long onRdd(JavaRDDLike rdd, Object... payloads) {
String pattern = (String) payloads[0];
return rdd.filter({line -> line.contains(pattern)}).count();
}
}
}Spring 中的 RDD 定义可能如下所示:
spark RDD 定义
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
return sparkContext.textFile("testrdd.txt");
}313.4.1. void RDD 回调
如果您的 RDD 回调不会将任何值返回至 Camel 管道,您可以返回 null 值或使用 VoidRddCallback 基础类:
spark RDD 定义
@Bean
RddCallback<Void> rddCallback() {
return new VoidRddCallback() {
@Override
public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
rdd.saveAsTextFile(output.getAbsolutePath());
}
};
}