Chapter 34. Integration with Apache Spark

34.1. The JBoss Data Grid Apache Spark Connector

JBoss Data Grid includes a Spark connector, providing tight integration with Apache Spark, and allowing applications written either in Java or Scala to utilize JBoss Data Grid as a backing data store.

There actually are two connectors, one that supports Apache Spark 1.6.x, and one that supports Apache Spark 2.x, which in turn support Scala 2.10.x, and 2.11.x, respectively. Both of these connectors are shipped separately from the main distribution.

The Apache Spark 1.6 connector includes support for the following:

  • Create an RDD from any cache
  • Write a key/value RDD to a cache
  • Create a DStream from cache-level events
  • Write a key/value DStream to a cache

In addition to the above features, the Apache Spark 2 connector supports these features:

  • Use JDG server side filters to create a cache based RDD
  • Spark serializer based on JBoss Marshalling
  • Dataset API with push down predicates support
Note

Support for Apache Spark is only available in Remote Client-Server Mode.

34.2. Spark Dependencies

The following Maven configuration should be used depending on the desired version of Apache Spark:

pom.xml for Spark 1.6.x

<dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-spark</artifactId>
    <version>0.3.0.Final-redhat-2</version>
</dependency>

pom.xml for Spark 2.x

<dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-spark</artifactId>
    <version>0.6.0.Final-redhat-9</version>
</dependency>

34.3. Configuring the Spark Connector

The Apache Spark version 1.6 and version 2 connectors do not use the same interfaces for configuration. The version 1.6 connector uses properties, and the version 2 connector uses methods.

34.3.1. Properties to Configure the Version 1.6 Connector

Table 34.1. Properties to Configure the Version 1.6 Connector

Property NameDescriptionDefault Value

infinispan.client.hotrod.server_list

List of JBoss Data Grid nodes

localhost:11222

infinispan.rdd.cacheName

The name of the cache that will back the RDD

default cache

infinispan.rdd.read_batch_size

Batch size (number of entries) when reading from the cache

10000

infinispan.rdd.write_batch_size

Batch size (number of entries) when writing to the cache

500

infinispan.rdd.number_server_partitions

Numbers of partitions created per JBoss Data Grid server

2

infinispan.rdd.query.proto.protofiles

Map with protobuf file names and contents

Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query.

infinispan.rdd.query.proto.marshallers

List of protostream marshallers classes for the objects in the cache

Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query.

34.3.2. Methods to Configure the Version 2 Connector

The following methods can be used to configure the version 2 connector. They are provided by the org.infinispan.spark.config.ConnectorConfiguration class.

Table 34.2. Methods to Configure the Version 2 Connector

Method NameDescriptionDefault Value

setServerList(String)

List of JBoss Data Grid nodes

localhost:11222

setCacheName(String)

The name of the cache that will back the RDD

default cache

setReadBatchSize(Integer)

Batch size (number of entries) when reading from the cache

10000

setWriteBatchSize(Integer)

Batch size (number of entries) when writing to the cache

500

setPartitions(Integer)

Numbers of partitions created per JBoss Data Grid server

2

addProtoFile(String name, String contents)

Map with protobuf file names and contents

Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query.

addMessageMarshaller(Class)

List of protostream marshallers classes for the objects in the cache

Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query.

addProtoAnnotatedClass(Class)

Registers a Class containing protobuf annotations

Alternative to using addProtoFile and addMessageMarshaller methods, since both will be auto-generated based on the annotations.

setAutoRegisterProto()

Causes automatic registration of protobuf schemas in the server

None

addHotRodClientProperty(key, value)

Configures additional Hot Rod client properties when contacting the JBoss Data Grid Server

None

setTargetEntity(Class)

Used in conjunction with the Dataset API to specify the Query target

If omitted, and in case there is only one class annotated with protobuf configured, it will choose that class.

34.3.3. Connecting to a Secured JDG Cluster

If the JDG cluster is secured, Hot Rod must be configured with security for the Spark connector to work. There are several Hot Rod properties that can be set to configure Hot Rod security. They are described in the Hot Rod Properties table in the Appendix of the Administration and Configuration Guide, starting with infinispan.client.hotrod.use_ssl

Important

These properties are exclusive to the version 2 Apache Spark connector.

34.4. Code Examples for Spark 1.6

34.4.1. Code Examples for Spark 1.6

