第35章 Apache Hadoop との統合

35.1. Apache Hadoop との統合

JBoss Data Grid コネクターは、JBoss Data Grid を Hadoop 対応のデータソースにします。この統合は、Hadoop の InputFormat および OutputFormat の実装を提供して実現され、アプリケーションが最適な場所の JBoss Data Grid サーバーに対してデータを読み書きできるようにします。JBoss Data Grid の InputFormat および OutputFormat 実装は、従来の Hadoop Map/Reduce ジョブの実行を可能にしますが、Hadoop の InputFormat データソースをサポートするツールやユーティリティーと使用することもできます。

35.2. Hadoop の依存関係

Hadoop の形式の JBoss Data Grid 実装は以下の Maven 依存関係にあります。

<dependency>
    <groupId>org.infinispan.hadoop</groupId>
    <artifactId>infinispan-hadoop-core</artifactId>
    <version>0.2.2.Final-redhat-1</version>
</dependency>

35.3. サポートされる Hadoop 設定パラメーター

以下のパラメーターがサポートされます。

表35.1 サポートされる Hadoop 設定パラメーター

パラメーター名説明デフォルト値

hadoop.ispn.input.filter.factory

読み取りの前にデータを事前にフィルターするためにサーバーにデプロイされるフィルターファクトリーの名前。

null (有効なフィルタリングなし)

hadoop.ispn.input.cache.name

データが読み取られるキャッシュの名前。

___defaultcache

hadoop.ispn.input.remote.cache.servers

以下の形式を使用する、入力キャッシュのサーバーのリスト。

host1:port;host2:port2

localhost:11222

hadoop.ispn.output.cache.name

データが書き込まれるキャッシュの名前。

default

hadoop.ispn.output.remote.cache.servers

以下の形式を使用する、出力キャッシュのサーバーのリスト。

host1:port;host2:port2

null (出力キャッシュなし)

hadoop.ispn.input.read.batch

キャッシュから読み取りするときのバッチサイズ。

5000

hadoop.ispn.output.write.batch

キャッシュに書き込みするときのバッチサイズ。

500

hadoop.ispn.input.converter

キャッシュから読み取りした後に適用される、org.infinispan.hadoop.KeyValueConverter の実装とのクラス名。

null (有効な変換なし)。

hadoop.ispn.output.converter

書き込みの前に適用される、 org.infinispan.hadoop.KeyValueConverter の実装とのクラス名。

null (有効な変換なし)。

35.4. Hadoop コネクターの使用

InfinispanInputFormat および InfinispanOutputFormat

Hadoop では、InputFormat インターフェースは、特定のデータソースがパーティション化される方法と、各パーティションからのデータの読み取り方法を示します。OutputFormat インターフェースはデータの書き込み方法を指定します。

InputFormat インターフェースには、重要なメソッドが 2 つ定義されています。

  1. getSplits メソッドは、データの特定セクションに関する情報が含まれる 1 つ以上の InputSplit インスタンスを返すデータパーティショナーを定義します。

    List<InputSplit> getSplits(JobContext context);
  2. InputSplit を使用すると、結果となるデータセットで反復処理を行うために使用される RecordReader を取得できます。

    RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context);

これらの 2 つの操作は、複数のノード全体でデータの並列処理を可能にするため、大型のデータセットで Hadoop のスループットが大きくなります。

JBoss Data Grid では、パーティションはセグメントの所有権を基に生成されるため、各パーティションは特定のサーバーのセグメントのセットになります。デフォルトでは、パーティションの数はクラスターのサーバーの数と同じになり、各パーティションにはその特定のサーバーに関連するすべてのセグメントが含まれます。

JBoss Data Grid における Hadoop Map Reduce ジョブの実行

以下に JBoss Data Grid クラスターをターゲットとする Map Reduce ジョブの設定例を示します。

import org.infinispan.hadoop.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

[...]
Configuration configuration = new Configuration();
configuration.set(InfinispanConfiguration.INPUT_REMOTE_CACHE_SERVER_LIST, "localhost:11222");
configuration.set(InfinispanConfiguration.INPUT_REMOTE_CACHE_NAME, "map-reduce-in");
configuration.set(InfinispanConfiguration.OUTPUT_REMOTE_CACHE_SERVER_LIST, "localhost:11222");
configuration.set(InfinispanConfiguration.OUTPUT_REMOTE_CACHE_NAME, "map-reduce-out");

Job job = Job.getInstance(configuration, "Infinispan Integration");
[...]

JBoss Data Grid をターゲットとするには、ジョブを InfinispanInputFormat および InfinispanOutputFormat クラスと設定する必要があります。

[...]
// Define the Map and Reduce classes
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);

// Define the JBoss Data Grid implementations
job.setInputFormatClass(InfinispanInputFormat.class);
job.setOutputFormatClass(InfinispanOutputFormat.class);
[...]