第34章 分散実行

34.1. 分散実行

Red Hat JBoss Data Grid は、標準の JDK ExecutorService インターフェースより分散実行を提供します。実行のために提出されたタスクは、ローカル JVM ではなく、JBoss Data Grid ノードのクラスター全体で実行されます。

JBoss Data Grid の分散タスクエクゼキューターは、JBoss Data Grid キャッシュノードからのデータを実行タスクの入力として使用できます。そのため、中間または最終結果のキャッシュストアを設定する必要がありません。JBoss Data Grid の入力データはすでに負荷分散されているため、タスクも自動的に分散されます。そのため、明示的にタスクを特定のノードに割り当てる必要はありません。

JBoss Data Grid の分散実行フレームワークでは、以下が行われます。

  • DistributedExecutorService は単一のキャッシュにバインドされます。提出されたタスクが DistributedCallable のインスタンスである場合、そのタスクは特定のキャッシュからキーバリューペアにアクセスできます。
  • タスクの 1 つが実行されるときにタスクが他のノードに移行されないようにするため、提出された各 CallableRunnableDistributedCallable は、Serializable または Externalizable である必要があります。Callable から返された値も Serializable または Externalizable である必要があります。

34.2. 分散エクゼキューターサービス

DistributedExecutorServiceDistributedCallable と、クラスターの他の Callable および Runnable クラスの実行を制御します。これらのインスタンスは、インストール時に渡される特定キャッシュに関係します。

DistributedExecutorService des = new DefaultExecutorService(cache);

DistributedCallableAPI の説明にあるとおり、DistributedCallable が拡張されると、DistributedTask はキーのサブセットに対してのみ実行できます。タスクがこのように単一ノードへ提出されると、JBoss Data Gridは指定のキーが含まれるノードを見つけます。さらに、DistributedCallable をこのノードに移行し、CompletableFuture を返します。タスクがこのように利用可能なノードすべてに提出された場合、指定のキーが含まれるノードのみがタスクを受け取ります。

DistributedTask が作成されたら、以下のメソッドの 1 つを使ってクラスターに提出することができます。

  • submitEverywhere メソッドを使用すると、クラスターの利用可能なノードすべてとキーバリューペアすべてにタスクを提出できます。

    des.submitEverywhere(task)
  • submitEverywhere メソッドは、キーのセットを引数として取ることもできます。このようにキーを渡すと、指定のキーが含まれる利用可能なノードのみにタスクが提出されます。

    des.submitEverywhere(task, $KEY)
  • キーが指定された場合、タスクは指定されたキーが 1 つ以上含まれる単一のノードで実行されます。ローカルにないキーは、クラスターから取得されます。このバージョンの submit メソッドは、以下の例のように操作する 1 つ以上のキーを受け入れます。

    des.submit(task, $KEY)
    des.submit(task, $KEY1, $KEY2, $KEY3)
  • ノードの Addresssubmit メソッドに渡すと、特定ノードの実行を指示できます。以下は、クラスターの Coordinator でのみ実行されます。

    des.submit(cache.getCacheManager().getCoordinator(), task)
    注記

    デフォルトではタスクは自動的に分散されるため、通常は、実行の対象となる特定のノードを指定する必要はありません。

34.3. DistributedCallable API

DistributedCallable インターフェースは、java.util.concurrent.package からの既存の Callable のサブタイプです。リモート JVM で実行でき、Red Hat JBoss Data Grid からの入力を受け取ることができます。DistributedCallable インターフェースは、JBoss Data Grid のキャッシュデータにアクセスする必要があるタスクのために使用されます。

DistributedCallable API を使用してタスクを実行する場合、タスクのメインアルゴリズムに変更はありませんが、入力ソースは変更になります。

キャッシュへのアクセスや渡したキーセットへのアクセスが必要な場合、Callable インターフェースをすでに実装済みのユーザーは DistributedCallable を拡張する必要があります。

DistributedCallable API の使用

public interface DistributedCallable<K, V, T> extends Callable<T> {

   /**
    * Invoked by execution environment after DistributedCallable
    * has been migrated for execution to a specific Infinispan node.
    *
    * @param cache
    *           cache whose keys are used as input data for this
    *           DistributedCallable task
    * @param inputKeys
    *           keys used as input for this DistributedCallable task
    */
   public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys);

}

34.4. Callable および CDI

DistributedCallable を実装できない場合または DistributedCallable が適切でない場合に DistributedExecutorService で使用される入力キャッシュへの参照が必要であれば、CDI メカニズムによって入力キャッシュをインジェクトするオプションを使用できます。

Callable タスクが Red Hat JBoss Data Grid の実行ノードに到達すると、JBoss Data Grid の CDI メカニズムは適切なキャッシュ参照を提供し、実行している Callable にインジェクトします。

Callable を用いて JBoss Data Grid CDI を使用するには、以下の手順に従います。

  1. CallableCache フィールドを宣言し、org.infinispan.cdi.Input でアノテーションを付けます。
  2. 必須の @Inject アノテーションを含めます。

Callable および CDI の使用

public class CallableWithInjectedCache implements Callable<Integer>, Serializable {

      @Inject
      @Input
      private Cache<String, String> cache;


      @Override
      public Integer call() throws Exception {
        //use injected cache reference
        return 1;
      }
}

34.5. 分散タスクのフェイルオーバー

Red Hat JBoss Data Grid の分散実行フレームワークは、以下の場合でタスクのフェイルオーバーをサポートします。

  • タスクが実行されている場所でノードの障害によるフェイルオーバーが発生した場合。
  • タスクの障害によるフェイルオーバー。たとえば、 Callable タスクによって例外が発生した場合。