Since the connector for Apache Spark 1.6 uses a different configuration mechanism than the version 2 connector, and also because the version 2 connector supports some features 1.6 doesn’t, the code examples for each version are separated into their own sections. The following code examples work with version 1.6 of the Spark connector. Follow this link for code examples for Spark 2.

34.4.2. Creating and Using RDDs

With the Apache Spark 1.6 connector, RDDs are created by specifying a Properties instance with configurations described in Properties to Configure the Version 1.6 Connector, and then using it together with the Spark context to create a InfinispanRDD that is used with the normal Spark operations.

An example of this is below in both Java and Scala:

34.4.3. Creating an RDD

Creating an RDD (Java)

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.rdd.InfinispanJavaRDD;
import java.util.Properties;
[...]
// Begin by defining a new Spark configuration and creating a Spark context from this.
SparkConf conf = new SparkConf().setAppName("example-RDD");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Create the Properties instance, containing the JBoss Data Grid node and cache name.
Properties properties = new Properties();
properties.put("infinispan.client.hotrod.server_list", "server:11222");
properties.put("infinispan.rdd.cacheName","exampleCache");

// Create the RDD
JavaPairRDD<Integer, Book> exampleRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, properties);

JavaRDD<Book> booksRDD = exampleRDD.values();

Creating an RDD (Scala)

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark.rdd.InfinispanRDD
import org.infinispan.spark._

// Begin by defining a new Spark configuration and creating a Spark context from this.
val conf = new SparkConf().setAppName("example-RDD-scala")
val sc = new SparkContext(conf)

// Create the Properties instance, containing the JBoss Data Grid node and cache name.
val properties = new Properties
properties.put("infinispan.client.hotrod.server_list", "server:11222")
properties.put("infinispan.rdd.cacheName", "exampleCache")

// Create an RDD from the DataGrid cache
val exampleRDD = new InfinispanRDD[Integer, Book](sc, properties)

val booksRDD = exampleRDD.values

34.4.4. Querying an RDD

Once the RDD is available entries in the backing cache may be obtained by using either the Spark RDD operations or Spark’s SQL support. The above example is expanded to count the entries, per author, in the resulting RDD with an SQL query:

Querying an RDD (Java)

// The following imports should be added to the list from the previous example
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
[...]
// Continuing the previous example

// Create a SQLContext, registering the data frame and table
SQLContext sqlContext = new SQLContext(jsc);
DataFrame dataFrame = sqlContext.createDataFrame(booksRDD, Book.class);
dataFrame.registerTempTable("books");

// Run the Query and collect results
List<Row> rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collectAsList();

Querying an RDD (Scala)

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
[...]
// Create a SQLContext, register a data frame and table
val sqlContext = new SQLContext(sc)
val dataFrame = sqlContext.createDataFrame(booksRDD, classOf[Book])
dataFrame.registerTempTable("books")

 // Run the Query and collect the results
val rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collect()

34.4.5. Writing an RDD to the Cache

Any key/value based RDD can be written to the Data Grid cache by using the static InfinispanJavaRDD.write() method. This will copy the contents of the RDD to the cache:

