Chapter 14. MapReduce

The Red Hat JBoss Data Grid MapReduce model is an adaptation of Google's MapReduce model.
MapReduce is a programming model used to process and generate large data sets. It is typically used in distributed computing environments where nodes are clustered. In JBoss Data Grid, MapReduce allows transparent distributed processing of large amounts of data across the grid. It does this by performing computations locally where the data is stored whenever possible.
MapReduce uses the two distinct computational phases of map and reduce to process information requests through the data grid. The process occurs as follows:
  1. The user initiates a task on a cache instance, which runs on a cluster node (the master node).
  2. The master node receives the task input, divides the task, and sends tasks for map phase execution on the grid.
  3. Each node executes a Mapper function on its input, and returns intermediate results back to the master node.
    • If the useIntermediateSharedCache parameter is set to "true", the map results are inserted in an intermediary cache, rather than being returned to the master node.
    • If a Combiner has been specified with task.combinedWith(combiner), the Combiner is called on the Mapper results and the combiner's results are returned to the master node or inserted in the intermediary cache.

      Note

      Combiners are optional, and can only be used when the reduce function is both commutative (changing the order of the operands does not change the result) and associative (the order in which the operations are performed does not matter as long as the sequence of the operands is not changed). When they apply, Combiners are worth using because they can speed up MapReduceTask executions.
  4. The master node collects all intermediate results from the map phase and merges all intermediate values associated with the same intermediate key.
    • If the distributedReducePhase parameter is set to "true", the intermediate values are merged on each node as the Mapper or Combiner results are inserted in the intermediary cache. The master node only receives the intermediate keys.
  5. The master node sends intermediate key/value pairs for reduction on the grid.
    • If the distributedReducePhase parameter is set to "false", the reduction phase is executed only on the master node.
  6. The final results of the reduction phase are returned. Optionally specify the target cache for the results using the instructions in Section 14.1.2, “Specify the Target Cache”.
    • If the distributedReducePhase parameter is set to "true", the master node running the task receives all results from the reduction phase and returns the final result to the MapReduce task initiator.
    • If no target cache is specified and no collator is specified (using task.execute(Collator)), the result map is returned to the master node.
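
The steps above can be illustrated in plain Java without any JBoss Data Grid classes. The following is a minimal single-JVM sketch only: the class name MapReduceFlowSketch, the word-count task, and the representation of each "node" as a local list of strings are all assumptions made for illustration, and no data is actually distributed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-JVM sketch of the map, combine, merge, and reduce steps.
// It does not use the JBoss Data Grid API; each "node" is just a local list.
class MapReduceFlowSketch {

   // Step 3: run the map function on one node's local values and,
   // if requested, pre-reduce the node's own results with a combiner.
   static Map<String, List<Integer>> mapOnNode(List<String> localValues, boolean combine) {
      Map<String, List<Integer>> intermediate = new HashMap<>();
      for (String value : localValues) {
         for (String word : value.split("\\s+")) {
            if (!word.isEmpty()) {
               intermediate.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
            }
         }
      }
      if (combine) {
         for (Map.Entry<String, List<Integer>> e : intermediate.entrySet()) {
            List<Integer> combined = new ArrayList<>();
            combined.add(sum(e.getValue()));
            e.setValue(combined);
         }
      }
      return intermediate;
   }

   static int sum(List<Integer> values) {
      int total = 0;
      for (int v : values) {
         total += v;
      }
      return total;
   }

   static Map<String, Integer> run(List<List<String>> nodes, boolean combine) {
      // Step 4: merge all intermediate values that share an intermediate key.
      Map<String, List<Integer>> merged = new TreeMap<>();
      for (List<String> node : nodes) {
         for (Map.Entry<String, List<Integer>> e : mapOnNode(node, combine).entrySet()) {
            merged.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).addAll(e.getValue());
         }
      }
      // Steps 5 and 6: reduce each key's values to a single value and return the result map.
      Map<String, Integer> result = new TreeMap<>();
      for (Map.Entry<String, List<Integer>> e : merged.entrySet()) {
         result.put(e.getKey(), sum(e.getValue()));
      }
      return result;
   }

   public static void main(String[] args) {
      List<List<String>> nodes = Arrays.asList(
            Arrays.asList("a b a", "c"),
            Arrays.asList("b a"));
      System.out.println(run(nodes, true)); // prints {a=3, b=2, c=1}
   }
}
```

Because summation is commutative and associative, run(nodes, true) and run(nodes, false) return the same map; the combiner only changes how many intermediate values are carried into the merge step.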

14.1. The MapReduce API

In Red Hat JBoss Data Grid, each MapReduce task has five main components:
  • Mapper
  • Reducer
  • Collator
  • MapReduceTask
  • Combiner
The Mapper implementation is a component of MapReduceTask and is invoked once per input cache entry key/value pair. Mapping is the process of applying a given function to each element of a list and returning a list of results.
Each node in the JBoss Data Grid executes the Mapper on a given cache entry key/value input pair. It then transforms this cache entry key/value pair into an intermediate key/value pair, which is emitted into the provided Collector instance.

Note

The MapReduceTask requires a Mapper and a Reducer but using a Collator or Combiner is optional.

Example 14.1. Executing the Mapper

public interface Mapper<KIn, VIn, KOut, VOut> extends Serializable {
 
   /**
    * Invoked once for each input cache entry KIn,VIn pair.
    */
   void map(KIn key, VIn value, Collector<KOut, VOut> collector);
}
At this stage, for each output key there may be multiple output values. The multiple values must be reduced to a single value, and this is the task of the Reducer. JBoss Data Grid's distributed execution environment creates one instance of Reducer per execution node.

Example 14.2. Reducer

public interface Reducer<KOut, VOut> extends Serializable {
 
   /**
    * Combines/reduces all intermediate values for a particular intermediate key to a single value.
    * <p>
    *
    */
   VOut reduce(KOut reducedKey, Iterator<VOut> iter);
 
}
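
As an illustration of how the two interfaces fit together, the following sketch implements a word-count Mapper and Reducer. To keep it compilable on its own, it declares local stand-ins that mirror the Mapper, Collector, and Reducer interfaces shown in this section; in a real task the library's own types would be imported instead. The class names WordCountMapper, WordCountReducer, and WordCountDemo are hypothetical.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Local stand-ins mirroring the interfaces shown in this section (assumption:
// used here only so that the sketch compiles without the data grid libraries).
interface Collector<K, V> {
   void emit(K key, V value);
}

interface Mapper<KIn, VIn, KOut, VOut> extends Serializable {
   void map(KIn key, VIn value, Collector<KOut, VOut> collector);
}

interface Reducer<KOut, VOut> extends Serializable {
   VOut reduce(KOut reducedKey, Iterator<VOut> iter);
}

// Hypothetical Mapper: emits (word, 1) for every whitespace-separated word.
class WordCountMapper implements Mapper<String, String, String, Integer> {
   @Override
   public void map(String key, String value, Collector<String, Integer> collector) {
      for (String word : value.trim().split("\\s+")) {
         if (!word.isEmpty()) {
            collector.emit(word, 1);
         }
      }
   }
}

// Hypothetical Reducer: sums all counts emitted for one word.
class WordCountReducer implements Reducer<String, Integer> {
   @Override
   public Integer reduce(String reducedKey, Iterator<Integer> iter) {
      int sum = 0;
      while (iter.hasNext()) {
         sum += iter.next();
      }
      return sum;
   }
}

class WordCountDemo {
   public static void main(String[] args) {
      // Collect the emitted pairs in a list instead of a real Collector.
      List<String> emitted = new ArrayList<>();
      new WordCountMapper().map("doc1", "to be or not to be", (k, v) -> emitted.add(k + "=" + v));
      System.out.println(emitted); // prints [to=1, be=1, or=1, not=1, to=1, be=1]

      Integer total = new WordCountReducer().reduce("to", Arrays.asList(1, 1).iterator());
      System.out.println(total); // prints 2
   }
}
```

Both classes are top-level and hold no references to enclosing objects, which keeps them straightforward to serialize.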
The same Reducer interface is used for Combiners. A Combiner is similar to a Reducer, except that it must be able to work on partial results. The Combiner is executed on the results of the Mapper, on the same node, without considering the other nodes that might have generated values for the same intermediate key.

Note

Combiners are optional, and can only be used when the reduce function is both commutative (changing the order of the operands does not change the result) and associative (the order in which the operations are performed does not matter as long as the sequence of the operands is not changed). When they apply, Combiners are worth using because they can speed up MapReduceTask executions.
Because Combiners only see part of the intermediate values, they cannot be used in all scenarios. When they can be used, they can significantly reduce network traffic and memory consumption in the intermediate cache.
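
The restriction on Combiners can be seen with a small calculation. The sketch below is a hypothetical plain-Java illustration (the class CombinerSafetySketch and the sample values are assumptions, not part of the product API): it applies a function first to each node's partial values and then to the partial results, and compares that with applying the function once to all values.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of why a combiner must be commutative and associative.
class CombinerSafetySketch {

   static double sum(List<Double> values) {
      double total = 0;
      for (double v : values) {
         total += v;
      }
      return total;
   }

   static double average(List<Double> values) {
      return sum(values) / values.size();
   }

   public static void main(String[] args) {
      List<Double> nodeA = Arrays.asList(1.0, 2.0, 3.0);
      List<Double> nodeB = Arrays.asList(10.0);
      List<Double> all = Arrays.asList(1.0, 2.0, 3.0, 10.0);

      // Sum: combining per node and then reducing matches reducing everything at once.
      double combinedSum = sum(Arrays.asList(sum(nodeA), sum(nodeB)));
      System.out.println(combinedSum == sum(all)); // prints true (16.0 in both cases)

      // Average: averaging per-node averages gives 6.0, but the true average is 4.0.
      double combinedAvg = average(Arrays.asList(average(nodeA), average(nodeB)));
      System.out.println(combinedAvg == average(all)); // prints false
   }
}
```

A sum can therefore be pre-reduced on each node, whereas a plain average cannot be used as a Combiner unless the intermediate values are changed to carry something that does combine safely, such as a running sum and count.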
The Collator coordinates results from Reducers that have been executed on JBoss Data Grid, and assembles a final result that is delivered to the initiator of the MapReduceTask. The Collator is applied to the final map key/value result of MapReduceTask.

Example 14.3. Assembling the Result

public interface Collator<KOut, VOut, R> {
   /**
    * Collates all reduced results and returns R to invoker of distributed task. 
    * 
    * @return final result of distributed task computation
    */
   R collate(Map<KOut, VOut> reducedResults);
}
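
A Collator typically condenses the reduced map into one value for the caller. The following sketch assumes a word-count task and returns the most frequent word; it declares a local stand-in that mirrors the Collator interface shown above so that it compiles on its own, and the class name MostFrequentWordCollator is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in mirroring the Collator interface shown in this section
// (assumption: declared here only so the sketch compiles without the library).
interface Collator<KOut, VOut, R> {
   R collate(Map<KOut, VOut> reducedResults);
}

// Hypothetical Collator: picks the word with the highest count.
class MostFrequentWordCollator implements Collator<String, Integer, String> {
   @Override
   public String collate(Map<String, Integer> reducedResults) {
      String best = null;
      int bestCount = Integer.MIN_VALUE;
      for (Map.Entry<String, Integer> e : reducedResults.entrySet()) {
         if (e.getValue() > bestCount) {
            best = e.getKey();
            bestCount = e.getValue();
         }
      }
      return best; // null when the reduced map is empty
   }
}

class CollatorDemo {
   public static void main(String[] args) {
      Map<String, Integer> reduced = new HashMap<>();
      reduced.put("a", 3);
      reduced.put("b", 2);
      reduced.put("c", 1);
      System.out.println(new MostFrequentWordCollator().collate(reduced)); // prints a
   }
}
```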

14.1.1. MapReduceTask

In Red Hat JBoss Data Grid, MapReduceTask is a distributed task, which unifies the Mapper, Combiner, Reducer, and Collator components into a cohesive computation, which can be parallelized and executed across a large-scale cluster.
These components can be specified with a fluent API. However, as most of them are serialized and executed on other nodes, using inner classes is not recommended.

Example 14.4. Specifying MapReduceTask Components

new MapReduceTask(cache)
      .mappedWith(new MyMapper())
      .combinedWith(new MyCombiner())
      .reducedWith(new MyReducer())
      .execute(new MyCollator());
MapReduceTask requires a cache containing data that will be used as input for the task. The JBoss Data Grid execution environment will instantiate and migrate instances of provided Mappers and Reducers seamlessly across the nodes.
By default, all available key/value pairs of a specified cache will be used as input data for the task. This can be modified by using the onKeys method as an input key filter.
There are two MapReduceTask constructor parameters that determine how the intermediate values are processed:
  • distributedReducePhase - When set to false, the default setting, the reducers are only executed on the master node. If set to true, the reducers are executed on every node in the cluster.
  • useIntermediateSharedCache - Only relevant if distributedReducePhase is set to true. If true, which is the default setting, this task shares an intermediate value cache with other MapReduceTasks executing on the grid. If set to false, this task uses its own dedicated cache for intermediate values.

Note

The default timeout for MapReduceTask is 0 (zero). That is, the task will wait indefinitely for its completion by default.

14.1.2. Specify the Target Cache

Red Hat JBoss Data Grid's MapReduce implementation allows users to specify a target cache to store the results of an executed task. The results are available after the execute method (which is synchronous) is complete.
This variant of the execute method helps prevent the JVM of the master node from exceeding its maximum allowed heap size. This is especially relevant if the objects produced by the reduce phase have a large memory footprint, or if multiple MapReduceTasks are executing concurrently on the master node.
Use the following method of MapReduceTask to specify a Cache object to store the results:
public void execute(Cache<KOut, VOut> resultsCache) throws CacheException
Use the following method of MapReduceTask to specify a name for the target cache:
public void execute(String resultsCache) throws CacheException

14.1.3. Mapper and CDI

The Mapper is invoked with the appropriate input key/value pairs on an executing node. Red Hat JBoss Data Grid also provides CDI injection for the input cache, which can be used where additional data from the input cache is required to complete the map transformation.
When the Mapper is executed on a JBoss Data Grid executing node, the JBoss Data Grid CDI module provides an appropriate cache reference, which is injected to the executing Mapper. To use the JBoss Data Grid CDI module with Mapper:
  1. Declare a cache field in the Mapper.
  2. Annotate the cache field with @org.infinispan.cdi.Input.
  3. Annotate the cache field with the mandatory @Inject annotation.

Example 14.5. Using a CDI Injection

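import java.util.StringTokenizer;

import javax.inject.Inject;

import org.infinispan.Cache;
import org.infinispan.cdi.Input;
import org.infinispan.distexec.mapreduce.Collector;
import org.infinispan.distexec.mapreduce.Mapper;

public class WordCountCacheInjecterMapper implements Mapper<String, String, String, Integer> {

   @Inject
   @Input
   private Cache<String, String> cache;

   @Override
   public void map(String key, String value, Collector<String, Integer> collector) {

      // Use the injected cache here if additional input data is needed.
      // Emit a count of 1 for each whitespace-delimited token in the value.
      StringTokenizer tokens = new StringTokenizer(value);
      while (tokens.hasMoreTokens()) {
         collector.emit(tokens.nextToken(), 1);
      }
   }
}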