Chapter 34. Distributed Execution

34.1. Distributed Execution

Red Hat JBoss Data Grid provides distributed execution through a standard JDK ExecutorService interface. Tasks submitted for execution are executed on an entire cluster of JBoss Data Grid nodes, rather than being executed in a local JVM.

JBoss Data Grid’s distributed task executors can use data from JBoss Data Grid cache nodes as input for execution tasks. As a result, there is no need to configure the cache store for intermediate or final results. As input data in JBoss Data Grid is already load balanced, tasks are also automatically balanced; therefore, there is no need to explicitly assign tasks to specific nodes.

In JBoss Data Grid’s distributed execution framework:

  • Each DistributedExecutorService is bound to a single cache. Tasks submitted have access to key/value pairs from that particular cache if the task submitted is an instance of DistributedCallable.
  • Every Callable, Runnable, and/or DistributedCallable submitted must be either Serializable or Externalizable, because tasks are migrated to other nodes for execution. The value returned from a Callable must also be Serializable or Externalizable.

34.2. Distributed Executor Service

A DistributedExecutorService controls the execution of DistributedCallable, Callable, and Runnable classes on the cluster. These instances are tied to the specific cache that is passed in upon instantiation:

DistributedExecutorService des = new DefaultExecutorService(cache);

It is only possible to execute a DistributedTask against a subset of keys if DistributedCallable is implemented, as discussed in the DistributedCallable API section. If such a task is submitted to a single node, then JBoss Data Grid will locate a node containing the indicated keys, migrate the DistributedCallable to that node, and return a CompletableFuture. Alternatively, if such a task is submitted to all available nodes, then only the nodes containing the indicated keys will receive the task.

Once a DistributedTask has been created it may be submitted to the cluster using any of the following methods; a combined sketch follows this list:

  • The task can be submitted to all available nodes and key/value pairs on the cluster using the submitEverywhere method:

    des.submitEverywhere(task)
  • The submitEverywhere method can also take a set of keys as an argument. Passing in keys in this manner will submit the task only to available nodes that contain the indicated keys:

    des.submitEverywhere(task, $KEY)
  • If a key is specified, then the task will be executed on a single node that contains at least one of the specified keys. Any keys not present locally will be retrieved from the cluster. This version of the submit method accepts one or more keys to be operated on, as seen in the following examples:

    des.submit(task, $KEY)
    des.submit(task, $KEY1, $KEY2, $KEY3)
  • A specific node can be instructed to execute the task by passing the node’s Address to the submit method. The example below executes the task only on the cluster’s coordinator:

    des.submit(cache.getCacheManager().getCoordinator(), task)
    Note

    By default tasks are automatically balanced, and there is typically no need to indicate a specific node to execute against.
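
Putting these methods together, the following sketch builds a DistributedTask from a Callable and gathers one result per node. This is a minimal sketch: SomeCallable is a placeholder for a user-supplied Serializable Callable<Integer> and is not part of the JBoss Data Grid API.

Submitting a Task and Gathering Results

DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedTask<Integer> task = des.createDistributedTaskBuilder(new SomeCallable()).build();

// Submit the task to every node and wait for each node's result.
List<Future<Integer>> futures = des.submitEverywhere(task);
int total = 0;
for (Future<Integer> future : futures) {
   total += future.get();
}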

34.3. DistributedCallable API

The DistributedCallable interface is a subtype of the existing Callable interface from the java.util.concurrent package; it can be executed in a remote JVM and receive input from Red Hat JBoss Data Grid. The DistributedCallable interface is used to facilitate tasks that require access to JBoss Data Grid cache data.

When using the DistributedCallable API to execute a task, the task’s main algorithm remains unchanged; only the input source changes.

Users who have already implemented the Callable interface must implement DistributedCallable instead if access to the cache or to the set of passed-in keys is required.

Using the DistributedCallable API

public interface DistributedCallable<K, V, T> extends Callable<T> {

   /**
    * Invoked by execution environment after DistributedCallable
    * has been migrated for execution to a specific Infinispan node.
    *
    * @param cache
    *           cache whose keys are used as input data for this
    *           DistributedCallable task
    * @param inputKeys
    *           keys used as input for this DistributedCallable task
    */
   public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys);

}
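
For example, a task that sums the integer values stored under its input keys could be implemented as follows. This is a minimal sketch: the class name SumTask is illustrative and not part of the JBoss Data Grid API.

Example DistributedCallable Implementation

import java.io.Serializable;
import java.util.Set;

import org.infinispan.Cache;
import org.infinispan.distexec.DistributedCallable;

public class SumTask implements DistributedCallable<String, Integer, Integer>, Serializable {

   private static final long serialVersionUID = 1L;

   // Set on the executing node via setEnvironment(); not part of the serialized state.
   private transient Cache<String, Integer> cache;
   private transient Set<String> inputKeys;

   @Override
   public void setEnvironment(Cache<String, Integer> cache, Set<String> inputKeys) {
      this.cache = cache;
      this.inputKeys = inputKeys;
   }

   @Override
   public Integer call() throws Exception {
      int sum = 0;
      for (String key : inputKeys) {
         Integer value = cache.get(key);
         if (value != null) {
            sum += value;
         }
      }
      return sum;
   }
}

Such a task could then be submitted with, for example, des.submit(new SumTask(), key1, key2) or des.submitEverywhere(new SumTask(), key1, key2).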

34.4. Callable and CDI

Where DistributedCallable cannot be implemented or is not appropriate, and a reference to the input cache used by the DistributedExecutorService is still required, the input cache can be injected using the CDI mechanism.

When the Callable task arrives at a Red Hat JBoss Data Grid executing node, JBoss Data Grid’s CDI mechanism provides the appropriate cache reference and injects it into the executing Callable.

To use JBoss Data Grid CDI with a Callable:

  1. Declare a Cache field in the Callable and annotate it with org.infinispan.cdi.Input.
  2. Include the mandatory @Inject annotation.

Using Callable and the CDI

import java.io.Serializable;
import java.util.concurrent.Callable;

import javax.inject.Inject;

import org.infinispan.Cache;
import org.infinispan.cdi.Input;

public class CallableWithInjectedCache implements Callable<Integer>, Serializable {

      @Inject
      @Input
      private Cache<String, String> cache;


      @Override
      public Integer call() throws Exception {
        //use injected cache reference
        return 1;
      }
}
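
The task is then submitted like any other Callable; the following is a minimal sketch, assuming that the cache passed to the executor is the one to be injected:

DistributedExecutorService des = new DefaultExecutorService(cache);
Future<Integer> future = des.submit(new CallableWithInjectedCache());
Integer result = future.get();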

34.5. Distributed Task Failover

Red Hat JBoss Data Grid’s distributed execution framework supports task failover in the following cases:

  • Failover due to a node failure where a task is executing.
  • Failover due to a task failure; for example, if a Callable task throws an exception.

The failover policy is disabled by default, and Runnable, Callable, and DistributedCallable tasks fail without invoking any failover mechanism.

JBoss Data Grid provides a random node failover policy, which attempts to execute the failed part of a distributed task on another random node, if one is available.

A random failover execution policy can be specified using the following as an example:

Random Failover Execution Policy

DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable());
taskBuilder.failoverPolicy(DefaultExecutorService.RANDOM_NODE_FAILOVER);
DistributedTask<Boolean> distributedTask = taskBuilder.build();
Future<Boolean> future = des.submit(distributedTask);
Boolean r = future.get();

The DistributedTaskFailoverPolicy interface can also be implemented to provide failover management.

Distributed Task Failover Policy Interface

/**
 * DistributedTaskFailoverPolicy allows pluggable fail over target selection for a failed remotely
 * executed distributed task.
 *
 */
public interface DistributedTaskFailoverPolicy {

   /**
    * Invoked when a part of a distributed task fails, either because the task itself threw an
    * exception or because of an Infinispan system failure (e.g. a node failed or left the cluster
    * during task execution).
    *
    * @param failoverContext
    *           the FailoverContext of the failed execution
    * @return result the Address of the Infinispan node selected for fail over execution
    */
   Address failover(FailoverContext context);

   /**
    * Maximum number of fail over attempts permitted by this DistributedTaskFailoverPolicy
    *
    * @return max number of fail over attempts
    */
   int maxFailoverAttempts();
}
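
As an illustration, a custom policy that retries on a random node other than the one where the task failed could be sketched as follows. The class name is illustrative only, and the sketch assumes that FailoverContext exposes the candidate nodes via executionCandidates() and the failed node via executionFailureLocation().

Example Custom Failover Policy

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.infinispan.distexec.DistributedTaskFailoverPolicy;
import org.infinispan.distexec.FailoverContext;
import org.infinispan.remoting.transport.Address;

public class RandomExcludingFailedNodePolicy implements DistributedTaskFailoverPolicy {

   private static final Random RANDOM = new Random();

   @Override
   public Address failover(FailoverContext context) {
      // Pick any candidate node other than the one on which the task failed.
      List<Address> candidates = new ArrayList<>(context.executionCandidates());
      candidates.remove(context.executionFailureLocation());
      if (candidates.isEmpty()) {
         throw new IllegalStateException("No nodes available for failover");
      }
      return candidates.get(RANDOM.nextInt(candidates.size()));
   }

   @Override
   public int maxFailoverAttempts() {
      // Give up after three failover attempts.
      return 3;
   }
}

A custom policy is attached in the same way as the built-in one, for example taskBuilder.failoverPolicy(new RandomExcludingFailedNodePolicy()).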

34.6. Distributed Task Execution Policy

The DistributedTaskExecutionPolicy allows tasks to specify a custom execution policy across the Red Hat JBoss Data Grid cluster, by scoping execution of tasks to a subset of nodes.

For example, DistributedTaskExecutionPolicy can be used to manage task execution in the following cases:

  • where a task should be executed only on the local network site rather than on a backup remote site.
  • where only a dedicated subset of nodes in a particular JBoss Data Grid rack is required to execute a specific task.

Using Rack Nodes to Execute a Specific Task

DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable());
taskBuilder.executionPolicy(DistributedTaskExecutionPolicy.SAME_RACK);
DistributedTask<Boolean> distributedTask = taskBuilder.build();
Future<Boolean> future = des.submit(distributedTask);
Boolean r = future.get();

34.7. Distributed Execution and Locality

In a distributed environment, ownership, as determined by the DistributionManager and ConsistentHash, is theoretical; neither of these classes has any knowledge of whether data is actively in the cache. These classes are used only to determine which node should store the specified key.

To examine the locality of a given key, use either of the following options (a combined sketch follows the note below):

  • Option 1: Confirm that the key is both found in the cache and the DistributionManager indicates it is local, as seen in the following example:

    (cache.getAdvancedCache().withFlags(SKIP_REMOTE_LOOKUP).containsKey(key)
    && cache.getAdvancedCache().getDistributionManager().getLocality(key).isLocal())
  • Option 2: Query the DataContainer directly:

    cache.getAdvancedCache().getDataContainer().containsKey(key)
Note

If the entry is passivated then the DataContainer will return false, regardless of the key’s presence.
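
The two checks can be combined into a small helper; the following is a sketch, and the class and method names are illustrative only:

Checking Key Locality

import static org.infinispan.context.Flag.SKIP_REMOTE_LOOKUP;

import org.infinispan.Cache;
import org.infinispan.distribution.DistributionManager;

public class LocalityChecks {

   // Returns true only if the key is found in the cache without a remote lookup
   // and the DistributionManager considers this node an owner of the key.
   public static boolean isLocalAndPresent(Cache<?, ?> cache, Object key) {
      DistributionManager dm = cache.getAdvancedCache().getDistributionManager();
      boolean presentLocally = cache.getAdvancedCache()
            .withFlags(SKIP_REMOTE_LOOKUP)
            .containsKey(key);
      return presentLocally && dm.getLocality(key).isLocal();
   }
}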

34.7.1. Distributed Execution Example

In this example, parallel distributed execution is used to approximate the value of Pi (π).

  1. As shown below, the area of a square is:
    Area of a Square (S) = 4r²
  2. The following is the equation for the area of a circle:
    Area of a Circle (C) = π x r²
  3. Isolate r² from the first equation:
    r² = S/4
  4. Inject this value of r² into the second equation:
    C = Sπ/4
  5. Isolating π in the equation results in:
    C = Sπ/4
    4C = Sπ
    4C/S = π

Figure 34.1. Distributed Execution Example


If we now throw a large number of darts into the square, then draw a circle inside the square, and discard all dart throws that landed outside the circle, we can approximate the C/S value.

As the value of π was previously worked out to be 4C/S, the ratio of darts inside the circle to the total number of darts thrown can be used to derive an approximate value of π. The greater the number of darts thrown, the better the approximation of π becomes.

In the following example, we throw 10 million darts by parallelizing the dart tossing across the cluster:

Distributed Execution Example

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.infinispan.Cache;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.DistributedExecutorService;

public class PiAppx {

   public static void main(String[] arg) throws Exception {
      List<Cache> caches = ...;
      Cache cache = ...;

      int numPoints = 10000000;
      int numServers = caches.size();
      int numberPerWorker = numPoints / numServers;

      DistributedExecutorService des = new DefaultExecutorService(cache);
      long start = System.currentTimeMillis();
      CircleTest ct = new CircleTest(numberPerWorker);
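      // Run the dart-throwing task on every node in the cluster; one Future per node.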
      List<Future<Integer>> results = des.submitEverywhere(ct);
      int countCircle = 0;
      for (Future<Integer> f : results) {
         countCircle += f.get();
      }
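      // Approximate pi as 4C/S: countCircle is C (darts inside the circle), numPoints is S (total darts).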
      double appxPi = 4.0 * countCircle / numPoints;

      System.out.println("Distributed PI appx is " + appxPi +
      " completed in " + (System.currentTimeMillis() - start) + " ms");
   }

   private static class CircleTest implements Callable<Integer>, Serializable {

      /** The serialVersionUID */
      private static final long serialVersionUID = 3496135215525904755L;

      private final int loopCount;

      public CircleTest(int loopCount) {
         this.loopCount = loopCount;
      }

      @Override
      public Integer call() throws Exception {
         int insideCircleCount = 0;
         for (int i = 0; i < loopCount; i++) {
            double x = Math.random();
            double y = Math.random();
            if (insideCircle(x, y))
               insideCircleCount++;
         }
         return insideCircleCount;
      }

      private boolean insideCircle(double x, double y) {
         return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2))
         <= Math.pow(0.5, 2);
      }
   }
}