Writing an RDD (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.spark.domain.Address;
import org.infinispan.spark.domain.Person;
import org.infinispan.spark.rdd.InfinispanJavaRDD;
import scala.Tuple2;
import java.util.List;
import java.util.Properties;
[...]
// Define the location of the JBoss Data Grid node
Properties properties = new Properties();
properties.put("infinispan.client.hotrod.server_list", "localhost:11222");
properties.put("infinispan.rdd.cacheName","exampleCache");

// Create the JavaSparkContext
SparkConf conf = new SparkConf().setAppName("write-example-RDD");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Defining two entries to be stored in a RDD
// Each Book will contain the title, author, and publicationYear
Book bookOne = new Book("Linux Bible", "Negus, Chris", "2015");
Book bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014");

List<Tuple2<Integer, Book>> pairs = Arrays.asList(
    new Tuple2<>(1, bookOne),
    new Tuple2<>(2, bookTwo)
);

// Create the RDD using the newly created List
JavaPairRDD<Integer, Book> pairsRDD = jsc.parallelizePairs(pairs);

// Write the entries into the cache
InfinispanJavaRDD.write(pairsRDD, config);

Writing an RDD (Scala)

import java.util.Properties
import org.infinispan.spark._
import org.infinispan.spark.rdd.InfinispanRDD
[...]
// Define the location of the JBoss Data Grid node
val properties = new Properties
properties.put("infinispan.client.hotrod.server_list", "localhost:11222")
properties.put("infinispan.rdd.cacheName", "exampleCache")

// Create the SparkContext
val conf = new SparkConf().setAppName("write-example-RDD-scala")
val sc = new SparkContext(conf)

// Create an RDD of Books
val bookOne = new Book("Linux Bible", "Negus, Chris", "2015")
val bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014")

val sampleBookRDD = sc.parallelize(Seq(bookOne,bookTwo))
val pairsRDD = sampleBookRDD.zipWithIndex().map(_.swap)

// Write the Key/Value RDD to the Data Grid
pairsRDD.writeToInfinispan(properties)

34.4.5.1. Creating and Using DStreams

DStreams represent a continuous stream of data, and are internally represented by a continuous series of RDDs, with each RDD containing data from a specific time interval.

To create a DStream a StreamingContext will be passed in along with StorageLevel and the JBoss Data Grid RDD configuration, as seen in the below example:

Creating a DStream (Scala)

import org.infinispan.spark.stream._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import java.util.Properties

// Spark context
val sc = ...
// java.util.Properties with Infinispan RDD configuration
val props = ...
val ssc = new StreamingContext(sc, Seconds(1))

val stream = new InfinispanInputDStream[String, Book](ssc, StorageLevel.MEMORY_ONLY, props)

The InfinispanInputDStream can be transformed using the many Spark’s DStream operations, and the processing will occur after calling "start" in the StreamingContext. For example, to display every 10 seconds the number of books inserted in the cache in the last 30 seconds:

Processing a DStream (Scala)

import org.infinispan.spark.stream._

val stream = ... // From previous sample

// Filter only created entries
val createdBooksRDD = stream.filter { case (_, _, t) => t == Type.CLIENT_CACHE_ENTRY_CREATED }

// Reduce last 30 seconds of data, every 10 seconds
val windowedRDD: DStream[Long] = createdBooksRDD.count().reduceByWindow(_ + _, Seconds(30), Seconds(10))

// Prints the results, couting the number of occurences in each individual RDD
windowedRDD.foreachRDD { rdd => println(rdd.reduce(_ + _)) }

// Start the processing
ssc.start()
ssc.awaitTermination()

Writing to JBoss Data Grid with DStreams

Any DStream of Key/Value type can be written to JBoss Data Grid through the InfinispanJavaDStream.writeToInfinispan() Java method or in Scala using the implicit writeToInfinispan(properties) method directly on the DStream instance. Both methods take the JBoss Data Grid RDD configuration as input and will write each RDD contained within the DStream

34.4.6. Using the Infinispan Query DSL with Spark

The Infinispan Query DSL can be used as a filter for the InfinispanRDD, allowing data to be pre-filtered at the source rather than at RDD level.

Important

Data in the cache must have been encoded with protobuf for the querying DSL to function correctly. Instructions on using protobuf encoding are found in Protobuf Encoding.

Consider the following example which retrieves a list of books that includes any author whose name contains Doe:

34.4.7. Filtering by a Query

Filtering by a Query (Scala)

import org.infinispan.client.hotrod.impl.query.RemoteQuery
import org.infinispan.client.hotrod.{RemoteCacheManager, Search}
import org.infinispan.spark.domain._
[...]
val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName))
    .from(classOf[Book])
    .having("author").like("Doe")
    .toBuilder[RemoteQuery].build()

val rdd = createInfinispanRDD[Int, Book]
    .filterByQuery[Book]](query, classOf[Book])

Projections are also fully supported; for instance, the above example may be adjusted to only obtain the title and publication year, and sorting on the latter field:

34.4.8. Filtering with a Projection

Filtering with a Projection (Scala)

import org.infinispan.client.hotrod.impl.query.RemoteQuery
import org.infinispan.client.hotrod.{RemoteCacheManager, Search}
import org.infinispan.spark.domain._
[...]
val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName))
    .select("title","publicationYear")
    .from(classOf[Book])
    .having("author").like("Doe")
    .groupBy("publicationYear")
    .toBuilder[RemoteQuery].build()

