第34章 Apache Spark との統合

34.1. JBoss Data Grid の Apache Spark コネクター

JBoss Data Grid には Spark コネクターが含まれ、Apache Spark との密な統合を提供します。また、Java または Scala で書かれたアプリケーションが JBoss Data Grid をバッキングデータストアとして使用できるようにします。

実際には、Apache Spark 1.6.x をサポートするコネクターと、Apache Spark 2.x をサポートするコネクターの 2 つがあり、それぞれ Scala 2.10.x と 2.11.x をサポートします。両方のコネクターは、メインディストリビューションとは別に提供されます。

Apache Spark 1.6 コネクターには以下のサポートが含まれています。

  • キャッシュからの RDD の作成
  • キーバリュー RDD のキャッシュへの書き込み
  • キャッシュレベルイベントからの DStream の作成
  • キーバリュー DStream のキャッシュへの書き込み

上記の機能の他に、Apache Spark 2 コネクターは以下の機能もサポートします。

  • JDG サーバー側フィルターを使用したキャッシュベースの RDD の作成
  • JBoss Marshalling を基にした Spark シリアライザー
  • 述語のプッシュダウンをサポートする Dataset API
注記

Apache Spark のサポートは、リモートクライアントサーバーモードでのみ有効です。

34.2. Spark の依存関係

Apache Spark のバージョンに応じて、以下の Maven 設定を使用する必要があります。

Spark 1.6.x の pom.xml

<dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-spark</artifactId>
    <version>0.3.0.Final-redhat-2</version>
</dependency>

Spark 2.x の pom.xml

<dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-spark</artifactId>
    <version>0.6.0-redhat-9</version>
</dependency>

34.3. Spark コネクターの設定

Apache Spark の バージョン 1.6 コネクターとバージョン 2 コネクターは、設定に同じインターフェースを使用しません。バージョン 1.6 コネクターはプロパティーを使用し、バージョン 2 コネクターはメソッドを使用します。

34.3.1. バージョン 1.6 コネクターを設定するプロパティー

表34.1 バージョン 1.6 コネクターを設定するプロパティー

プロパティー名説明デフォルト値

infinispan.client.hotrod.server_list

JBoss Data Grid ノードのリスト

localhost:11222

infinispan.rdd.cacheName

RDD をバックするキャッシュの名前

デフォルトキャッシュ

infinispan.rdd.read_batch_size

キャッシュから読み取りするときのバッチサイズ (エントリー数)

10000

infinispan.rdd.write_batch_size

キャッシュへ書き込みするときのバッチサイズ (エントリー数)

500

infinispan.rdd.number_server_partitions

JBoss Data Grid サーバーごとに作成されるパーティションの数

2

infinispan.rdd.query.proto.protofiles

protobuf ファイル名およびコンテンツを持つマップ

エントリーに protobuf エンコーディング情報のアノテーションが付けられている場合は省略できます。protobuf エンコーディングは、クエリーによる RDD のフィルターに必要です。

infinispan.rdd.query.proto.marshallers

キャッシュにあるオブジェクトの protostream マーシャラークラスのリスト

エントリーに protobuf エンコーディング情報のアノテーションが付けられている場合は省略できます。protobuf エンコーディングは、クエリーによる RDD のフィルターに必要です。

34.3.2. バージョン 2 コネクターを設定するメソッド

以下のメソッドは、バージョン 2 コネクターの設定に使用できます。これらのメソッドは org.infinispan.spark.config.ConnectorConfiguration クラスによって提供されます。

表34.2 バージョン 2 コネクターを設定するメソッド

メソッド名説明デフォルト値

setServerList(String)

JBoss Data Grid ノードのリスト

localhost:11222

setCacheName(String)

RDD をバックするキャッシュの名前

デフォルトキャッシュ

setReadBatchSize(Integer)

キャッシュから読み取りするときのバッチサイズ (エントリー数)

10000

setWriteBatchSize(Integer)

キャッシュへ書き込みするときのバッチサイズ (エントリー数)

500

setPartitions(Integer)

JBoss Data Grid サーバーごとに作成されるパーティションの数

2

addProtoFile(String name, String contents)

