30.4. Distributed Streams

Distributed Streams work similarly to map/reduce; however, a Distributed Stream consists of zero or more intermediate operations followed by a single terminal operation, and these operations are sent to each node where work is performed. The following steps are used for this behavior:
  1. The desired segments are grouped by which node is the primary owner of each given segment.
  2. A request is generated for each remote node. This request contains the intermediate and terminal operations, along with the segments to process.
    • The thread where the terminal operation was initiated will perform the local operation directly.
    • Each remote node will receive the generated request, run the operations on a remote thread, and then send the response back.
  3. Once all requests complete, the user thread gathers all responses and performs any reductions specified by the operations.
  4. The final response is returned to the user.
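
The steps above can be sketched with plain java.util.stream code. This is only an illustrative simulation that runs in a single JVM and uses no JBoss Data Grid classes: the PRIMARY_OWNER and SEGMENT_DATA maps, the node names, and the method names are all hypothetical stand-ins for the cluster topology and the cached data.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class DistributedFlowSketch {
    // Hypothetical topology: segment id -> name of the segment's primary owner.
    static final Map<Integer, String> PRIMARY_OWNER =
            Map.of(0, "nodeA", 1, "nodeB", 2, "nodeA", 3, "nodeC");

    // Hypothetical data: segment id -> values stored in that segment.
    static final Map<Integer, List<Integer>> SEGMENT_DATA =
            Map.of(0, List.of(1, 2), 1, List.of(3), 2, List.of(4, 5, 6), 3, List.of(7));

    // Step 1: group the desired segments by their primary owner.
    static Map<String, List<Integer>> groupByOwner(List<Integer> segments) {
        return segments.stream()
                .collect(Collectors.groupingBy(PRIMARY_OWNER::get, TreeMap::new, Collectors.toList()));
    }

    // Step 2: what one node does with its request. It runs the intermediate
    // and terminal operations over its own segments only.
    static long runOnNode(List<Integer> segments) {
        return segments.stream()
                .flatMap(s -> SEGMENT_DATA.get(s).stream())
                .filter(v -> v % 2 == 0)   // intermediate operation
                .count();                  // terminal operation
    }

    // Steps 3 and 4: gather the per-node responses and reduce them into the final result.
    static long countEven(List<Integer> segments) {
        return groupByOwner(segments).values().stream()
                .mapToLong(DistributedFlowSketch::runOnNode)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(groupByOwner(List.of(0, 1, 2, 3))); // {nodeA=[0, 2], nodeB=[1], nodeC=[3]}
        System.out.println(countEven(List.of(0, 1, 2, 3)));    // 3
    }
}
```

In a real cluster each call to runOnNode would be a request executed on a different node; here the calls simply run one after another on the calling thread.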

30.4.1. Marshallability

When using distributed or replicated caches the keys and values must be marshallable; in addition, operations executed on Distributed Streams must also be marshallable, as these operations are sent to the other nodes in the cluster. This is most commonly accomplished by using a new class that is either Serializable or has an Externalizer registered. A lambda may also be used, but a lambda is only serialized when its target type is Serializable: where the stream method's parameter type is itself a Serializable functional interface no additional cast is required; otherwise the lambda must be cast as described below.

Note

Intermediate values in distributed streams do not need to be marshallable; only the final value sent back, typically the terminal operation, must be marshallable.
If a lambda function is in use, it may be made serializable by casting it to an intersection type that includes Serializable. For instance, consider a cache that stores Book entries; the following would create a collection of Book instances that match a specific author:
List<Book> books = cache.keySet().stream()
    .filter((Serializable & Predicate<Book>) e -> e.getAuthor().equals("authorname"))
    .collect(toList());
Additionally, not all Collectors are marshallable by default. JBoss Data Grid includes org.infinispan.stream.CacheCollectors as a convenient way to use any combination of Collectors so that they function properly when marshalling is required.
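
The effect of the Serializable cast can be checked with plain Java serialization. The sketch below is an illustration only: it uses java.io.ObjectOutputStream rather than the JBoss Data Grid marshaller, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Predicate;

class SerializableLambdaSketch {
    // Returns true if plain Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Target type is Predicate only, so the lambda is not serializable.
    static Predicate<String> plain() {
        return s -> s.equals("authorname");
    }

    // Target type is the intersection Serializable & Predicate, so the lambda is serializable.
    static Predicate<String> castToSerializable() {
        return (Serializable & Predicate<String>) s -> s.equals("authorname");
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(plain()));              // false
        System.out.println(canSerialize(castToSerializable())); // true
    }
}
```

Both lambdas behave identically when invoked; the cast changes only whether the resulting object can be written to a stream.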

30.4.2. Parallelism

There are two different methods to parallelize streams:
  • Parallel Streams - causing each operation to be executed in parallel on a single node
  • Parallel Distribution - parallelizing the request so that it involves multiple nodes
By default, Distributed Streams enable parallel distribution; however, this may be further coupled with a parallel Stream, so that operations execute concurrently across multiple nodes, with multiple threads on each node.
To mark a Stream as parallel it may either be obtained with parallelStream(), or it may be enabled after obtaining the Stream by invoking parallel(). The following example shows both methods:
// Obtain a parallel Stream initially
List<Book> books = cache.keySet().parallelStream()
    [...]
    
// Create the initial stream and then invoke parallel
List<Book> books = cache.keySet().stream()
    .parallel()
    [...]

Note

Some operations, such as the rehash-aware iterator and forEach operations, force a sequential stream locally. Using parallel streams with these operations is not possible at this time.
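
The two ways of marking a stream as parallel can be compared on an ordinary java.util collection. This is a minimal sketch that uses a plain List as a stand-in for the cache key set, so it shows only the standard Stream behavior and not the distributed case; the class and method names are hypothetical.

```java
import java.util.List;

class ParallelFlagSketch {
    // Obtain a parallel stream initially.
    static boolean viaParallelStream(List<Integer> data) {
        return data.parallelStream().isParallel();
    }

    // Create the stream first and then invoke parallel().
    static boolean viaParallel(List<Integer> data) {
        return data.stream().parallel().isParallel();
    }

    // The last call to parallel() or sequential() decides for the whole pipeline.
    static boolean backToSequential(List<Integer> data) {
        return data.parallelStream().filter(i -> i > 1).sequential().isParallel();
    }

    // The result of the pipeline is the same either way.
    static int sum(List<Integer> data, boolean parallel) {
        return (parallel ? data.parallelStream() : data.stream())
                .mapToInt(Integer::intValue)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3);
        System.out.println(viaParallelStream(data)); // true
        System.out.println(viaParallel(data));       // true
        System.out.println(backToSequential(data));  // false
    }
}
```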

30.4.3. Distributed Operators

30.4.3.1. Terminal Operator Distributed Result Reductions

Below each terminal operator is discussed, along with how the distributed reduction works for each one.
  • allMatch
    This operator is run on each node and then all results are combined using a logical AND operation locally to obtain the final value. If a normal stream operation returns early then these methods will complete early as well.
  • noneMatch anyMatch
    These operators are run on each node and then all results are combined locally using a logical OR operation to obtain the final value: anyMatch is true if any node found a match, and noneMatch is true only if no node did (the negation of that OR). If a normal stream operation returns early then these methods will complete early as well.
  • collect
    The collect method can perform a few extra steps. As with other methods, the remote node performs everything as expected; however, instead of applying the final finisher it sends back the fully combined results. The local thread then combines all local and remote results into a single value, to which the finisher is applied. As a result, the final value does not need to be serializable, but the values produced by the supplier and combiner methods must be.
  • count
    The count method simply adds the numbers received from each node.
  • findAny findFirst
    The findAny method will return the first value found, regardless of whether it came from a remote or local node. This operation supports early termination, as once an initial value has been found no others will be processed. The findFirst method behaves similarly, but requires a sorted intermediate operation, which is described in Section 30.4.3.3, “Intermediate Operation Exceptions”.
  • max min
    The max and min methods find the respective value on each node before a final reduction is performed locally to determine the true max or min across all nodes.
  • reduce
    The various reduce methods serialize the result as much as possible before accumulating the local and remote results together locally, applying the combiner if one was provided. Due to this behavior a value returned from the combiner does not need to be serializable.
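
The reductions described above can be illustrated with a small single-JVM simulation. This is a sketch under stated assumptions, not the product's implementation: the NODES list is a hypothetical stand-in for the data owned by three nodes, and each inner stream plays the role of the work a node would perform before sending back its partial result.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class TerminalReductionSketch {
    // Hypothetical per-node data: each inner list is what one node owns.
    static final List<List<Integer>> NODES =
            List.of(List.of(4), List.of(8, 15, 16), List.of(23, 42));

    // allMatch: per-node results combined with a logical AND.
    static boolean allMatch(Predicate<Integer> p) {
        return NODES.stream().map(n -> n.stream().allMatch(p)).reduce(true, Boolean::logicalAnd);
    }

    // anyMatch: per-node results combined with a logical OR.
    static boolean anyMatch(Predicate<Integer> p) {
        return NODES.stream().map(n -> n.stream().anyMatch(p)).reduce(false, Boolean::logicalOr);
    }

    // noneMatch: true only if no node found a match.
    static boolean noneMatch(Predicate<Integer> p) {
        return !anyMatch(p);
    }

    // count: the per-node counts are added together.
    static long count() {
        return NODES.stream().mapToLong(n -> n.stream().count()).sum();
    }

    // max: each node finds its own maximum, then a final reduction picks the true maximum.
    static Optional<Integer> max() {
        return NODES.stream()
                .map(n -> n.stream().max(Integer::compare))
                .flatMap(Optional::stream)
                .max(Integer::compare);
    }

    // collect: nodes accumulate without the finisher; the partial containers are
    // combined locally and the finisher is applied exactly once at the end.
    static <A, R> R collect(Collector<Integer, A, R> c) {
        A combined = c.supplier().get();
        for (List<Integer> node : NODES) {
            A partial = c.supplier().get();
            node.forEach(v -> c.accumulator().accept(partial, v));
            combined = c.combiner().apply(combined, partial);
        }
        return c.finisher().apply(combined);
    }

    static double average() {
        return collect(Collectors.averagingInt(Integer::intValue));
    }

    public static void main(String[] args) {
        System.out.println(allMatch(v -> v > 0));    // true
        System.out.println(noneMatch(v -> v > 100)); // true
        System.out.println(count());                 // 6
        System.out.println(max().get());             // 42
        System.out.println(average());               // 18.0
    }
}
```

The average case shows why the finisher is deferred: averaging the three per-node averages would weight each node equally regardless of how many values it holds, whereas combining the intermediate containers first yields the average over all values.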

30.4.3.2. Key Based Rehash Aware Operators

Unlike the other terminal operators, each of the following operators requires a special type of rehash awareness to keep track of which keys per segment have been processed. This guarantees that each key will be processed exactly once for the iterator and spliterator operators, or at least once for forEach, even if cluster membership changes.
  • iterator spliterator
    These operators return batches of entries when run on a remote node, where the next batch is only sent after the previous is fully consumed. This behavior is to limit how many entries are retained in memory at any given time. The user node will keep track of which keys have been processed, and once a segment has completed those keys will be released from memory. Because of this behavior it is preferable to use sequential processing, allowing only a subset of segment keys to be held in memory instead of having keys from all nodes retained.
  • forEach
    While forEach returns batches, it only returns a batch after it has finished processing at least a batch's worth of keys. This way the originating node knows which keys have already been processed, which reduces the possibility of processing the same entry again; however, it is possible to have the same set processed repeatedly if a node goes down unexpectedly. In this case the node could have been processing an uncompleted batch when it went down, resulting in the same batch being run again when the rehash failover operation occurs. Adding a node will not cause this issue, as the rehash failover does not occur until all responses are received.
The operations' batch sizes are controlled by the same value, distributedBatchSize, on the CacheStream. If no value is set then it will default to the chunkSize configured in state transfer. While larger values will allow for larger batches, resulting in fewer returns, this results in increased memory usage, and testing should be performed to determine an appropriate size for each application.
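
The difference between the exactly-once and at-least-once guarantees can be sketched with a simplified single-JVM model. Everything in it is hypothetical and invented for illustration (the class, the method names, and the failAtBatch and failAfterKeys parameters that simulate a node going down); it does not use distributedBatchSize or any other product API, and it assumes the keys of a segment are unique.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BatchTrackingSketch {
    // iterator/spliterator model: the originator tracks consumed keys, so a new
    // owner that restarts the segment does not deliver any key twice.
    // failAtBatch is the index of the batch before which the owner is lost (-1 = never).
    static List<String> iterate(List<String> segmentKeys, int batchSize, int failAtBatch) {
        Set<String> processed = new HashSet<>();   // keys already consumed by the originator
        List<String> delivered = new ArrayList<>();
        int position = 0;                          // cursor held by the current owner
        int batchNo = 0;
        while (processed.size() < segmentKeys.size()) {
            if (batchNo == failAtBatch) {
                position = 0;                      // new owner starts the segment from scratch
            }
            batchNo++;
            int sent = 0;
            while (position < segmentKeys.size() && sent < batchSize) {
                String key = segmentKeys.get(position++);
                if (processed.add(key)) {          // already-processed keys are skipped
                    delivered.add(key);
                    sent++;
                }
            }
        }
        return delivered;
    }

    // forEach model: the node reports a batch only after processing it, so a crash
    // in the middle of a batch causes the unreported keys to be processed again.
    // failAfterKeys is the number of consumer invocations before the crash (-1 = never).
    static List<String> forEachAtLeastOnce(List<String> segmentKeys, int batchSize, int failAfterKeys) {
        Set<String> reported = new HashSet<>();        // keys the originator knows are done
        List<String> sideEffects = new ArrayList<>();  // every invocation of the consumer
        boolean failed = false;
        while (reported.size() < segmentKeys.size()) {
            List<String> batch = new ArrayList<>();
            boolean crashed = false;
            for (String key : segmentKeys) {
                if (reported.contains(key)) continue;
                if (!failed && sideEffects.size() == failAfterKeys) {
                    failed = true;
                    crashed = true;                    // node dies before reporting this batch
                    break;
                }
                sideEffects.add(key);
                batch.add(key);
                if (batch.size() == batchSize) break;
            }
            if (!crashed) reported.addAll(batch);
        }
        return sideEffects;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("k1", "k2", "k3", "k4", "k5");
        System.out.println(iterate(keys, 2, 1));            // [k1, k2, k3, k4, k5]
        System.out.println(forEachAtLeastOnce(keys, 2, 3)); // [k1, k2, k3, k3, k4, k5]
    }
}
```

In the forEach run, k3 had been processed but not yet reported when the simulated node went down, so it is processed a second time after failover.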

30.4.3.3. Intermediate Operation Exceptions

The following intermediate operations have special exceptions. All of these methods have some sort of artificial iterator implanted in the stream processing to guarantee correctness, and due to this using any of the following may cause severe performance degradation.
  • Skip
    An artificial iterator is implanted up to the skip operation, and then results are brought locally so that the appropriate number of elements may be skipped.
  • Peek
    An artificial iterator is implanted up to the peek operation. Only up to a number of peeked elements are returned by each remote node, and then results are brought locally so that only the desired number of elements is peeked at.
  • Sorted
    An artificial iterator is implanted up to the sorted operation, and then all results are locally sorted.

    Warning

    This operation requires having all entries in memory on the local node.
  • Distinct
    Distinct is performed on each remote node, and an artificial iterator then returns those distinct values to the local node, where a final distinct operation is performed across all of the results.

    Warning

    This operation requires having all entries in memory on the local node.
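
The cost of these operations can be seen in a small single-JVM simulation. As with any such sketch, the names are hypothetical and the NODES list merely stands in for the data held on three nodes; no JBoss Data Grid classes are involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class IntermediateOpSketch {
    // Hypothetical per-node data: each inner list is what one node owns.
    static final List<List<String>> NODES =
            List.of(List.of("b", "a", "b"), List.of("c", "a"), List.of("c", "d", "d"));

    // distinct: applied on each node first, then once more locally across all results.
    static List<String> distinct() {
        return NODES.stream()
                .flatMap(n -> n.stream().distinct())  // per-node distinct
                .distinct()                           // final local distinct
                .collect(Collectors.toList());
    }

    // Number of elements that still have to be brought to the local node for distinct.
    static long shippedForDistinct() {
        return NODES.stream().mapToLong(n -> n.stream().distinct().count()).sum();
    }

    // sorted: every element is brought to the local node and sorted there.
    static List<String> sorted() {
        List<String> all = new ArrayList<>();
        NODES.forEach(all::addAll);
        Collections.sort(all);
        return all;
    }

    public static void main(String[] args) {
        System.out.println(distinct());           // [b, a, c, d]
        System.out.println(shippedForDistinct()); // 6
        System.out.println(sorted());             // [a, a, b, b, c, c, d, d]
    }
}
```

The per-node distinct step reduces what is transferred (6 of the 8 elements here), but the local node must still hold every value it receives, which is the reason for the memory warnings above.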

30.4.4. Distributed Stream Examples

A classic example of Map/Reduce is word count. Assuming we have a cache with String for keys and values, and we need to count the occurrence of all words in all sentences, this could be implemented using the following:
Map<String, Long> wordCountMap = cache.entrySet().parallelStream()
    .map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s"))
    .flatMap((Serializable & Function<String[], Stream<String>>) Arrays::stream)
    .collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())));
If we wanted to revise the example to find the most frequent word we would need to have all words available and counted locally first. The following snippet extends our previous example to perform this search:
String mostFrequentWord = cache.entrySet().parallelStream()
    .map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s"))
    .flatMap((Serializable & Function<String[], Stream<String>>) Arrays::stream)
    .collect(CacheCollectors.serializableCollector(() -> Collectors.collectingAndThen(
        Collectors.groupingBy(Function.identity(), Collectors.counting()),
            wordCountMap -> {
                String mostFrequent = null;
                long maxCount = 0;
                for (Map.Entry<String, Long> e : wordCountMap.entrySet()) {
                    long count = e.getValue();
                    if (count > maxCount) {
                        maxCount = count;
                        mostFrequent = e.getKey();
                    }
                }
                return mostFrequent;
            })));
At present, this last step will be executed in a single thread. We can further optimize this operation by using a parallel stream locally to perform the final operation:
Map<String, Long> wordCount = cache.entrySet().parallelStream()
    .map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s"))
    .flatMap((Serializable & Function<String[], Stream<String>>) Arrays::stream)
    .collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())));
Optional<Map.Entry<String, Long>> mostFrequent = wordCount.entrySet().parallelStream()
    .reduce((e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);