313.4. Spring Boot Auto-Configuration
组件支持 4 个选项,它们如下所列。
Name | 描述 | 默认 | 类型 |
---|---|---|---|
camel.component.spark.enabled | 启用 spark 组件 | true | 布尔值 |
camel.component.spark.rdd | 用于计算的 RDD,选项是一个 org.apache.spark.api.java.JavaRDDLike 类型。 | 字符串 | |
camel.component.spark.rdd-callback | 针对 RDD 执行操作的功能.选项是一个 org.apache.camel.component.spark.RddCallback 类型。 | 字符串 | |
camel.component.spark.resolve-property-placeholders | 启动时,组件是否应自行解析属性占位符。只有 String 类型的属性才能使用属性占位符。 | true | 布尔值 |
# RDD job
要调用 RDD 作业,请使用以下 URI:
spark RDD producer
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
其中 rdd
选项是指 RDD 实例的名称(subclass of org.apache.spark.api.java.JavaRDDLike
),而 rddCallback
指的是 org.apache.camel.component.spark.RddCallback
接口(也来自 registry)的实现。RDD 回调提供单一方法,用于针对给定的 RDD 应用传入的消息。回调计算结果保存为交换的正文。
spark RDD 回调
public interface RddCallback<T> { T onRdd(JavaRDDLike rdd, Object... payloads); }
以下片段演示了如何将消息作为输入发送到作业并返回结果:
调用 spark 任务
String pattern = "job input"; long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
上面注册为 Spring bean 的 RDD 回调可能如下所示:
spark RDD 回调
@Bean RddCallback<Long> countLinesContaining() { return new RddCallback<Long>() { Long onRdd(JavaRDDLike rdd, Object... payloads) { String pattern = (String) payloads[0]; return rdd.filter({line -> line.contains(pattern)}).count(); } } }
Spring 中的 RDD 定义可能如下所示:
spark RDD 定义
@Bean JavaRDDLike myRdd(JavaSparkContext sparkContext) { return sparkContext.textFile("testrdd.txt"); }
313.4.1. void RDD 回调
如果您的 RDD 回调不会将任何值返回至 Camel 管道,您可以返回 null
值或使用 VoidRddCallback
基础类:
spark RDD 定义
@Bean RddCallback<Void> rddCallback() { return new VoidRddCallback() { @Override public void doOnRdd(JavaRDDLike rdd, Object... payloads) { rdd.saveAsTextFile(output.getAbsolutePath()); } }; }