protobuf ファイル名およびコンテンツを持つマップ

エントリーに protobuf エンコーディング情報のアノテーションが付けられている場合は省略できます。protobuf エンコーディングは、クエリーによる RDD のフィルターに必要です。

addMessageMarshaller(Class)

キャッシュにあるオブジェクトの protostream マーシャラークラスのリスト

エントリーに protobuf エンコーディング情報のアノテーションが付けられている場合は省略できます。protobuf エンコーディングは、クエリーによる RDD のフィルターに必要です。

addProtoAnnotatedClass(Class)

protobuf アノテーションが含まれるクラスの登録

addProtoFile および addMessageMarshaller メソッドはアノテーションを基に自動生成されるため、これらの代わりに使用します。

setAutoRegisterProto()

protobuf スキーマをサーバーで自動登録

なし

addHotRodClientProperty(key, value)

JBoss Data Grid サーバーとの通信時に追加の Hot Rod クライアントプロパティーを設定

なし

setTargetEntity(Class)

クエリーターゲットを指定するために Dataset API とともに使用

省略した場合、protobuf のアノテーションが付けられた 1 つのクラスのみが設定されていれば、そのクラスが選択されます。

34.3.3. セキュアな JDG クラスターへの接続

JDG クラスターがセキュアな場合、Hot Rod にセキュリティーを設定しないと、Spark コネクターは動作しません。Hot Rod セキュリティーを設定するためにセットアップできる Hot Rod プロパティーは複数あります。これらのプロパティーの説明は、『Administration and Configuration Guide』の付録にある「Hot Rod Properties table」の infinispan.client.hotrod.use_ssl 以降に記載されています。

重要

これらのプロパティーは、バージョン 2 Apache Spark コネクターのみに限定されます。

34.4. Spark 1.6 のコード例

34.4.1. Spark 1.6 のコード例

Apache Spark 1.6 のコネクターは、バージョン 2 コネクターとは異なる設定メカニズムを使用し、バージョン 2 コネクターは 1.6 がサポートしない機能を一部サポートするため、各バージョンのコード例は独自のセクションに分割されています。以下のコード例は、バージョン 1.6 のSpark コネクターと動作します。Spark 2 のコード例はこちらを参照してください。

34.4.2. RDD の作成および使用

Apache Spark 1.6 コネクターを使用する場合、RDD は「バージョン 1.6 コネクターを設定するプロパティー」に記載されている設定に Properties インスタンスを指定して作成されます。その後、Spark コンテキストとともに使用して、通常の Spark 操作と使用される InfinispanRDD を作成します。

以下に Java および Scala での例を示します。

34.4.3. RDD の作成

RDD の作成 (Java)

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.rdd.InfinispanJavaRDD;
import java.util.Properties;
[...]
// Begin by defining a new Spark configuration and creating a Spark context from this.
SparkConf conf = new SparkConf().setAppName("example-RDD");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Create the Properties instance, containing the JBoss Data Grid node and cache name.
Properties properties = new Properties();
properties.put("infinispan.client.hotrod.server_list", "server:11222");
properties.put("infinispan.rdd.cacheName","exampleCache");

// Create the RDD
JavaPairRDD<Integer, Book> exampleRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, properties);

JavaRDD<Book> booksRDD = exampleRDD.values();

RDD の作成 (Scala)

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark.rdd.InfinispanRDD
import org.infinispan.spark._

// Begin by defining a new Spark configuration and creating a Spark context from this.
val conf = new SparkConf().setAppName("example-RDD-scala")
val sc = new SparkContext(conf)

// Create the Properties instance, containing the JBoss Data Grid node and cache name.
val properties = new Properties
properties.put("infinispan.client.hotrod.server_list", "server:11222")
properties.put("infinispan.rdd.cacheName", "exampleCache")

// Create an RDD from the DataGrid cache
val exampleRDD = new InfinispanRDD[Integer, Book](sc, properties)

val booksRDD = exampleRDD.values

34.4.4. RDD のクエリー

RDD が利用可能になると、Spark RDD 操作または Spark の SQL サポートのいずれかを使用してバッキングキャッシュを取得できます。上記の例を拡張して author (著者) ごとにエントリーをカウントすると、結果となる RDD と SQL クエリーは次のようになります。