フェイルオーバーポリシーはデフォルトで無効になっており、RunnableCallable、および DistributedCallable タスクはフェイルオーバーメカニズムを呼び出しせずに失敗します。

JBoss Data Grid は、ランダムノードフェイルオーバーポリシーを提供し、利用可能な場合に別のランダムなノードで Distributed タスクの一部を実行します。

たとえば、以下を使用してランダムフェイルオーバー実行ポリシーを指定できます。

ランダムフェイルオーバー実行ポリシー

DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable());
taskBuilder.failoverPolicy(DefaultExecutorService.RANDOM_NODE_FAILOVER);
DistributedTask<Boolean> distributedTask = taskBuilder.build();
Future<Boolean> future = des.submit(distributedTask);
Boolean r = future.get();

DistributedTaskFailoverPolicy インターフェースを実行してフェイルオーバー管理を提供することもできます。

分散タスクのフェイルオーバーポリシーインターフェース

/**
 * DistributedTaskFailoverPolicy allows pluggable fail over target selection for a failed remotely
 * executed distributed task.
 *
 */
public interface DistributedTaskFailoverPolicy {

   /**
    * As parts of distributively executed task can fail due to the task itself throwing an exception
    * or it can be an Infinispan system caused failure (e.g node failed or left cluster during task
    * execution etc).
    *
    * @param failoverContext
    *           the FailoverContext of the failed execution
    * @return result the Address of the Infinispan node selected for fail over execution
    */
   Address failover(FailoverContext context);

   /**
    * Maximum number of fail over attempts permitted by this DistributedTaskFailoverPolicy
    *
    * @return max number of fail over attempts
    */
   int maxFailoverAttempts();
}

34.6. 分散タスク実行ポリシー

DistributedTaskExecutionPolicy は、ノードのサブセットへのタスクの実行をスコープ指定することで、タスクが Red Hat JBoss Data Grid クラスター全体でカスタム実行ポリシーを指定できるようにします。

たとえば、DistributedTaskExecutionPolicy を使用すると、以下の場合でタスクの実行を管理できます。

  • バックアップリモートネットワークセンターではなく、ローカルネットワークの場所でのみタスクが実行される場合。
  • タスクの実行に特定の JBoss Data Grid ラックノードの専用サブセットのみが必要な場合。

ラックノードを使用した特定タスクの実行

DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable());
taskBuilder.executionPolicy(DistributedTaskExecutionPolicy.SAME_RACK);
DistributedTask<Boolean> distributedTask = taskBuilder.build();
Future<Boolean> future = des.submit(distributedTask);
Boolean r = future.get();

34.7. 分散実行とローカリティー

分散環境の所有権では、DistributionManagerConsistentHash は理論的で、これらのクラスはデータがキャッシュでアクティブであるかを認識しません。代わりに、これらのクラスは指定のキーを格納するノードを判断するために使用されます。

指定のキーのローカリティーを確認するには、以下のオプションの 1 つを使用します。

  • オプション 1: 以下の例のように、キーがキャッシュにあり、そのキーがローカルであることを DistributionManager が示すことを確認します。

    (cache.getAdvancedCache().withFlags(SKIP_REMOTE_LOOKUP).containsKey(key)
    && cache.getAdvancedCache().getDistributionManager().getLocality(key).isLocal())
  • オプション 2: 直接 DataContainer をクエリーします。

    cache.getAdvancedCache().getDataContainer().containsKey(key)
注記

エントリーがパッシベートされた場合、キーの存在に関わらず DataContainerFalse を返します。

34.7.1. 分散実行の例

この例では、Pi () の近似値の算出に並列分散実行が使用されます。

  1. 以下に示された正方形の面積:
  2. 円の面積の方程式:
  3. 最初の式から r を分離:
  4. r のこの値を 2 番目の式に挿入し、Pi の値を算出:
  5. 式の結果を分離:

図34.1 分散実行の例

1604

正方形に大量のダーツを投げ、その正方形の中に円を描き、円の内部に突き刺さったすべてのダーツを破棄すると、円/正方形の近似値を算出できます。

以前、Pi の値は 4C/S と算出されました。この値を使用して Pi の近似値を求めることができます。投げるダーツの数を最大限にすると近似値がより正確になります。

以下の例では、クラスター全体で並列処理して 1 千万本のダーツを投げます。

分散実行の例

public class PiAppx {

   public static void main (String [] arg){
      List<Cache> caches = ...;
      Cache cache = ...;

      int numPoints = 10000000;
      int numServers = caches.size();
      int numberPerWorker = numPoints / numServers;

      DistributedExecutorService des = new DefaultExecutorService(cache);
      long start = System.currentTimeMillis();
      CircleTest ct = new CircleTest(numberPerWorker);
      List<Future<Integer>> results = des.submitEverywhere(ct);
      int countCircle = 0;
      for (Future<Integer> f : results) {
         countCircle += f.get();
      }
      double appxPi = 4.0 * countCircle / numPoints;

      System.out.println("Distributed PI appx is " + appxPi +
      " completed in " + (System.currentTimeMillis() - start) + " ms");
   }

   private static class CircleTest implements Callable<Integer>, Serializable {

      /** The serialVersionUID */
      private static final long serialVersionUID = 3496135215525904755L;

      private final int loopCount;

      public CircleTest(int loopCount) {
         this.loopCount = loopCount;
      }

      @Override
      public Integer call() throws Exception {
         int insideCircleCount = 0;
         for (int i = 0; i < loopCount; i++) {
            double x = Math.random();
            double y = Math.random();
            if (insideCircle(x, y))
               insideCircleCount++;
         }
         return insideCircleCount;
      }

      private boolean insideCircle(double x, double y) {
         return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2))
         <= Math.pow(0.5, 2);
      }
   }
}