val rdd = createInfinispanRDD[Int, Book]
    .filterByQuery[Array[AnyRef]](query, classOf[Book])

In addition, if a filter has already been deployed to the JBoss Data Grid server it may be referenced by name, as seen below:

34.4.9. Filtering with a Deployed Filter

Filtering with a Deployed Filter (Scala)

val rdd = InfinispanRDD[String,Book] = ....
// "my-filter-factory" filter and converts Book to a String, and has two parameters
val filteredRDD = rdd.filterByCustom[String]("my-filter-factory", "param1", "param2")

34.5. Code Examples for Spark 2

34.5.1. Code Examples for Spark 2

Since the connector for Apache Spark 2 uses a different configuration mechanism than the 1.6 connector, and also because the version 2 connector supports some features 1.6 doesn’t, the code examples for each version are separated into their own sections. The following code examples work with version 2 of the Spark connector. Follow this link for code examples for Spark 1.6.

34.5.2. Creating and Using RDDs

In Apache Spark 2, Resilient Distributed Datasets (RDDs) are created by specifying a ConnectorConfiguration instance with configurations described in the table from Methods to Configure the Version 2 Connector.

Examples of this in both Java and Scala are below:

34.5.3. Creating an RDD

Creating an RDD (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;

JavaSparkContext jsc = new JavaSparkContext();

ConnectorConfiguration config = new ConnectorConfiguration()
            .setCacheName("exampleCache").setServerList("server:11222");

JavaPairRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, config);

JavaRDD<MyEntity> entitiesRDD = infinispanRDD.values();

Creating an RDD (Scala)

import org.apache.spark.SparkContext
import org.infinispan.spark.config.ConnectorConfiguration
import org.infinispan.spark.rdd.InfinispanRDD

val sc: SparkContext = new SparkContext()

val config = new ConnectorConfiguration().setCacheName("my-cache").setServerList("10.9.0.8:11222")

val infinispanRDD = new InfinispanRDD[String, MyEntity](sc, config)

val entitiesRDD = infinispanRDD.values

34.5.4. Querying an RDD

34.5.4.1. SparkSQL Queries

Using SparkSQL Queries (Java)

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;

JavaSparkContext jsc = new JavaSparkContext();

ConnectorConfiguration conf = new ConnectorConfiguration();

// Obtain the values from an InfinispanRDD
JavaPairRDD<Long, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

JavaRDD<MyEntity> valuesRDD = infinispanRDD.values();

// Create a DataFrame from a SparkSession
SparkSession sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate();
Dataset<Row> dataFrame = sparkSession.createDataFrame(valuesRDD, MyEntity.class);

// Create a view
dataFrame.createOrReplaceTempView("myEntities");

// Create and run the Query
Dataset<Row> rows = sparkSession.sql("SELECT field1, count(*) as c from myEntities WHERE field1 != 'N/A' GROUP BY field1 ORDER BY c desc");

Using SparkSQL Queries (Scala)

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark.config.ConnectorConfiguration
import org.infinispan.spark.rdd._

val sc: SparkContext = // Initialize your SparkContext here

val config = new ConnectorConfiguration().setServerList("myserver1:port,myserver2:port")

// Obtain the values from an InfinispanRDD
val infinispanRDD = new InfinispanRDD[Long, MyEntity](sc, config)
val valuesRDD = infinispanRDD.values

// Create a DataFrame from a SparkSession
val sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate()
val dataFrame = sparkSession.createDataFrame(valuesRDD, classOf[MyEntity])

// Create a view
dataFrame.createOrReplaceTempView("myEntities")

// Create and run the Query, collect and print results
sparkSession.sql("SELECT field1, count(*) as c from myEntities WHERE field1 != 'N/A' GROUP BY field1 ORDER BY c desc")
                  .collect().take(20).foreach(println)

34.5.5. Writing an RDD to the Cache

Any key/value based RDD can be written to the Data Grid cache by using the static InfinispanJavaRDD.write() method. This will copy the contents of the RDD to the cache:

Writing an RDD (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

JavaSparkContext jsc = new JavaSparkContext();

ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration();

List<Integer> numbers = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
JavaPairRDD<Integer, Long> numbersRDD = jsc.parallelize(numbers).zipWithIndex();

InfinispanJavaRDD.write(numbersRDD, connectorConfiguration);

Writing an RDD (Scala)

import org.apache.spark.SparkContext
import org.infinispan.spark._
import org.infinispan.spark.config.ConnectorConfiguration

val config: ConnectorConfiguration = // Initialize your ConnectorConfiguration here
val sc: SparkContext = // Initialize your SparkContext here

val simpleRdd = sc.parallelize(1 to 1000).zipWithIndex()
simpleRdd.writeToInfinispan(config)

34.5.6. Creating DStreams

DStreams represent a continuous stream of data, and are internally represented by a continuous series of RDDs, with each RDD containing data from a specific time interval.

To create a DStream a StreamingContext will be passed in along with StorageLevel and the JBoss Data Grid RDD configuration, as seen in the below example:

Creating a DStream (Java)

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.stream.InfinispanJavaDStream;
import static org.apache.spark.storage.StorageLevel.MEMORY_ONLY;

SparkConf conf = new SparkConf().setAppName("my-stream-app");

ConnectorConfiguration configuration = new ConnectorConfiguration();

JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(1));

InfinispanJavaDStream.createInfinispanInputDStream(jsc, MEMORY_ONLY(), configuration);

Creating a DStream (Scala)

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.infinispan.spark.config.ConnectorConfiguration
import org.infinispan.spark.stream._

val sc = new SparkContext()
val config = new ConnectorConfiguration()
val ssc = new StreamingContext(sc, Seconds(1))
val stream = new InfinispanInputDStream[String, MyEntity](ssc, StorageLevel.MEMORY_ONLY, config)

Writing to JBoss Data Grid with DStreams

Any DStream of Key/Value type can be written to JBoss Data Grid through the InfinispanJavaDStream.writeToInfinispan() Java method or in Scala using the implicit writeToInfinispan(properties) method directly on the DStream instance. Both methods take the JBoss Data Grid RDD configuration as input and will write each RDD contained within the DStream.

34.5.7. Using The Apache Spark Dataset API

In addition to the Resilient Distributed Dataset (RDD) programming interface, JBoss Data Grid includes the Apache Spark Dataset API, with support for pushing down predicates, similar to rdd.filterByQuery.

Dataset API Example (Java)

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.infinispan.spark.config.ConnectorConfiguration;

import java.util.List;

// Configure the connector using the ConnectorConfiguration: register entities annotated with Protobuf,
// and turn on automatic registration of schemas
ConnectorConfiguration connectorConfig = new ConnectorConfiguration()
         .setServerList("server1:11222,server2:11222")
         .addProtoAnnotatedClass(User.class)
         .setAutoRegisterProto();

// Create the SparkSession
SparkSession sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate();

// Load the "infinispan" datasource into a DataFame, using the infinispan config
Dataset<Row> df = sparkSession.read().format("infinispan").options(connectorConfig.toStringsMap()).load();

// From here it's possible to query using the DatasetSample API...
List<Row> rows = df.filter(df.col("age").gt(30)).filter(df.col("age").lt(40)).collectAsList();

// ... or execute SQL queries
df.createOrReplaceTempView("user");
String query = "SELECT first(r.name) as name, first(r.age) as age FROM user u GROUP BY r.age";
List<Row> results = sparkSession.sql(query).collectAsList();

Dataset API Example (Scala)

import org.apache.spark._
import org.apache.spark.sql._
import org.infinispan.protostream.annotations.{ProtoField, ProtoMessage}
import org.infinispan.spark.config.ConnectorConfiguration

import scala.annotation.meta.beanGetter
import scala.beans.BeanProperty


  // Entities can be annotated in order to automatically generate protobuf schemas.
  // Also, they should be valid java beans. From Scala this can be achieved as:

@ProtoMessage(name = "user")
class User(@(ProtoField@beanGetter)(number = 1, required = true) @BeanProperty var name: String,
           @(ProtoField@beanGetter)(number = 2, required = true) @BeanProperty var age: Int) {
   def this() = {
        this(name = "", age = -1)
   }
}

// Configure the connector using the ConnectorConfiguration: register entities annotated with Protobuf,
// and turn on automatic registration of schemas
val infinispanConfig: ConnectorConfiguration = new ConnectorConfiguration()
      .setServerList("server1:11222,server2:11222")
      .addProtoAnnotatedClass(classOf[User])
      .setAutoRegisterProto()

// Create the SparkSession
val sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate()