RDD のクエリー (Java)

// The following imports should be added to the list from the previous example
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
[...]
// Continuing the previous example

// Create a SQLContext, registering the data frame and table
SQLContext sqlContext = new SQLContext(jsc);
DataFrame dataFrame = sqlContext.createDataFrame(booksRDD, Book.class);
dataFrame.registerTempTable("books");

// Run the Query and collect results
List<Row> rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collectAsList();

RDD のクエリー (Scala)

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
[...]
// Create a SQLContext, register a data frame and table
val sqlContext = new SQLContext(sc)
val dataFrame = sqlContext.createDataFrame(booksRDD, classOf[Book])
dataFrame.registerTempTable("books")

 // Run the Query and collect the results
val rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collect()

34.4.5. RDD のキャッシュへの書き込み

静的 InfinispanJavaRDD.write() メソッドを使用すると、あらゆるキーバリューベースの RDD を Data Grid キャッシュに書き込みできます。これにより、RDD の内容がキャッシュにコピーされます。

RDD の書き込み (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.spark.domain.Address;
import org.infinispan.spark.domain.Person;
import org.infinispan.spark.rdd.InfinispanJavaRDD;
import scala.Tuple2;
import java.util.List;
import java.util.Properties;
[...]
// Define the location of the JBoss Data Grid node
Properties properties = new Properties();
properties.put("infinispan.client.hotrod.server_list", "localhost:11222");
properties.put("infinispan.rdd.cacheName","exampleCache");

// Create the JavaSparkContext
SparkConf conf = new SparkConf().setAppName("write-example-RDD");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Defining two entries to be stored in a RDD
// Each Book will contain the title, author, and publicationYear
Book bookOne = new Book("Linux Bible", "Negus, Chris", "2015");
Book bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014");

List<Tuple2<Integer, Book>> pairs = Arrays.asList(
    new Tuple2<>(1, bookOne),
    new Tuple2<>(2, bookTwo)
);

// Create the RDD using the newly created List
JavaPairRDD<Integer, Book> pairsRDD = jsc.parallelizePairs(pairs);

// Write the entries into the cache
InfinispanJavaRDD.write(pairsRDD, config);

RDD の書き込み (Scala)

import java.util.Properties
import org.infinispan.spark._
import org.infinispan.spark.rdd.InfinispanRDD
[...]
// Define the location of the JBoss Data Grid node
val properties = new Properties
properties.put("infinispan.client.hotrod.server_list", "localhost:11222")
properties.put("infinispan.rdd.cacheName", "exampleCache")

// Create the SparkContext
val conf = new SparkConf().setAppName("write-example-RDD-scala")
val sc = new SparkContext(conf)

// Create an RDD of Books
val bookOne = new Book("Linux Bible", "Negus, Chris", "2015")
val bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014")

val sampleBookRDD = sc.parallelize(Seq(bookOne,bookTwo))
val pairsRDD = sampleBookRDD.zipWithIndex().map(_.swap)

// Write the Key/Value RDD to the Data Grid
pairsRDD.writeToInfinispan(properties)

34.4.5.1. DStreams の作成および使用

DStream は継続するデータのストリームを表し、継続的な一連の RDD によって内部で表されます。各 RDD には特定の時間間隔からのデータが含まれます。

DStream の作成には、以下の例のように StreamingContextStorageLevel および JBoss Data Grid RDD 設定とともに渡されます。

DStream の作成 (Scala)

import org.infinispan.spark.stream._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import java.util.Properties

// Spark context
val sc = ...
// java.util.Properties with Infinispan RDD configuration
val props = ...
val ssc = new StreamingContext(sc, Seconds(1))

val stream = new InfinispanInputDStream[String, Book](ssc, StorageLevel.MEMORY_ONLY, props)

InfinispanInputDStream は多くの Spark の DStream 操作を使用して変換でき、StreamingContext で「start」を呼び出した後に処理が発生します。たとえば、最後の 30 秒でキャッシュに挿入された本の数を 10 秒ごとに表示する場合は以下のようになります。

DStream の処理 (Scala)

import org.infinispan.spark.stream._

val stream = ... // From previous sample

// Filter only created entries
val createdBooksRDD = stream.filter { case (_, _, t) => t == Type.CLIENT_CACHE_ENTRY_CREATED }

// Reduce last 30 seconds of data, every 10 seconds
val windowedRDD: DStream[Long] = createdBooksRDD.count().reduceByWindow(_ + _, Seconds(30), Seconds(10))

// Prints the results, couting the number of occurences in each individual RDD
windowedRDD.foreachRDD { rdd => println(rdd.reduce(_ + _)) }

// Start the processing
ssc.start()
ssc.awaitTermination()

DStreams での JBoss Data Grid への書き込み

キーバリュー型のすべての DStream は、Java の場合は InfinispanJavaDStream.writeToInfinispan() Java メソッド経由で JBoss Data Grid へ書き込みでき、Scala の場合は暗黙的な writeToInfinispan(properties) メソッドを直接 DStream インスタンスで使用して書き込みできます。両方のメソッドは JBoss Data Grid RDD 設定を入力として取り、DStream 内に含まれる各 RDD を書き込みます。

34.4.6. Spark での Infinispan Query DSL の使用

Infinispan Query DSL は InfinispanRDD のフィルターとして使用でき、RDD レベルではなくソースでデータを事前にフィルターできます。

重要

クエリーする DSL が適切に動作するには、キャッシュのデータを protobuf でエンコードする必要があります。protobuf エンコーティングの使用手順は「Protobuf エンコーティング」を参照してください。

著者の名前に Doe 含まれる本のリストを取得する以下の例を見てください。

34.4.7. クエリーによるフィルリング

クエリーによるフィルリング (Scala)

import org.infinispan.client.hotrod.impl.query.RemoteQuery
import org.infinispan.client.hotrod.{RemoteCacheManager, Search}
import org.infinispan.spark.domain._
[...]
val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName))
    .from(classOf[Book])
    .having("author").like("Doe")
    .toBuilder[RemoteQuery].build()

