307.3. URI 形式

現在、Spark コンポーネントはプロデューサーのみをサポートしています。これは、Spark ジョブを呼び出して結果を返すことを目的としています。RDD、データフレーム、または Hive SQL ジョブを呼び出すことができます。

Spark URI 形式

spark:{rdd|dataframe|hive}

307.3.1. Spark オプション

Apache Spark コンポーネントは、以下に示す 3 個のオプションをサポートしています。

名前説明デフォルトタイプ

rdd (producer)

計算対象の RDD。

 

JavaRDDLike

rddCallback (producer)

RDD に対してアクションを実行する関数。

 

RddCallback

resolveProperty Placeholders (advanced)

起動時にコンポーネントがプロパティープレースホルダーを解決するかどうか。String タイプのプロパティーのみがプロパティープレースホルダーを使用できます。

true

boolean

Apache Spark エンドポイントは、URI 構文を使用して設定されます。

spark:endpointType

パスおよびクエリーパラメーターを使用します。

307.3.2. パスパラメーター (1 個のパラメーター):

名前説明デフォルトタイプ

endpointType

必須 エンドポイントのタイプ (rdd、データフレーム、ハイブ)。

 

EndpointType

307.3.3. クエリーパラメーター (6 個のパラメーター):

名前説明デフォルトタイプ

collect (producer)

結果を収集またはカウントする必要があるかどうかを示します。

true

boolean

dataFrame (producer)

計算対象の DataFrame。

 

DataFrame

dataFrameCallback (producer)

DataFrame に対してアクションを実行する関数。

 

DataFrameCallback

rdd (producer)

計算対象の RDD。

 

JavaRDDLike

rddCallback (producer)

RDD に対してアクションを実行する関数。

 

RddCallback

synchronous (advanced)

同期処理を厳密に使用するか、Camel が非同期処理を使用できるかどうかを設定します (サポートされている場合)。

false

boolean

  # RDD ジョブ 

RDD ジョブを呼び出すには、次の URI を使用します。

Spark RDD producer

spark:rdd?rdd=#testFileRdd&rddCallback=#transformation

 rdd オプションは Camel レジストリーの RDD インスタンス (org.apache.spark.api.java.JavaRDDLike のサブクラス) の名前を参照し、rddCallbackorg.apache.camel.component.spark.RddCallback インターフェイスの実装を参照します。(レジストリーからも)。RDD コールバックは、特定の RDD に対して入力メッセージを適用するために使用される単一のメソッドを提供します。コールバック計算の結果は、ボディーとしてエクスチェンジに保存されます。

Spark RDD コールバック

public interface RddCallback<T> {
    T onRdd(JavaRDDLike rdd, Object... payloads);
}

次のスニペットは、ジョブへの入力としてメッセージを送信し、結果を返す方法を示しています。

Spark ジョブの呼び出し

String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);

Spring Bean として登録された上記のスニペットの RDD コールバックは、次のようになります。

Spark RDD コールバック

@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        Long onRdd(JavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            return rdd.filter({line -> line.contains(pattern)}).count();
        }
    }
}

Spring の RDD 定義は次のようになります。

Spark RDD 定義

@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
  return sparkContext.textFile("testrdd.txt");
}

307.3.4. RDD コールバックを無効にする

RDD コールバックが Camel パイプラインに値を返さない場合は、null 値を返すか、VoidRddCallback 基本クラスを使用できます。

Spark RDD 定義

@Bean
RddCallback<Void> rddCallback() {
  return new VoidRddCallback() {
        @Override
        public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}

307.3.5. RDD コールバックの変換

RDD コールバックに送信される入力データのタイプがわかっている場合は、ConvertingRddCallback を使用して、受信メッセージをコールバックに挿入する前に Camel に自動的に変換させることができます。

Spark RDD 定義

@Bean
RddCallback<Long> rddCallback(CamelContext context) {
  return new ConvertingRddCallback<Long>(context, int.class, int.class) {
            @Override
            public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
                return rdd.count() * (int) payloads[0] * (int) payloads[1];
            }
        };
    };
}

307.3.6. アノテーション付き RDD コールバック

おそらく、RDD コールバックを操作する最も簡単な方法は、@RddCallback アノテーションでマークされたメソッドをクラスに提供することです。

アノテーション付き RDD コールバック定義

import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
 
@Bean
RddCallback<Long> rddCallback() {
    return annotatedRddCallback(new MyTransformation());
}
 
...
 
import org.apache.camel.component.spark.annotation.RddCallback;
 
public class MyTransformation {
 
    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
 
}

CamelContext をアノテーション付き RDD コールバックファクトリーメソッドに渡す場合、作成されたコールバックは、入力ペイロードを変換して、アノテーション付きメソッドのパラメーターに一致させることができます。

アノテーション付き RDD コールバックのボディー変換

import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
 
@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
    return annotatedRddCallback(new MyTransformation(), camelContext);
}
 
...

 
import org.apache.camel.component.spark.annotation.RddCallback;
 
public class MyTransformation {
 
    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
 
}
 
...
 
// Convert String "10" to integer
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback" Arrays.asList(10, "10"), long.class);