Chapter 35. Streams

35.1. Streams

Streams were introduced in Java 8, allowing an efficient way of performing operations on very large data sets, including the entirety of a given cache. These operations are performed on collections instead of procedurally iterating over the entire dataset.

In addition, if the cache is distributed then operations may be performed even more efficiently as these may be performed across the cluster concurrently.

A Stream may be obtained by invoking the stream(), for a single-threaded stream, or parallelStream(), for a multi-threaded stream, methods on a given Map. Parallel streams are discussed in more detail at Parallelism.

35.2. Using Streams on a Local/Invalidation/Replication Cache

A stream used with a local, invalidation, or replication cache can be used identical to a stream on a regular collection.

For example, consider a cache that contains a number of Books. To create a Map that contains all entries with JBoss in the title the following could be used:

Map<Object, String> jbossBooks = cache.entrySet().stream()
    .filter(e -> e.getValue().getTitle().contains("JBoss"))
    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

35.3. Using Streams with a Distribution Cache

When a Stream operation is performed on a distribution cache it will send the intermediate and terminal operations to each node, and then the resulting data will be sent back to the originating node. This behavior allows operations to be performed on the remote nodes and only the end results returned, resulting in much better performance as the intermediate values are not returned.

Rehash Aware

Once the stream has been created the data will be segmented so that each node will only perform operations upon the data that it owns as the primary owner. Assuming the segments are granular enough to provide an even distribution of data per node, this allows for even processing of data across the segments.

This process can be volatile if new nodes are added or old nodes leave the cluster, as the data is redistributed between nodes. This may cause issues where data can be processed a second time; however, Distributed Streams handles the redistribution of data automatically so that manual monitoring of nodes does not need to occur.

35.4. Setting Timeouts

It is possible to configure a timeout value for the operation request; this is only used for remote requests and is configured per request. This means that local requests will never timeout, and if a failover occurs then each subsequent request will have a new timeout.

If no timeout is specified then the replication timeout will be used by default. This may be manually configured by using the timeout(long timeout, TimeUnit unit) method of the stream, as seen in the following example:

CacheStream<Object, String> stream = cache.entrySet().stream();
stream.timeout(1, TimeUnit.MINUTES);

35.5. Distributed Streams

35.5.1. Distributed Streams

Distributed Streams work similarly to map reduce; however, with Distributed Streams there are zero to many intermediate operations followed by a single terminal operation that is sent to each node where work is performed. The following steps are used for this behavior:

  1. The desired segments are grouped by which node is the primary owner of each given segment.
  2. A request is generated for each remote node. This request contains the intermediate and terminal operations, along with the segments to process.

    • The thread where the terminal operation was initiated will perform the local operation directly.
    • Each remote node will receive the generated request, run the operations on a remote thread, and then send the response back.
  3. Once all requests complete the user thread will gather all responses and perform any reductions specified by the operations.
  4. The final response is returned to the user.

35.5.2. Marshallability

When using distributed or replicated caches the keys and values must be marshallable; in addition, operations executed on Distributed Streams must also be marshallable, as these operations are sent to the other nodes in the cluster. This is most commonly accomplished by using a new class that is either Serializable or has an Externalizer registered; however, as the FunctionalInterface implements Serializable all lambdas are instantly serialized and thus no additional cast is required.

Note

Intermediate values in distributed streams do not need to be marshallable; only the final value sent back, typically the terminal operation, must be marshallable.

If a lambda function is in use this may be serialized by casting the parameter as an instance of Serializable. For instance, consider a cache that stores Book entries; the following would create a collection of Book instances that match a specific author:

List<Book> books = cache.keySet().stream()
    .filter((Predicate<? super Book>) e -> e.getAuthor().equals("authorname"))
    .collect(toList());

Additionally, not all produced Collectors are marshallable by default. JBoss Data Grid has included org.infinispan.stream.CacheCollectors as a convenient way to utilize any combination of Collectors that function properly when marshalling is required.

35.5.3. Parallelism

There are two different methods to parallelize streams:

  • Parallel Streams - causing each operation to be executed in parallel on a single node
  • Parallel Distribution - parallelizing the request so that it involves multiple nodes

By default, Distributed Streams enable parallel distribution; however, this may be further coupled with a parallel Stream, allowing concurrent operations executing across multiple nodes, with multiple threads on each node.

To mark a Stream as parallel it may either be obtained with parallelStream(), or it may be enabled after obtaining the Stream by invoking parallel(). The following example shows both methods:

// Obtain a parallel Stream initially
List<Book> books = cache.keySet().parallelStream()
    [...]

// Create the initial stream and then invoke parallel
List<Book> books = cache.keySet().stream()
    .parallel()
    [...]
Note

Some operations, such as rehash aware iterator or forEach operations, have a sequential stream forced locally. Using parallel streams on these operations is not possible at this time.

35.5.4. Distributed Operators

35.5.4.1. Terminal Operator Distributed Result Reductions

