第39章 ストリーム

39.1. ストリーム

ストリームは Java 8 で導入され、キャッシュ全体などの大変大きなデータセットで操作を効率的に行うことができます。これらの操作は、データセット全体で手順通りに反復操作を行うのではなく、コレクションで実行されます。

さらに、キャッシュが分散されている場合は、クラスター全体で同時に操作を実行できるため、操作の実行がより効率的になります。

Stream を取得するには、単一スレッドのストリームの場合は stream() メソッド、複数スレッドのストリームの場合は parallelStream() メソッドを指定の Map 上で呼び出します。並列ストリームの詳細は 並列処理 で説明します。

39.2. ローカル/インバリデーション/レプリケーションキャッシュでのストリームの使用

ローカル、インバリデーション、またはレプリケーションキャッシュと使用するストリームは、通常のコレクション上のストリームと同じように使用できます。

たとえば複数の Book が含まれるキャッシュで、書名に「JBoss」が含まれるすべてのエントリーが含まれるマップを作成するには、以下を使用できます。

Map<Object, String> jbossBooks = cache.entrySet().stream()
    .filter(e -> e.getValue().getTitle().contains("JBoss"))
    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

39.3. 分散キャッシュでのストリームの使用

ストリーム操作が分散キャッシュで実行されると、中間および終端操作を各ノードに送信し、結果となるデータは元のノードに返信されます。この動作により、操作をリモートノードで実行でき、最終結果のみが返されます。中間値は返されないため、最良のパフォーマンスが実現されます。

リハッシュ対応

ストリームが作成されたら、データはセグメント化され、各ノードはプライマリー所有者として所有するデータでのみ操作を実行します。セグメントの細かさが、ノードごとにデータを均一に分散できる細かさである場合、セグメント全体でデータを均一に処理できます。

データはノード間で再分散されるため、新しいノードが追加されたり、古いノードがクラスターから削除されると、この処理は不安定になります。これにより、データを 2 度処理できる問題が発生することがありますが、分散ストリーム は自動的にデータの再分散を処理するため、ノードを手動で監視する必要はありません。

39.4. タイムアウトの設定

操作リクエストのタイムアウト値を設定することは可能です。これは、リモートリクエストのみで使用され、リクエストごとに設定されます。そのため、ローカルリクエストは常にタイムアウトせず、フェイルオーバーが発生すると後続の各リクエストには新しいタイムアウトが指定されます。

タイムアウトの指定がない場合、デフォルトではレプリケーションタイムアウトが使用されます。以下の例のようにストリームの timeout(long timeout, TimeUnit unit) メソッドを使用すると、手作業で設定できます。

CacheStream<Map.Entry<Object, String>> stream = cache.entrySet().stream();
stream.timeout(1, TimeUnit.MINUTES);

39.5. 分散ストリーム

39.5.1. 分散ストリーム

分散ストリームは、map reduce と同様に動作します。しかし、分散ストリームでは、ゼロから多数の中間操作があり、その後にワークが実行される各ノードへ送信される単一の終端操作が続きます。この動作には以下の手順が使用されます。

  1. どのノードが各指定セグメントのプライマリー所有者であるかによって、セグメントはグループ化されます。
  2. リクエストは各リモートノードに対して生成されます。このリクエストには中間と終端操作が処理するセグメントとともに含まれます。

    • 終端操作が開始されたスレッドは直接ローカル操作を実行します。
    • 各リモートノードは生成されたリクエストを受信し、リモートスレッド上で操作を実行した後、応答を返信します。
  3. すべてのリクエストが完了したら、ユーザースレッドはすべての応答を収集し、操作によって指定された削減を実行します。
  4. 最終応答がユーザーに返されます。

39.5.2. マーシャルの可能性

分散キャッシュまたはレプリケートされたキャッシュを使用する場合、キーと値がマーシャル可能である必要があります。また、分散ストリームで実行される操作はクラスターの他のノードに送信されるため、これらの操作もマーシャル可能である必要があります。これらをマーシャル可能にするには、一般的に Serializable である新しいクラスを使用するか、Externalizer が登録済みの新しいクラスを使用します。しかし、FunctionalInterfaceSerializable を実装し、すべてのラムダは即座にシリアライズされるため、追加のキャストは必要ありません。

注記

分散ストリームの中間値はマーシャル可能である必要はありません。返信される最終値 (通常は終端操作) はマーシャル可能でなければなりません。