val rdd = createInfinispanRDD[Int, Book]
    .filterByQuery[Book]](query, classOf[Book])

射影も完全サポートされます。たとえば、上記の例を調整して著書名と出版年のみを取得し、出版年のフィールドでソートすることができます。

34.4.8. 射影を使用したフィルタリング

射影を使用したフィルタリング (Scala)

import org.infinispan.client.hotrod.impl.query.RemoteQuery
import org.infinispan.client.hotrod.{RemoteCacheManager, Search}
import org.infinispan.spark.domain._
[...]
val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName))
    .select("title","publicationYear")
    .from(classOf[Book])
    .having("author").like("Doe")
    .groupBy("publicationYear")
    .toBuilder[RemoteQuery].build()

val rdd = createInfinispanRDD[Int, Book]
    .filterByQuery[Array[AnyRef]](query, classOf[Book])

さらに、フィルターがすでに JBoss Data Grid サーバーにデプロイされている場合は、以下のように名前で参照することもできます。

34.4.9. デプロイされたフィルターを使用したフィルタリング

デプロイされたフィルターを使用したフィルタリング (Scala)

val rdd = InfinispanRDD[String,Book] = ....
// "my-filter-factory" filter and converts Book to a String, and has two parameters
val filteredRDD = rdd.filterByCustom[String]("my-filter-factory", "param1", "param2")

34.5. Spark 2 のコード例

34.5.1. Spark 2 のコード例

Apache Spark 2 のコネクターは 1.6 コネクターとは異なる設定メカニズムを使用し、バージョン 2 コネクターは 1.6 がサポートしない機能を一部サポートするため、各バージョンのコード例は独自のセクションに分割されています。以下のコード例は、バージョン 2 のSpark コネクターと動作します。Spark 1.6 のコード例はこちらを参照してください。

34.5.2. RDD の作成および使用

Apache Spark 2 では、RDD (Resilient Distributed Datasets) は「バージョン 2 コネクターを設定するメソッド」に記載されている設定に ConnectorConfiguration インスタンスを指定して作成されます。

以下に Java および Scala での例を示します。

34.5.3. RDD の作成