Below each terminal operator is discussed, along with how the distributed reduction works for each one.

  • allMatch

    This operator is run on each node and then all results are combined using a logical AND operation locally to obtain the final value. If a normal stream operation returns early then these methods will complete early as well.

  • noneMatch anyMatch

    These operators are run on each node and then all results are combined using a logical OR operation locally to obtain the final value. If a normal stream operation returns early then these methods will complete early as well.

  • collect

    The collect method can perform a few extra steps. Similar to other methods the remote node will perform everything as expected; however, instead of performing the final finisher operator it sends back the fully combined results. The local thread will then combine all local and remote results into a value which then performs the finisher operator. In addition, the final value does not need to be serializable, but the values produced from the supplier and combiner methods must be serialized.

  • count

    The count method simply adds the numbers received from each node.

  • findAny findFirst

    The findAny method will return the first value found, regardless if it was from a remote or local node. This operation supports early termination, as once an initial value has been found no others will be processed. The findFirst method behaves similarly, but requires a sorted intermediate operation which is described in Intermediate_Operation_Exceptions.

  • max min

    The max and min methods find the respective value on each node before a final reduction is performed locally to determine the true max or min across all nodes.

  • reduce

    The various reduce methods seralize the result as much as possible before accumulating the local and remote results together locally, combining if enabled. Due to this behavior a value returned from the combiner does not need to be serializable.

35.5.4.2. Key Based Rehash Aware Operators

Unlike the other terminal operators each of the following operators require a special type of rehash awareness to keep track of which keys per segment have been processed. This guarantees each key will be processed exactly once, for iterator and spliterator operators, or at least once, for forEach, even if cluster membership changes.

  • iterator spliterator

    These operators return batches of entries when run on a remote node, where the next batch is only sent after the previous is fully consumed. This behavior is to limit how many entries are retained in memory at any given time. The user node will keep track of which keys have been processed, and once a segment has completed those keys will be released from memory. Because of this behavior it is preferable to use sequential processing, allowing only a subset of segment keys to be held in memory instead of having keys from all nodes retained.

  • forEach

    While forEach returns batches it only returns a batch after it has finished processing at least a batch worth of keys. This way the originating node knows which keys have been processed already, which reduces the possibility of processing the same entry again; however, it is possible to have the same set processed repeatedly if a node goes down unexpectedly. In this case the node could have been processing an uncompleted batch when it went down, resulting in the same batch to be ran again when the rehash failure operation occurs. Adding a node will not cause this issue, as the rehash failover does not occur until all responses are received.

The operations' batch sizes are controlled by the same value, distributedBatchSize, on the CacheStream. If no value is set then it will default to the chunkSize configured in state transfer. While larger values will allow for larger batches, resulting in fewer returns, this results in increased memory usage, and testing should be performed to determine an appropriate size for each application.

35.5.4.3. Intermediate Operation Exceptions

The following intermediate operations have special exceptions. All of these methods have some sort of artificial iterator implanted in the stream processing to guarantee correctness, and due to this using any of the following may cause severe performance degradation.

  • Skip

    An artificial iterator is implanted up to the skip operation, and then results are brought locally so that the appropriate number of elements may be skipped.

  • Peek

    An artificial iterator is implanted up to the peek operation. Only up to a number of peeked elements are returned to a remote node, and then results are brought locally so that it may peek at only the desired amount.

  • Sorted

    An artificial iterator is implanted up to the sorted operation, and then all results are locally sorted.

    Warning

    This operation requires having all entries in memory on the local node.

  • Distinct

    Distinct is performed on each remote node and then an artificial iterator returns those distinct values, before all of those results have a distinct operation performed upon them.

    Warning

    This operation requires having all entries in memory on the local node.

35.5.5. Distributed Stream Examples

A classic example of Map/Reduce is word count. Assuming we have a cache with String for keys and values, and we need to count the occurrence of all words in all sentences, this could be implemented using the following:

Map<String, Integer> wordCountMap = cache.entrySet().parallelStream()
    .map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s"))
    .flatMap((Function<String[], Stream<String>>) Arrays::stream)
    .collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())));

If we wanted to revise the example to find the most frequent word we would need to have all words available and counted locally first. The following snippet extends our previous example to perform this search:

String mostFrequentWord = cache.entrySet().parallelStream()
    .map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s"))
    .flatMap((Function<String[], Stream<String>>) Arrays::stream)
    .collect(CacheCollectors.serializableCollector(() -> Collectors.collectingAndThen(
        Collectors.groupingBy(Function.identity(), Collectors.counting()),
            wordCountMap -> {
                String mostFrequent = null;
                long maxCount = 0;
                for (Map.Entry<String, Long> e : wordCountMap.entrySet()) {
                    int count = e.getValue().intValue();
                    if (count > maxCount) {
                        maxCount = count;
                        mostFrequent = e.getKey();
                    }
                }
                return mostFrequent;
            })));

At present, this last step will be executed in a single thread. We can further optimize this operation by using a parallel stream locally to perform the final operation:

Map<String, Long> wordCount = cache.entrySet().parallelStream()
    .map((Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s"))
    .flatMap((Function<String[], Stream<String>>) Arrays::stream)
    .collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())));
Optional<Map.Entry<String, Long>> mostFrequent = wordCount.entrySet().parallelStream()
    .reduce((e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);