313.4. Spring Boot Auto-Configuration

组件支持 4 个选项,它们如下所列。

Name描述默认类型

camel.component.spark.enabled

启用 spark 组件

true

布尔值

camel.component.spark.rdd

用于计算的 RDD,选项是一个 org.apache.spark.api.java.JavaRDDLike 类型。

 

字符串

camel.component.spark.rdd-callback

针对 RDD 执行操作的功能.选项是一个 org.apache.camel.component.spark.RddCallback 类型。

 

字符串

camel.component.spark.resolve-property-placeholders

启动时,组件是否应自行解析属性占位符。只有 String 类型的属性才能使用属性占位符。

true

布尔值

  # RDD job 

要调用 RDD 作业,请使用以下 URI:

spark RDD producer

spark:rdd?rdd=#testFileRdd&rddCallback=#transformation

 其中 rdd 选项是指 RDD 实例的名称(subclass of org.apache.spark.api.java.JavaRDDLike),而 rddCallback 指的是 org.apache.camel.component.spark.RddCallback 接口(也来自 registry)的实现。RDD 回调提供单一方法,用于针对给定的 RDD 应用传入的消息。回调计算结果保存为交换的正文。

spark RDD 回调

public interface RddCallback<T> {
    T onRdd(JavaRDDLike rdd, Object... payloads);
}

以下片段演示了如何将消息作为输入发送到作业并返回结果:

调用 spark 任务

String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);

上面注册为 Spring bean 的 RDD 回调可能如下所示:

spark RDD 回调

@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        Long onRdd(JavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            return rdd.filter({line -> line.contains(pattern)}).count();
        }
    }
}

Spring 中的 RDD 定义可能如下所示:

spark RDD 定义

@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
  return sparkContext.textFile("testrdd.txt");
}

313.4.1. void RDD 回调

如果您的 RDD 回调不会将任何值返回至 Camel 管道,您可以返回 null 值或使用 VoidRddCallback 基础类:

spark RDD 定义

@Bean
RddCallback<Void> rddCallback() {
  return new VoidRddCallback() {
        @Override
        public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}