RDD の作成 (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;

JavaSparkContext jsc = new JavaSparkContext();

ConnectorConfiguration config = new ConnectorConfiguration()
            .setCacheName("exampleCache").setServerList("server:11222");

JavaPairRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, config);

JavaRDD<MyEntity> entitiesRDD = infinispanRDD.values();

RDD の作成 (Scala)

import org.apache.spark.SparkContext
import org.infinispan.spark.config.ConnectorConfiguration
import org.infinispan.spark.rdd.InfinispanRDD

val sc: SparkContext = new SparkContext()

val config = new ConnectorConfiguration().setCacheName("my-cache").setServerList("10.9.0.8:11222")

val infinispanRDD = new InfinispanRDD[String, MyEntity](sc, config)

val entitiesRDD = infinispanRDD.values

34.5.4. RDD のクエリー

34.5.4.1. SparkSQL クエリー

SparkSQL クエリーの使用 (Java)

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;

JavaSparkContext jsc = new JavaSparkContext();

ConnectorConfiguration conf = new ConnectorConfiguration();

// Obtain the values from an InfinispanRDD
JavaPairRDD<Long, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

JavaRDD<MyEntity> valuesRDD = infinispanRDD.values();

// Create a DataFrame from a SparkSession
SparkSession sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate();
Dataset<Row> dataFrame = sparkSession.createDataFrame(valuesRDD, MyEntity.class);

// Create a view
dataFrame.createOrReplaceTempView("myEntities");

// Create and run the Query
Dataset<Row> rows = sparkSession.sql("SELECT field1, count(*) as c from myEntities WHERE field1 != 'N/A' GROUP BY field1 ORDER BY c desc");

SparkSQL クエリーの使用 (Scala)

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark.config.ConnectorConfiguration
import org.infinispan.spark.rdd._

val sc: SparkContext = // Initialize your SparkContext here

val config = new ConnectorConfiguration().setServerList("myserver1:port,myserver2:port")

// Obtain the values from an InfinispanRDD
val infinispanRDD = new InfinispanRDD[Long, MyEntity](sc, config)
val valuesRDD = infinispanRDD.values

// Create a DataFrame from a SparkSession
val sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate()
val dataFrame = sparkSession.createDataFrame(valuesRDD, classOf[MyEntity])

// Create a view
dataFrame.createOrReplaceTempView("myEntities")

// Create and run the Query, collect and print results
sparkSession.sql("SELECT field1, count(*) as c from myEntities WHERE field1 != 'N/A' GROUP BY field1 ORDER BY c desc")
                  .collect().take(20).foreach(println)

34.5.5. RDD のキャッシュへの書き込み

静的 InfinispanJavaRDD.write() メソッドを使用すると、あらゆるキーバリューベースの RDD を Data Grid キャッシュに書き込みできます。これにより、RDD の内容がキャッシュにコピーされます。

RDD の書き込み (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

JavaSparkContext jsc = new JavaSparkContext();

ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration();

List<Integer> numbers = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
JavaPairRDD<Integer, Long> numbersRDD = jsc.parallelize(numbers).zipWithIndex();

InfinispanJavaRDD.write(numbersRDD, connectorConfiguration);

RDD の書き込み (Scala)

import org.apache.spark.SparkContext
import org.infinispan.spark._
import org.infinispan.spark.config.ConnectorConfiguration

val config: ConnectorConfiguration = // Initialize your ConnectorConfiguration here
val sc: SparkContext = // Initialize your SparkContext here

val simpleRdd = sc.parallelize(1 to 1000).zipWithIndex()
simpleRdd.writeToInfinispan(config)

34.5.6. DStreams の作成

DStream は継続するデータのストリームを表し、継続的な一連の RDD によって内部で表されます。各 RDD には特定の時間間隔からのデータが含まれます。

DStream の作成には、以下の例のように StreamingContextStorageLevel および JBoss Data Grid RDD 設定とともに渡されます。

DStream の作成 (Java)

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.stream.InfinispanJavaDStream;
import static org.apache.spark.storage.StorageLevel.MEMORY_ONLY;

SparkConf conf = new SparkConf().setAppName("my-stream-app");

ConnectorConfiguration configuration = new ConnectorConfiguration();

JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(1));