ラムダ関数が使用されている場合、パラメーターを Serializable のインスタンスとしてキャストするとシリアライズできます。たとえば、Book エントリーを格納するキャッシュの場合、以下は特定の著者と一致する Book インスタンスのコレクションを作成します。

List<Book> books = cache.keySet().stream()
    .filter(e -> e.getAuthor().equals("authorname"))
    .collect(toList());

さらに、デフォルトでは作成されたすべての Collectors がマーシャル可能であるとは限りません。JBoss Data Grid には、マーシャリングが必要なときに適切に機能する Collectors の組み合わせを容易に使用できる org.infinispan.stream.CacheCollectors が含まれています。

39.5.3. 並列処理

ストリームを並列処理するメソッドは 2 つあります。

  • 並列ストリーム (Parallel Streams) - 単一ノードで各操作が並列して実行されます。
  • 並列分散 (Parallel Distribution) - 複数のノードが関係するようにリクエストを並列処理します。

デフォルトでは、分散ストリームは並列分散を有効にしますが、さらに並列 Stream に結合できます。これにより、各ノードで複数のスレッドを使用して、複数のノード全体で同時操作を実行できます。

Stream を並列としてマーク付けするには、parallelStream() で取得するか、parallel() を呼び出して Stream の取得後に有効にします。以下の例は両方の方法を示しています。

// Obtain a parallel Stream initially
List<Book> books = cache.keySet().parallelStream()
    [...]

// Create the initial stream and then invoke parallel
List<Book> books = cache.keySet().stream()
    .parallel()
    [...]
注記

リハッシュ対応イテレーターや forEach 操作などの一部の操作では、順次ストリームがローカルで強制されるものがあります。現在、このような操作で並列ストリームを使用することはできません。

39.5.4. 分散演算子

39.5.4.1. 終端演算子の分散結果削減

以下で各終端演算子について説明し、さらに終端演算子に対して分散削減がどのように動作するかも説明します。

  • allMatch

    この演算子は各ノードで実行され、すべての結果はローカルの論理 AND 操作を使用して組み合わされ、最終値が取得されます。通常のストリーム操作が早く返された場合、これらのメソッドも早期に完了します。

  • noneMatch anyMatch

    これらの演算子は各ノードで実行され、すべての結果は論理 OR 操作を使用して組み合わされ、最終値が取得されます。通常のストリーム操作が早く返された場合、これらのメソッドも早期に完了します。

  • collect

    collect メソッドはいくつかの追加ステップを実行できます。他のメソッドと同様に、リモートノードはすべて想定どおりに実行します。しかし、最後の finisher 演算子を実行する代わりに、完全に組み合わされた結果を返信します。ローカルスレッドはすべてのローカルおよびリモートの結果を組み合わせて値にし、finisher 演算子を実行します。さらに、最終値はシリアライズ可能である必要はありませんが、supplier および combiner メソッドによって作成された値はシリアライズする必要があります。

  • count

    count メソッドは各キャッシュから受信した数字を加算します。

  • findAny findFirst

    findAny メソッドは、値がリモートまたはローカルノードからであるかに関わらず、最初に見つかった値を返します。この操作は早期終了をサポートし、最初の値が見つかったら他の値は処理されません。findFirst メソッドの動作と似ていますが、findFirst メソッドには「中間操作の例外」に説明のあるソートされた中間操作が必要になります。

  • max min

    max および min メソッドは、最終削減の実行前に各ノードでそれぞれの値を見つけ、すべてのノード全体の最大値または最小値を判断します。

  • reduce

    さまざまな reduce メソッドは、ローカルでローカルの結果とリモートの結果を累積する (有効な場合は組み合わせを行う) 前に、できるだけ多く結果をシリアライズします。この動作により、combiner から返された値はシリアライズ可能である必要はありません。

39.5.4.2. キーベースのリハッシュ対応演算子