// Load the "infinispan" datasource into a DataFame, using the infinispan config
val df = sparkSession.read.format("infinispan").options(infinispanConfig.toStringsMap).load()

// From here it's possible to query using the DatasetSample API...
val rows: Array[Row] = df.filter(df("age").gt(30)).filter(df("age").lt(40)).collect()

// ... or execute SQL queries
df.createOrReplaceTempView("user")
val query = "SELECT first(r.name) as name, first(r.age) as age FROM user u GROUP BY r.age"
val rowsFromSQL: Array[Row] = sparkSession.sql(query).collect()

34.5.8. Using the Infinispan Query DSL with Spark

The Infinispan Query DSL can be used as a filter for the InfinispanRDD, allowing data to be pre-filtered at the source rather than at RDD level.

Important

Data in the cache must have been encoded with protobuf for the querying DSL to function correctly. Instructions on using protobuf encoding are found in Protobuf Encoding.

34.5.9. Filtering with a pre-built Query Object

Filtering with a pre-built Query Object (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.Search;
import org.infinispan.query.dsl.Query;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;

JavaSparkContext jsc = new JavaSparkContext();

ConnectorConfiguration conf = new ConnectorConfiguration();

InfinispanJavaRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

RemoteCache<String, MyEntity> remoteCache = new RemoteCacheManager().getCache();

// Assuming MyEntity is already stored in the cache with protobuf encoding, and has protobuf annotations.
Query query = Search.getQueryFactory(remoteCache).from(MyEntity.class).having("field").equal("value").build();

JavaPairRDD<String, MyEntity> filtered = infinispanRDD.filterByQuery(query);

Filtering with a pre-built Query Object (Scala)

import org.infinispan.client.hotrod.{RemoteCache, Search}
import org.infinispan.spark.rdd.InfinispanRDD

val rdd: InfinispanRDD[String, MyEntity] = // Initalize your InfinispanRDD here
val cache: RemoteCache[String, MyEntity] = // Initalize your RemoteCache here

// Assuming MyEntity is already stored in the cache with protobuf encoding, and has protobuf annotations.
val query = Search.getQueryFactory(cache).from(classOf[MyEntity]).having("field").equal("value").build()

val filteredRDD = rdd.filterByQuery(query)

34.5.10. Filtering with an Ickle Query

Filtering with an Ickle Query (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;

JavaSparkContext jsc = new JavaSparkContext();
ConnectorConfiguration conf = new ConnectorConfiguration();

InfinispanJavaRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

JavaPairRDD<String, MyEntity> filtered = infinispanRDD.filterByQuery("From myEntity where field = 'value'");

Filtering with an Ickle Query (Scala)

import org.infinispan.spark.rdd.InfinispanRDD

val rdd: InfinispanRDD[String, MyEntity] = // Initialize your InfinispanRDD here
val filteredRDD = rdd.filterByQuery("FROM MyEntity e where e.field BETWEEN 10 AND 20")

34.5.11. Filtering on the Server

Filtering on the Server (Java)

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.rdd.InfinispanJavaRDD;

JavaSparkContext jsc = new JavaSparkContext();

ConnectorConfiguration conf = new ConnectorConfiguration();

InfinispanJavaRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

JavaPairRDD<String, MyEntity> filtered = infinispanRDD.filterByCustom("my-filter", "param1", "param2");

Filtering on the Server (Scala)

import org.infinispan.spark.rdd.InfinispanRDD

val rdd: InfinispanRDD[String, MyEntity] = // Initalize your InfinispanRDD here
// "my-filter-factory" filter and converts MyEntity to a Double, and has two parameters
val filteredRDD = rdd.filterByCustom[Double]("my-filter-factory", "param1", "param2")

34.6. Spark Performance Considerations

The Data Grid Spark connector creates by default two partitions per each Data Grid node, each partition specifies a subset of the data in that particular node.

Those partitions are then sent to the Spark workers that will process them in parallel. If the number of Spark workers is less than the number of Data Grid nodes, some delay can occur since each worker has a maximum capacity of executing tasks in parallel. For this reason it is recommended to have at least the same number of Spark workers as Data Grid nodes to take advantage of the parallelism.

In addition, if a Spark worker is co-located in the same node as the Data Grid node, the connector will distribute tasks so that each worker only processes data found in the local Data Grid node.