InfinispanJavaDStream.createInfinispanInputDStream(jsc, MEMORY_ONLY(), configuration);

DStream の作成 (Scala)

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.infinispan.spark.config.ConnectorConfiguration
import org.infinispan.spark.stream._

val sc = new SparkContext()
val config = new ConnectorConfiguration()
val ssc = new StreamingContext(sc, Seconds(1))
val stream = new InfinispanInputDStream[String, MyEntity](ssc, StorageLevel.MEMORY_ONLY, config)

DStreams での JBoss Data Grid への書き込み

キーバリュー型のすべての DStream は、Java の場合は InfinispanJavaDStream.writeToInfinispan() Java メソッド経由で JBoss Data Grid へ書き込みでき、Scala の場合は暗黙的な writeToInfinispan(properties) メソッドを直接 DStream インスタンスで使用して書き込みできます。両方のメソッドは JBoss Data Grid RDD 設定を入力として取り、DStream 内に含まれる各 RDD を書き込みます。

34.5.7. Apache Spark Dataset API の使用

JBoss Data Grid は、RDD (Resilient Distributed Dataset) プログラミングインターフェースの他に、Apache Spark Dataset API が含まれています。この API には、rdd.filterByQuery と似た述語のプッシュダウンのサポートが含まれます。

Dataset API の例 (Java)

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.infinispan.spark.config.ConnectorConfiguration;

import java.util.List;

// Configure the connector using the ConnectorConfiguration: register entities annotated with Protobuf,
// and turn on automatic registration of schemas
ConnectorConfiguration connectorConfig = new ConnectorConfiguration()
         .setServerList("server1:11222,server2:11222")
         .addProtoAnnotatedClass(User.class)
         .setAutoRegisterProto();

// Create the SparkSession
SparkSession sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate();

// Load the "infinispan" datasource into a DataFame, using the infinispan config
Dataset<Row> df = sparkSession.read().format("infinispan").options(connectorConfig.toStringsMap()).load();

// From here it's possible to query using the DatasetSample API...
List<Row> rows = df.filter(df.col("age").gt(30)).filter(df.col("age").lt(40)).collectAsList();

// ... or execute SQL queries
df.createOrReplaceTempView("user");
String query = "SELECT first(r.name) as name, first(r.age) as age FROM user u GROUP BY r.age";
List<Row> results = sparkSession.sql(query).collectAsList();

Dataset API の例 (Scala)

import org.apache.spark._
import org.apache.spark.sql._
import org.infinispan.protostream.annotations.{ProtoField, ProtoMessage}
import org.infinispan.spark.config.ConnectorConfiguration

import scala.annotation.meta.beanGetter
import scala.beans.BeanProperty


  // Entities can be annotated in order to automatically generate protobuf schemas.
  // Also, they should be valid java beans. From Scala this can be achieved as:

@ProtoMessage(name = "user")
class User(@(ProtoField@beanGetter)(number = 1, required = true) @BeanProperty var name: String,
           @(ProtoField@beanGetter)(number = 2, required = true) @BeanProperty var age: Int) {
   def this() = {
        this(name = "", age = -1)
   }
}

// Configure the connector using the ConnectorConfiguration: register entities annotated with Protobuf,
// and turn on automatic registration of schemas
val infinispanConfig: ConnectorConfiguration = new ConnectorConfiguration()
      .setServerList("server1:11222,server2:11222")
      .addProtoAnnotatedClass(classOf[User])
      .setAutoRegisterProto()

// Create the SparkSession
val sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate()

// Load the "infinispan" datasource into a DataFame, using the infinispan config
val df = sparkSession.read.format("infinispan").options(infinispanConfig.toStringsMap).load()

// From here it's possible to query using the DatasetSample API...
val rows: Array[Row] = df.filter(df("age").gt(30)).filter(df("age").lt(40)).collect()

// ... or execute SQL queries
df.createOrReplaceTempView("user")
val query = "SELECT first(r.name) as name, first(r.age) as age FROM user u GROUP BY r.age"
val rowsFromSQL: Array[Row] = sparkSession.sql(query).collect()

