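313.4.3. Annotated RDD callbacks
The simplest way to use an RDD callback is to provide a class with a method marked with the @RddCallback annotation:
Annotated RDD callback definition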
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback() {
    // Wrap the annotated bean in an RddCallback
    return annotatedRddCallback(new MyTransformation());
}
...
import org.apache.camel.component.spark.annotations.RddCallback;

public class MyTransformation {
    // The first parameter receives the RDD; the remaining ones receive the message payloads
    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
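}
If you pass a CamelContext to the annotated RDD callback factory method, the created callback can convert incoming payloads to match the parameter types of the annotated method:
Body conversions for annotated RDD callbacks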
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
    // Passing the CamelContext enables type conversion of the payloads
    return annotatedRddCallback(new MyTransformation(), camelContext);
}
...
import org.apache.camel.component.spark.annotations.RddCallback;

public class MyTransformation {
    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
}
...
// The String "10" is converted to int to match the "second" parameter
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback", Arrays.asList(10, "10"), long.class);
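To make the payload-to-parameter mapping above more concrete, the following is a minimal plain-Java sketch of how such a callback could work. It is not the camel-spark implementation: the @Callback annotation, the convert helper, and the invoke method are hypothetical names introduced only for this illustration, a List<String> stands in for JavaRDD<String>, and the converter handles only the String or Number to int case that the listing above relies on.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.List;

class AnnotatedCallbackSketch {

    // Hypothetical stand-in for the @RddCallback annotation (assumption, not Camel's type).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Callback {}

    // Same shape as MyTransformation above; a List stands in for JavaRDD<String>.
    static class MyTransformation {
        @Callback
        long countLines(List<String> textFile, int first, int second) {
            return (long) textFile.size() * first * second;
        }
    }

    // Hypothetical converter: only covers the String/Number -> int case.
    static Object convert(Object value, Class<?> target) {
        if (target == int.class || target == Integer.class) {
            return value instanceof Number
                    ? ((Number) value).intValue()
                    : Integer.parseInt(value.toString());
        }
        return value; // other types are passed through unchanged
    }

    // Finds the annotated method, passes the dataset as the first argument,
    // and converts each payload to the type of the matching parameter.
    static Object invoke(Object bean, Object dataset, List<?> payloads) throws Exception {
        for (Method m : bean.getClass().getDeclaredMethods()) {
            if (!m.isAnnotationPresent(Callback.class)) {
                continue;
            }
            Class<?>[] types = m.getParameterTypes();
            if (types.length != payloads.size() + 1) {
                throw new IllegalArgumentException("payload count does not match method parameters");
            }
            Object[] args = new Object[types.length];
            args[0] = dataset;
            for (int i = 1; i < types.length; i++) {
                args[i] = convert(payloads.get(i - 1), types[i]);
            }
            m.setAccessible(true);
            return m.invoke(bean, args);
        }
        throw new IllegalArgumentException("no annotated callback method found");
    }

    public static void main(String[] args) throws Exception {
        // Three "lines", payloads 10 and "10": the String is converted to int before the call.
        Object result = invoke(new MyTransformation(), List.of("a", "b", "c"), List.of(10, "10"));
        System.out.println(result); // prints 300
    }
}
```

In the sketch, the dataset always fills the first parameter and the payload list fills the rest in order, which mirrors how the body Arrays.asList(10, "10") lines up with the first and second parameters of countLines.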