以下の演算子は、他の終端演算子とは異なり、セグメントごとに処理されたキーを追跡するために特別なリハッシュ対応を必要とします。これにより、クラスターのメンバーシップが変更になっても、iterator および spliterator 演算子では各キーが 1 度のみ処理されることが保証され、forEach 演算子では最低でも 1 度処理されることが保証されます。

  • iterator spliterator

    これらの演算子はリモートノードで実行されると、エントリーのバッチを返し、以前のバッチが完全に消費された後でのみ次のバッチが送信されます。この動作は、1 度にメモリーに保持されるエントリーの数を制限します。ユーザーノードは、どのキーが処理されたかを追跡し、セグメントが完了するとこれらのキーはメモリーから解放されます。この動作により、順次処理の使用が推奨され、すべてのノードからのキーを保持する代わりにセグメントキーのサブセットのみをメモリーに保持するようにします。

  • forEach

    forEach はバッチを返しますが、最低でもバッチに値するキーを処理した後でのみバッチを返します。これにより、元のノードは処理済みのキーを認識でき、同じエントリーを再処理する可能性を低減します。しかし、ノードが予期せずダウンした場合に同じセットが繰り返し処理される可能性があります。この場合、ノードがダウンしたときに未完了のバッチの処理中であった可能性があり、リハッシュ障害の操作が発生するときに同じバッチが再度実行される可能性があります。リハッシュのフェイルオーバーはすべての応答が受信されるまで発生しないため、ノードを追加してもこの問題の原因にはなりません。

操作のバッチサイズは、CacheStreamdistributedBatchSize によって制御されます。値が設定されていない場合は、状態の遷移で設定された chunkSize がデフォルトとして使用されます。値が大きいと大きなバッチが許可されますが、返信回数が少なくなり、メモリーの使用量が増加します。テストを実行して各アプリケーションに適切なサイズを判断してください。

39.5.4.3. 中間操作の例外

以下の中間操作には特別な例外があります。これらすべてのメソッドのストリーム処理には、適切な処理を保証する人工的なイテレーターのようなものが埋め込まれています。そのため、以下の中間操作を使用すると、パフォーマンスが大幅に劣化する可能性があります。

  • Skip

    人工的なイテレーターは skip 操作まで埋め込まれ、結果はローカルに取り込まれるため、適切な要素の数がスキップされる可能性があります。

  • Peek

    人工的なイテレーターは peek 操作まで埋め込まれます。特定数の peek 処理された要素のみがリモートノードに返され、結果はローカルに取り込まれるため、指定の数のみに peek が行われる可能性があります。

  • Sorted

    人工的なイテレーターは sorted 操作まで埋め込まれ、すべての結果はローカルでソートされます。

    警告

    この操作では、すべてのエントリーをローカルノードのメモリーに格納する必要があります。

  • Distinct

    Distinct は各リモートノードで実行され、すべての結果に distinct 操作が実行された後に人工的なイテレーターは distinct の値を返します。

    警告

    この操作では、すべてのエントリーをローカルノードのメモリーに格納する必要があります。

39.5.5. 分散ストリームの例

典型的な Map/Reduce の例が単語数のカウントです。キーと値に対して String を持つキャッシュがあり、文すべての単語数を数える必要がある場合、以下を使用して実装できます。

Map<String, Long> wordCountMap = cache.entrySet().parallelStream()
    .map(e -> e.getValue().split("\\s"))
    .flatMap(Arrays::stream)
    .collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())));

例を変更して最も頻繁に使用される単語を見つける場合、すべての単語が必要になり、最初にローカルで数える必要があります。以下のスニペットは前述の例を拡張し、この検索を実行します。

String mostFrequentWord = cache.entrySet().parallelStream()
    .map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s"))
    .flatMap((Function<String[], Stream<String>>) Arrays::stream)
    .collect(CacheCollectors.serializableCollector(() -> Collectors.collectingAndThen(
        Collectors.groupingBy(Function.identity(), Collectors.counting()),
            wordCountMap -> {
                String mostFrequent = null;
                long maxCount = 0;
                for (Map.Entry<String, Long> e : wordCountMap.entrySet()) {
                    int count = e.getValue().intValue();
                    if (count > maxCount) {
                        maxCount = count;
                        mostFrequent = e.getKey();
                    }
                }
                return mostFrequent;
            })));

この時点で、最後のステップが単一のスレッドで実行されます。ローカルで並列ストリームを使用して最終の操作を実行すると、この操作をさらに最適化できます。

Map<String, Long> wordCount = cache.entrySet().parallelStream()
    .map((Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s"))
    .flatMap((Function<String[], Stream<String>>) Arrays::stream)
    .collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())));
Optional<Map.Entry<String, Long>> mostFrequent = wordCount.entrySet().parallelStream()
    .reduce((e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);