34.5.8. Spark での Infinispan Query DSL の使用

Infinispan Query DSL は InfinispanRDD のフィルターとして使用でき、RDD レベルではなくソースでデータを事前にフィルターできます。

重要

クエリーする DSL が適切に動作するには、キャッシュのデータを protobuf でエンコードする必要があります。protobuf エンコーティングの使用手順は「Protobuf エンコーティング」を参照してください。

34.5.9. 事前ビルドされたクエリーオブジェクトを使用したフィルタリング

事前ビルドされたクエリーオブジェクトを使用したフィルタリング (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.Search;
import org.infinispan.query.dsl.Query;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;

JavaSparkContext jsc = new JavaSparkContext();

ConnectorConfiguration conf = new ConnectorConfiguration();

InfinispanJavaRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

RemoteCache<String, MyEntity> remoteCache = new RemoteCacheManager().getCache();

// Assuming MyEntity is already stored in the cache with protobuf encoding, and has protobuf annotations.
Query query = Search.getQueryFactory(remoteCache).from(MyEntity.class).having("field").equal("value").build();

JavaPairRDD<String, MyEntity> filtered = infinispanRDD.filterByQuery(query);

事前ビルドされたクエリーオブジェクトを使用したフィルタリング (Scala)

import org.infinispan.client.hotrod.{RemoteCache, Search}
import org.infinispan.spark.rdd.InfinispanRDD

val rdd: InfinispanRDD[String, MyEntity] = // Initalize your InfinispanRDD here
val cache: RemoteCache[String, MyEntity] = // Initalize your RemoteCache here

// Assuming MyEntity is already stored in the cache with protobuf encoding, and has protobuf annotations.
val query = Search.getQueryFactory(cache).from(classOf[MyEntity]).having("field").equal("value").build()

val filteredRDD = rdd.filterByQuery(query)

34.5.10. Ickle クエリーを使用したフィルタリング

Ickle クエリーを使用したフィルタリング (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;

JavaSparkContext jsc = new JavaSparkContext();
ConnectorConfiguration conf = new ConnectorConfiguration();

InfinispanJavaRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

JavaPairRDD<String, MyEntity> filtered = infinispanRDD.filterByQuery("From myEntity where field = 'value'");

Ickle クエリーを使用したフィルタリング (Scala)

import org.infinispan.spark.rdd.InfinispanRDD

val rdd: InfinispanRDD[String, MyEntity] = // Initialize your InfinispanRDD here
val filteredRDD = rdd.filterByQuery("FROM MyEntity e where e.field BETWEEN 10 AND 20")

34.5.11. サーバー上でのフィルタリング

サーバー上でのフィルタリング (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;

JavaSparkContext jsc = new JavaSparkContext();

ConnectorConfiguration conf = new ConnectorConfiguration();

InfinispanJavaRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

JavaPairRDD<String, MyEntity> filtered = infinispanRDD.filterByCustom("my-filter", "param1", "param2");

サーバー上でのフィルタリング (Scala)

import org.infinispan.spark.rdd.InfinispanRDD

val rdd: InfinispanRDD[String, MyEntity] = // Initalize your InfinispanRDD here
// "my-filter-factory" filter and converts MyEntity to a Double, and has two parameters
val filteredRDD = rdd.filterByCustom[Double]("my-filter-factory", "param1", "param2")

34.6. Spark のパフォーマンスに関する注意点

Data Grid の Spark コネクターは、デフォルトでは Data Grid ノードごとに 2 つのパーティションを作成し、各パーティションはそのノードのデータのサブセットを指定します。

これらのパーティションは Spark ワーカーへ送信され、並列処理されます。Spark ワーカーの数が Data Grid ノードよりも少ない場合、各ワーカーにはタスクを並列実行できる最大限度があるため、遅延が発生することがあります。そのため、Spark ワーカーの数を最低でも Data Grid ノードの数と同じにしてこの並行処理を活用することが推奨されます。

また、Spark ワーカーが Data Grid ノードと同じノードに位置する場合、各ワーカーがローカルの Data Grid ノードにあるデータのみを処理するようコネクターによってタスクが分散されます。