Data Grid 開発者ガイド
Data Grid のカスタマイズ、設定、拡張
概要
Red Hat Data Grid
Data Grid は、高性能の分散型インメモリーデータストアです。
- スキーマレスデータ構造
- さまざまなオブジェクトをキーと値のペアとして格納する柔軟性があります。
- グリッドベースのデータストレージ
- クラスター間でデータを分散および複製するように設計されています。
- エラスティックスケーリング
- サービスを中断することなく、ノードの数を動的に調整して要件を満たします。
- データの相互運用性
- さまざまなエンドポイントからグリッド内のデータを保存、取得、およびクエリーします。
Data Grid のドキュメント
Data Grid のドキュメントは、Red Hat カスタマーポータルで入手できます。
Data Grid のダウンロード
Red Hat カスタマーポータルで Data Grid Software Downloads にアクセスします。
Data Grid ソフトウェアにアクセスしてダウンロードするには、Red Hat アカウントが必要です。
多様性を受け入れるオープンソースの強化
Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。まずは、マスター (master)、スレーブ (slave)、ブラックリスト (blacklist)、ホワイトリスト (whitelist) の 4 つの用語の置き換えから始めます。この取り組みは膨大な作業を要するため、今後の複数のリリースで段階的に用語の置き換えを実施して参ります。詳細は、Red Hat CTO である Chris Wright のメッセージ をご覧ください。
第1章 Data Grid Maven リポジトリーの設定
Data Grid Java ディストリビューションは Maven から入手できます。
顧客ポータルから Data Grid Maven リポジトリーをダウンロードするか、パブリック Red Hat Enterprise Maven リポジトリーから Data Grid 依存関係をプルできます。
1.1. Data Grid Maven リポジトリーのダウンロード
パブリック Red Hat Enterprise Maven リポジトリーを使用しない場合は、ローカルファイルシステム、Apache HTTP サーバー、または Maven リポジトリーマネージャーに Data Grid Maven リポジトリーをダウンロードし、インストールします。
手順
- Red Hat カスタマーポータルにログインします。
- Software Downloads for Data Grid に移動します。
- Red Hat Data Grid 8.2 Maven リポジトリーをダウンロードします。
- アーカイブされた Maven リポジトリーをローカルファイルシステムに展開します。
-
README.md
ファイルを開き、適切なインストール手順に従います。
1.2. Red Hat Maven リポジトリーの追加
Red Hat GA リポジトリーを Maven ビルド環境に組み込み、Data Grid アーティファクトおよび依存関係を取得します。
手順
Red Hat GA リポジトリーを Maven 設定ファイル (通常は
~/.m2/settings.xml
) に追加するか、プロジェクトのpom.xml
ファイルに直接追加します。<repositories> <repository> <id>redhat-ga-repository</id> <name>Red Hat GA Repository</name> <url>https://maven.repository.redhat.com/ga/</url> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>redhat-ga-repository</id> <name>Red Hat GA Repository</name> <url>https://maven.repository.redhat.com/ga/</url> </pluginRepository> </pluginRepositories>
1.3. Data Grid POM の設定
Maven は、プロジェクトオブジェクトモデル (POM) ファイルと呼ばれる設定ファイルを使用して、プロジェクトを定義し、ビルドを管理します。POM ファイルは XML 形式であり、モジュールとコンポーネントの依存関係、ビルドの順序、および結果となるプロジェクトのパッケージ化と出力のターゲットを記述します。
手順
-
プロジェクト
pom.xml
を開いて編集します。 -
正しい Data Grid バージョンで
version.infinispan
プロパティーを定義します。 dependencyManagement
セクションにinfinispan-bom
を含めます。BOM(Bill of Materials) は、依存関係バージョンを制御します。これにより、バージョンの競合が回避され、プロジェクトに依存関係として追加する Data Grid アーティファクトごとにバージョンを設定する必要がなくなります。
-
pom.xml
を保存して閉じます。
以下の例は、Data Grid のバージョンと BOM を示しています。
<properties> <version.infinispan>12.1.11.Final-redhat-00001</version.infinispan> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-bom</artifactId> <version>${version.infinispan}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
次のステップ
必要に応じて、Data Grid アーティファクトを依存関係として pom.xml
に追加します。
第2章 キャッシュマネージャー
Data Grid への主なエントリーポイントは CacheManager
インターフェイスであり、以下を行うことができます。
- キャッシュを設定および取得します。
- クラスター化された Data Grid ノードを管理および監視します。
- クラスター全体でコードを実行します。
Data Grid をアプリケーションに埋め込む場合は、EmbeddedCacheManager
を使用します。Data Grid をリモートサーバーとして実行する場合は、RemoteCacheManager
を使用します。
キャッシュマネージャーはヘビーウェイトオブジェクトであるため、ほとんどの場合、JVM ごとに 1 つの CacheManager
インスタンスのみをインスタンス化する必要があります。
EmbeddedCacheManager manager = new DefaultCacheManager(); 1
- 1
- キャッシュのないローカルのクラスター化されていないキャッシュマネージャーを起動します。
キャッシュマネージャーにはライフサイクルがあり、デフォルトのコンストラクターは start()
メソッドも呼び出します。コンストラクターのオーバーロードされたバージョンが利用可能ですが、それらは CacheManager
を開始しません。ただし、キャッシュを作成する前に必ず CacheManager
を起動する必要があります。
同様に、実行中の CacheManager
が不要になった際に stop()
を呼び出して、リソースを解放する必要があります。これにより、キャッシュマネージャーは制御するキャッシュを安全に停止することもできます。
2.1. キャッシュの取得
CacheManager
を設定した後、キャッシュを取得および制御できます。
以下のように、キャッシュを取得するために getCache(String)
メソッドを呼び出します。
Cache<String, String> myCache = manager.getCache("myCache");
上記の操作は、myCache
という名前のキャッシュがまだ存在しない場合は作成し、それを返します。
getCache()
メソッドを使用すると、メソッドを呼び出すノードにのみキャッシュが作成されます。つまり、クラスター全体の各ノードで呼び出す必要のあるローカル操作を実行します。通常、複数のノードにまたがってデプロイされたアプリケーションは、初期化中にキャッシュを取得して、キャッシュが対称であり、各ノードに存在することを確認します。
createCache()
メソッドを呼び出して、以下のようにクラスター全体でキャッシュを動的に作成します。
Cache<String, String> myCache = manager.administration().createCache("myCache", "myTemplate");
上記の操作では、後でクラスターに参加するすべてのノードにキャッシュが自動的に作成されます。
createCache()
メソッドを使用して作成するキャッシュは、デフォルトでは一時的です。クラスター全体がシャットダウンした場合、再起動時にキャッシュが自動的に再作成されることはありません。
PERMANENT フラグを使用して、以下のようにキャッシュが再起動後も存続できるようにします。
Cache<String, String> myCache = manager.administration().withFlags(AdminFlag.PERMANENT).createCache("myCache", "myTemplate");
PERMANENT フラグを有効にするには、グローバルの状態を有効にし、設定ストレージプロバイダーを設定する必要があります。
設定ストレージプロバイダーの詳細は、GlobalStateConfigurationBuilder#configurationStorage() を参照してください。
2.2. クラスターリング情報
EmbeddedCacheManager
には、クラスターの動作に関する情報を提供するためのメソッドが多数あります。以下のメソッドは、クラスター環境で使用される場合 (Transport が設定されている場合) にのみ意味があります。
2.3. メンバー情報
クラスターを使用している場合、クラスターの所有者が誰であるかなど、クラスターのメンバーシップに関する情報を見つけられることが非常に重要となります。
getMembers() メソッドは、現在のクラスター内のすべてのノードを返します。
getCoordinator() メソッドは、どのメンバーがクラスターのコーディネーターであるかを指示します。ほとんどの場合、コーディネーターが誰であるかを気にする必要はありません。isCoordinator() メソッドを直接使用して、ローカルノードがコーディネーターであるかどうかを確認することもできます。
第3章 キャッシュインターフェイス
Data Grid は、JDK の ConcurrentMap インターフェイスによって公開されるアトミックメカニズムを含む、エントリーを追加、取得、および削除するための簡単なメソッドを公開する Cache インターフェイスを提供します。使用されるキャッシュモードに基づいて、これらのメソッドを呼び出すと、リモートノードにエントリーを複製したり、リモートノードからエントリーを検索することやキャッシュストアからエントリーを検索することなど、数多くのことが発生します。
3.1. キャッシュ API
単純な使用の場合、Cache API の使用は JDK Map API の使用と違いがないはずです。したがって、マップに基づく単純なインメモリーキャッシュから Data Grid のキャッシュへの移行は簡単になります。
3.1.1. 特定のマップメソッドのパフォーマンスに関する懸念
size()、values()、keySet()、および entrySet() など、マップで公開される特定のメソッドは、Data Grid と使用すると特定のパフォーマンスに影響します。keySet
の特定のメソッドである values
および entrySet
を使用できます。詳細については、Javadoc を参照してください。
これらの操作をグローバルに実行しようとすると、パフォーマンスに大きな影響を及ぼし、スケーラビリティーのボトルネックにもなります。そのため、これらの方法は情報またはデバッグの目的でのみ使用してください。
withFlags() メソッドで特定のフラグを使用すると、これらの問題の一部を軽減できます。詳細は、各メソッドのドキュメントを参照してください
3.1.2. Mortal および Immortal データ
単にエントリーを格納するだけでなく、Data Grid のキャッシュ API を使用すると、期限付き情報をデータに添付できます。たとえば、単に put(key, value) を使用すると、immortal エントリーが作成されます。このエントリーは削除されるまで (またはメモリー不足にならないようにメモリーからエビクトされるまで)、いつまでもキャッシュに存在します。ただし、put(key, value, lifespan, timeunit) を使用してキャッシュにデータを配置すると、mortalエントリーが作成されます。これは固定のライフスパンのあるエントリーで、そのライフスパンの後に期限切れになります。
Data Grid は、lifespanの他に、有効期限を決定する追加のメトリクスとしてmaxIdleもサポートします。lifespans または maxIdles の任意の組み合わせを使用できます。
3.1.3. putForExternalRead 操作
Data Grid の Cache クラスには、putForExternalRead と呼ばれる異なる'put'操作が含まれます。この操作は、他の場所で保持されるデータの一時キャッシュとして Data Grid が使用される場合に特に便利です。読み取りが非常に多い場合、キャッシュは単に最適化のために行われ、妨害するものではないため、キャッシュの競合によって実際のトランザクションが遅延してはなりません。
これを実現するため、キーがキャッシュ内に存在しない場合にのみ動作する put 呼び出しとしてputForExternalRead()
が動作し、別のスレッドが同じキーを同時に格納しようとすると、通知なしに即座に失敗します。このシナリオでは、データのキャッシュはシステムを最適化する方法で、キャッシングの失敗が実行中のトランザクションに影響するのは望ましくないため、失敗の処理方法が異なります。成功したかどうかに関わらず、ロックを待たず、読み出し元に即座に返されるため、putForExternalRead()
は高速な操作とみなされます。
この操作の使用方法を理解するために、基本的な例を見てみましょう。PersonId によって入力される Person インスタンスのキャッシュを想像してください。このデータは個別のデータストアから入力されます。以下のコードは、この例のコンテキスト内で putForExternalRead を使用する最も一般的なパターンを示しています。
// Id of the person to look up, provided by the application PersonId id = ...; // Get a reference to the cache where person instances will be stored Cache<PersonId, Person> cache = ...; // First, check whether the cache contains the person instance // associated with with the given id Person cachedPerson = cache.get(id); if (cachedPerson == null) { // The person is not cached yet, so query the data store with the id Person person = dataStore.lookup(id); // Cache the person along with the id so that future requests can // retrieve it from memory rather than going to the data store cache.putForExternalRead(id, person); } else { // The person was found in the cache, so return it to the application return cachedPerson; }
putForExternalRead はアプリケーションの実行元となる新しい Person インスタンスでキャッシュを更新するメカニズムとして使用しないでください (Person のアドレスを変更するトランザクションからなど)。キャッシュされた値を更新する場合は、標準の put 操作を使用してください。使用しないと、破損したデータをキャッシュする可能性が高くなります。
3.2. AdvancedCache API
簡単なキャッシュインターフェイスの他に、Data Grid はエクステンション作成者向けに AdvancedCache インターフェイスを提供します。AdvancedCache は、特定の内部コンポーネントにアクセスし、フラグを適用して特定のキャッシュメソッドのデフォルト動作を変更する機能を提供します。次のコードスニペットは、AdvancedCache を取得する方法を示しています。
AdvancedCache advancedCache = cache.getAdvancedCache();
3.2.1. フラグ
フラグは通常のキャッシュメソッドに適用され、特定のメソッドの動作を変更します。利用可能なフラグの一覧と、その効果については、Flag 列挙を参照してください。フラグは、AdvancedCache.withFlags() を使用して適用されます。このビルダーメソッドを使用して、キャッシュ呼び出しに任意の数のフラグを適用できます。次に例を示します。
advancedCache.withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_LOCKING) .withFlags(Flag.FORCE_SYNCHRONOUS) .put("hello", "world");
3.3. リスナーおよび通知
Data Grid はリスナー API を提供し、クライアントはイベントが発生したときに登録して通知を受け取ることができます。このアノテーション駆動型 API は、キャッシュレベルイベントとキャッシュマネージャーレベルイベントの 2 つの異なるレベルに適用されます。
イベントは、リスナーにディスパッチされる通知をトリガーします。リスナーは @Listener アノテーションが付けられ、Listenable インターフェイスで定義されたメソッドを使用して登録された単純な POJO です。
Cache と CacheManager はどちらも Listenable を実装しています。つまり、リスナーをキャッシュまたはキャッシュマネージャーのいずれかにアタッチして、キャッシュレベルまたはキャッシュマネージャーレベルのいずれかの通知を受信できます。
たとえば、次のクラスは、新しいエントリーがキャッシュに追加されるたびに、ブロックしない方法で、一部の情報を出力するようにリスナーを定義します。
@Listener public class PrintWhenAdded { Queue<CacheEntryCreatedEvent> events = new ConcurrentLinkedQueue<>(); @CacheEntryCreated public CompletionStage<Void> print(CacheEntryCreatedEvent event) { events.add(event); return null; } }
より包括的な例は Javadocs for @Listener を参照してください。
3.3.1. キャッシュレベルの通知
キャッシュレベルのイベントはキャッシュごとに発生し、デフォルトでは、イベントが発生したノードでのみ発生します。分散キャッシュでは、これらのイベントは影響を受けるデータの所有者に対してのみ発生することに注意してください。キャッシュレベルのイベントの例としては、エントリーの追加、削除、変更などがあります。これらのイベントは、特定のキャッシュに登録されているリスナーへの通知をトリガーします。
すべてのキャッシュレベルの通知とそれぞれのメソッドレベルのアノテーションの包括的なリストについては、Javadocs on the org.infinispan.notifications.cachelistener.annotation package を参照してください。
Data Grid で使用可能なキャッシュレベルの通知のリストについては Javadocs on the org.infinispan.notifications.cachelistener.annotation package を参照してください。
3.3.1.1. クラスターリスナー
単一ノードでキャッシュイベントをリッスンすることが望ましい場合は、クラスターリスナーを使用する必要があります。
そのために必要なのは、リスナーがクラスター化されているというアノテーションを付けるよう設定することだけです。
@Listener (clustered = true) public class MyClusterListener { .... }
クラスター化されていないリスナーからのクラスターリスナーには、いくつかの制限があります。
-
クラスターリスナーは、
@CacheEntryModified
、@CacheEntryCreated
、@CacheEntryRemoved
、および@CacheEntryExpired
イベントのみをリッスンできます。これは、他のタイプのイベントは、このリスナーに対してリッスンされないことを意味することに注意してください。 - ポストイベントのみがクラスターリスナーに送信され、プレイベントは無視されます。
3.3.1.2. イベントのフィルタリングおよび変換
リスナーがインストールされているノードで適用可能なすべてのイベントがリスナーに発生します。KeyFilter(キーのフィルタリングのみを許可) または CacheEventFilter(キー、古い値、古いメタデータ、新しい値、新しいメタデータ、コマンドの再実行の有無、イベントがイベント (isPre など) の前であるか、およびコマンドタイプのフィルターに使用) を使用して、どのイベントが発生したかを動的にフィルターできます。
この例で、イベントがキー Only Me
のエントリーを変更したときにイベントのみを発生させる単純な KeyFilter
を示しています。
public class SpecificKeyFilter implements KeyFilter<String> { private final String keyToAccept; public SpecificKeyFilter(String keyToAccept) { if (keyToAccept == null) { throw new NullPointerException(); } this.keyToAccept = keyToAccept; } public boolean accept(String key) { return keyToAccept.equals(key); } } ... cache.addListener(listener, new SpecificKeyFilter("Only Me")); ...
これは、より効率的な方法で受信するイベントを制限したい場合に便利です。
また、イベントが発生する前に値を別の値に変換できるようにする CacheEventConverter もあります。これは、値の変換を行うコードをモジュール化するのに適しています。
上記のフィルターとコンバーターは、クラスターリスナーと組み合わせて使用すると特に効果的です。これは、イベントがリッスンされているノードではなく、イベントが発生したノードでフィルタリングと変換が行われるためです。これにより、クラスター全体でイベントを複製する必要がない (フィルター)、またはペイロードを減らす (コンバーター) という利点があります。
3.3.1.3. 初期状態のイベント
リスナーがインストールされると、完全にインストールされた後にのみイベントが通知されます。
リスナーの初回登録時にキャッシュコンテンツの現在の状態を取得することが望ましい場合があります。この場合、キャッシュ内の各要素の @CacheEntryCreated
タイプのイベントが生成されます。この最初のフェーズで追加で生成されたイベントは、適切なイベントが発生するまでキューに置かれます。
現時点では、これはクラスター化されたリスナーに対してのみ機能します。ISPN-4608 では、クラスター化されていないリスナーへの追加を説明しています。
3.3.1.4. 重複イベント
トランザクションではないキャッシュで、重複したイベントを受け取ることが可能です。これは、put などの書き込み操作の実行中に、キーのプライマリー所有者がダウンした場合に可能になります。
Data Grid は、指定のキーの新規プライマリー所有者に put 操作を自動的に送信することで、put 操作を内部で修正しますが、最初に書き込みがバックアップに複製されたかどうかについては保証はありません。そのため、CacheEntryCreatedEvent
、CacheEntryModifiedEvent
、および CacheEntryRemovedEvent
の書き込みイベントの 1 つ以上が、1 つの操作で送信される可能性があります。
複数のイベントが生成された場合、Data Grid は再試行コマンドによって生成されたイベントをマークし、変更の表示に注意を払いなくても、このイベントが発生したタイミングを把握できるようにします。
@Listener public class MyRetryListener { @CacheEntryModified public void entryModified(CacheEntryModifiedEvent event) { if (event.isCommandRetried()) { // Do something } } }
また、CacheEventFilter
または CacheEventConverter
を使用する場合、EventTypeには、再試行によりイベントが生成されたかどうかを確認するために、メソッド isRetry
が含まれます。
3.3.2. キャッシュマネージャーレベルの通知
キャッシュマネージャーレベルのイベントは、キャッシュマネージャーで行われます。これらはグローバルでクラスター全体でもありますが、単一のキャッシュマネージャーによって作成されたすべてのキャッシュに影響するイベントが関係します。キャッシュマネージャーレベルのイベントの例として、クラスターに参加または退出するノード、または開始または停止するキャッシュがあります。
キャッシュマネージャーレベルのすべての通知とそれぞれのメソッドレベルのアノテーションの包括的なリストは、org.infinispan.notifications.cachemanagerlistener.annotation package を参照してください。
3.3.3. イベントの同期
デフォルトでは、すべての非同期通知は通知スレッドプールにディスパッチされます。同期通知は、リスナーメソッドが完了するか (スレッドがブロックする原因となる)、または CompletionStage が完了するまで、操作の続行を遅らせます。または、リスナーに非同期としてアノテーションを付けることもできます。この場合、操作は即座に継続され、通知は通知スレッドプールで非同期に完了します。これには、以下のようにリスナーにアノテーションを付けます。
非同期リスナー
@Listener (sync = false) public class MyAsyncListener { @CacheEntryCreated void listen(CacheEntryCreatedEvent event) { } }
同期リスナーのブロック
@Listener public class MySyncListener { @CacheEntryCreated void listen(CacheEntryCreatedEvent event) { } }
ノンブロッキングリスナー
@Listener public class MyNonBlockingListener { @CacheEntryCreated CompletionStage<Void> listen(CacheEntryCreatedEvent event) { } }
3.3.3.1. 非同期スレッドプール
このような非同期通知のディスパッチに使用されるスレッドプールを調整するには、設定ファイルの <listener-executor />
XML 要素を使用します。
3.4. Asynchronous API
Cache.put()、Cache.remove() などの同期 API メソッドの他に、Data Grid には非同期のノンブロッキング API も含まれ、同じ結果をノンブロッキング方式で達成できます。
これらのメソッドの名前は、ブロックメソッドと同様の名前が付けられ、"Async"が追加されます。 例: Cache.putAsync()、Cache.removeAsync() など。 これらの非同期のメソッドは、操作の実際の結果が含まれる CompletableFuture を返します。
たとえば、Cache<String, String>
としてパラメーター化されたキャッシュでは、Cache.put(String key, String value)
は String
を返し、Cache.putAsync(String key, String value)
は CompletableFuture<String>
を返します。
3.4.1. このような API を使用する理由
ノンブロッキング API は、通信の失敗や例外を処理する機能を備えており、同期通信の保証をすべて提供するという点で強力なもので、呼び出しが完了するまでブロックする必要がありません。 これにより、システムで並列処理をより有効に活用できます。 以下に例を示します。
Set<CompletableFuture<?>> futures = new HashSet<>(); futures.add(cache.putAsync(key1, value1)); // does not block futures.add(cache.putAsync(key2, value2)); // does not block futures.add(cache.putAsync(key3, value3)); // does not block // the remote calls for the 3 puts will effectively be executed // in parallel, particularly useful if running in distributed mode // and the 3 keys would typically be pushed to 3 different nodes // in the cluster // check that the puts completed successfully for (CompletableFuture<?> f: futures) f.get();
3.4.2. 実際に非同期で発生するプロセス
Data Grid には、通常の書き込み操作の重要なパスにあると見なされる 4 つの項目があります。これらの項目をコスト順に示します。
- ネットワークコール
- マーシャリング
- キャッシュストアへの書き込み (オプション)
- ロック
非同期メソッドを使用すると、ネットワーク呼び出しとマーシャリングがクリティカルパスから除外されます。 ただし、さまざまな技術的な理由により、キャッシュストアへの書き込みとロックの取得は、呼び出し元のスレッドで引き続き発生します。
第4章 キャッシュ内の値のクエリー
Data Grid を使用すると、クエリーを実行して、埋め込み Data Grid クラスターまたはリモート Data Grid サーバークラスターの両方で、データセット内の値を効率的かつ迅速に検索できます。
Plain Old Java Object(POJO) として格納されているキャッシュ値、または プロトコルバッファー としてのみエンコードされているオブジェクトに、インデックスを付けてクエリーを実行できます。
4.1. キャッシュをインデックス化するための Data Grid の設定
キャッシュに値のインデックスを作成して、クエリーのパフォーマンスを向上させ、全文検索機能を使用します。
Data Grid は、Apache Lucene テクノロジーを使用して、キャッシュ内の値にインデックスを付けます。
手順
キャッシュ設定でのインデックス作成を有効にします。
<distributed-cache name="my-cache"> <indexing> <!-- Indexing configuration goes here. --> </indexing> </distributed-cache>
<indexing>
要素を追加すると、自動的にインデックスが有効になります。設定スキーマではenabled
属性のデフォルト値がfalse
であっても、enabled
属性を含める必要はありません。インデックスを作成する各エンティティーを、
indexed-entity
要素の値として指定します。<distributed-cache name="my-cache"> <indexing> <indexed-entities> <indexed-entity>...</indexed-entity> </indexed-entities> </indexing> </distributed-cache>
古い Java オブジェクト
POJO を保存するキャッシュの場合、@Indexed
でアノテーションが付けられた完全修飾クラス名を指定します。以下に例を示します。
<indexed-entities> <indexed-entity>org.infinispan.sample.Car</indexed-entity> <indexed-entity>org.infinispan.sample.Truck</indexed-entity> </indexed-entities>
Protobuf
Protobuf-encoded エントリーを保存するキャッシュの場合、Protobuf スキーマで宣言された Message
を指定します。
たとえば、以下の Protobuf スキーマを使用します。
package book_sample; /* @Indexed */ message Book { /* @Field(store = Store.YES, analyze = Analyze.YES */ optional string title = 1; /* @Field(store = Store.YES, analyze = Analyze.YES */ optional string description = 2; optional int32 publicationYear = 3; // no native Date type available in Protobuf repeated Author authors = 4; } message Author { optional string name = 1; optional string surname = 2; }
次に、indexed-entity
要素に以下の値を指定する必要があります。
<indexed-entities> <indexed-entity>book_sample.Book</indexed-entity> </indexed-entities>
4.1.1. プログラムによるキャッシュインデックスの有効化
Data Grid API 経由でプログラムを使用してキャッシュのインデックスを設定します。
手順
-
Data Grid を組み込みライブラリーとして使用する場合は、以下の例のように
IndexingConfigurationBuilder
クラスでキャッシュのインデックス作成を有効にし、設定します。
import org.infinispan.configuration.cache.*; ConfigurationBuilder config=new ConfigurationBuilder(); config.indexing().enable().storage(FILESYSTEM).path("/some/folder").addIndexedEntity(Book.class);
4.1.2. インデックスアノテーション
Data Grid キャッシュでインデックス作成を有効にする場合、以下のアノテーションを使用します。
-
@indexed
は、インデックスを作成する Java オブジェクトを示します。 -
@field
は、オブジェクト内のフィールドがインデックス化される方法を制御します。
埋め込みライブラリーとしての Data Grid の場合は、これらのアノテーションを Java クラスに追加します。
Data Grid Server の場合は、これらのアノテーションを含む Protobuf スキーマ、.proto
ファイルを定義します。
4.1.3. インデックス設定
Data Grid 設定は、インデックスの保存および構築方法を制御します。
4.1.3.1. インデックスストレージ
Data Grid がインデックスを保存する方法を設定できます。
- ホストファイルシステム上。これはデフォルトであり、再起動間でインデックスを保持します。
-
JVM ヒープメモリー。これはインデックスが再起動後も存続しないことを意味します。
インデックスは、小さなデータセットの場合にのみ、JVM ヒープメモリーに格納する必要があります。
ファイルシステム
<distributed-cache name="my-cache"> <indexing storage="filesystem" path="${java.io.tmpdir}/baseDir"> <!-- Indexing configuration goes here. --> </indexing> </distributed-cache>
JVM ヒープメモリー
<distributed-cache name="my-cache"> <indexing storage="local-heap"> <!-- Additional indexing configuration goes here. --> </indexing> </distributed-cache>
4.1.3.2. インデックスリーダー
インデックスリーダーは、クエリーを実行するためにインデックスへのアクセスを提供する内部コンポーネントです。インデックスのコンテンツが変更されると、Data Grid はリーダーを更新し、検索結果が最新の状態になるようにする必要があります。インデックスリーダーの更新間隔を設定できます。デフォルトでは、インデックスが最終更新以降に変更された場合、各クエリーの前に Data Grid はインデックスを読み取ります。
<distributed-cache name="my-cache"> <indexing storage="filesystem" path="${java.io.tmpdir}/baseDir"> <!-- Sets an interval of one second for the index reader. --> <index-reader refresh-interval="1000"/> <!-- Additional indexing configuration goes here. --> </indexing> </distributed-cache>
4.1.3.3. インデックスライター
インデックスライターは、パフォーマンスを改善するために時間の経過とともにマージできる 1 つ以上のセグメント (サブインデックス) で設定されるインデックスを構築する内部コンポーネントです。インデックスリーダーの操作では、すべてのセグメントを考慮する必要があるため、通常、セグメントが少ないということは、クエリー中のオーバーヘッドが少ないことを意味します。
Data Grid は Apache Lucene を内部的に使用し、メモリーとストレージという 2 つの層でエントリーにインデックスを付けます。新規エントリーは、最初にメモリーインデックスに移動してから、フラッシュが実行されると、設定されたインデックスストレージに移動します。定期的なコミット操作は、フラッシュしたデータからセグメントを作成し、すべてのインデックス変更を永続化します。
index-writer
設定は任意です。デフォルトはほとんどの場合に機能するはずであり、カスタム設定はパフォーマンスを調整するためにのみ使用する必要があります。
<distributed-cache name="my-cache"> <indexing storage="filesystem" path="${java.io.tmpdir}/baseDir"> <index-writer commit-interval="2000" low-level-trace="false" max-buffered-entries="32" queue-count="1" queue-size="10000" ram-buffer-size="400" thread-pool-size="2"> <index-merge calibrate-by-deletes="true" factor="3" max-entries="2000" min-size="10" max-size="20"/> </index-writer> <!-- Additional indexing configuration goes here. --> </indexing> </distributed-cache>
表4.1 インデックスライター設定属性
属性 | 説明 |
---|---|
| メモリーにバッファーリングされたインデックスの変更がインデックスストレージにフラッシュされ、コミットが実行される時間 (ミリ秒単位)。操作にはコストがかかるため、小さな値は避けてください。デフォルトは 1000 ミリ秒 (1 秒) です。 |
|
インデックスストレージにフラッシュされる前に、インメモリーにバッファーリングできるエントリーの最大数。値が大きくなると、インデックスが高速になりますが、より多くのメモリーが使用されます。 |
|
追加されたエントリーと削除をインデックスストレージにフラッシュする前にバッファーリングするために使用できるメモリーの最大量。値が大きくなると、インデックスが高速になりますが、より多くのメモリーが使用されます。インデックス作成のパフォーマンスを向上させるには、 |
| インデックスへの書き込み操作を実行するスレッドの数。 |
|
それぞれのインデックス化されたタイプに使用する内部キューの数。各キューは、インデックスに適用される変更のバッチを保持し、キューは並行して処理されます。キューの数を増やすと、インデックスのスループットが増えますが、ボトルネックが CPU である場合に限ります。最適な結果を得るには、 |
|
各キューが保持できる要素の最大数。 |
| インデックス化操作の低レベルのトレース情報を有効にします。この属性を有効にすると、パフォーマンスが大幅に低下します。この低レベルのトレースは、トラブルシューティングの最後のリソースとしてのみ使用する必要があります。 |
Data Grid がインデックスセグメントをマージする方法を設定するには、index-merge
サブ要素を使用します。
表4.2 インデックスのマージ設定属性
属性 | 説明 |
---|---|
| インデックスセグメントがマージする前に持つことができるエントリーの最大数。この数を超えるエントリーを持つセグメントはマージされません。値を小さくすると、頻繁に変更されるインデックスでのパフォーマンスが向上します。値を大きくすると、インデックスが頻繁に変更されない場合に検索パフォーマンスが向上します。 |
| 一度にマージされるセグメントの数。値が小さいほど、マージが頻繁に発生し、より多くのリソースが使用されますが、セグメントの総数は平均して少なくなり、検索パフォーマンスが向上します。より大きな値 (10 より大きい値) は、大量の書き込みシナリオに最適です。 |
| バックグラウンドマージのセグメントの最小ターゲットサイズ (MB 単位)。このサイズよりも小さなセグメントは積極的にマージされます。値が大きすぎると、頻度は低くなりますが、マージ操作のコストが高くなる可能性があります。 |
|
バックグラウンドマージのセグメントの最大サイズ (MB 単位)。このサイズよりも大きなセグメントは、バックグラウンドでマージされることはありません。これを低い値に設定すると、メモリー要件が軽減され、最適な検索速度を犠牲にして、一部のマージ操作が回避されます。インデックスを強制的にマージする場合、この属性は無視され、代わりに |
|
強制マージのセグメントの最大サイズ (MB 単位) で、 |
|
セグメントのエントリーをカウントする際に、インデックスで削除されたエントリーの数が考慮されるかどうか。 |
参照資料
インデックス化要素および属性の詳細は、Data Grid Configuration Schema を参照してください。
4.1.4. インデックスの再構築
インデックスを再構築すると、キャッシュに保存されているデータから再構築されます。インデックス付きタイプやアナライザーの定義などを変更する場合は、インデックスを再構築します。同様に、何らかの理由でインデックスが削除されている場合は、インデックスを再構築する必要がある場合があります。
インデックスの再構築プロセスは、グリッド内のすべてのデータに対して行われるため、完了するまでに長い時間がかかる場合があります。再構築操作の進行中は、クエリーが返す結果も少なくなる可能性があります。
インデックスを再構築する際に Data Grid は以下の警告メッセージをログに記録します。
WARN: Rebuilding indexes also affect queries, that can return less results than expected.
手順
以下のように
reindexCache()
メソッドを呼び出して、リモート Data Grid サーバーでインデックスを再構築します。remoteCacheManager.administration().reindexCache("MyCache");
以下のように、Data Grid を組み込みライブラリーとして使用する場合にインデックスを再構築します。
Indexer indexer = Search.getIndexer(cache); CompletionStage<Void> future = index.run();
4.2. Ickle クエリーの作成
Data Grid は、リレーショナルクエリーとフルテキストクエリーを作成可能にする Ickle クエリー言語を提供します。
4.2.1. Ickle クエリーの例
API を使用するには、まず QueryFactory をキャッシュに取得してから、.create()
メソッドを呼び出し、クエリーで使用する文字列を渡します。各 QueryFactory
インスタンスは Search
と同じ Cache
インスタンスにバインドされますが、それ以外の場合は、複数のクエリーを並行して作成するために使用できるステートレスおよびスレッドセーフオブジェクトになります。
たとえば、以下のようになります。
// Remote Query, using protobuf QueryFactory qf = org.infinispan.client.hotrod.Search.getQueryFactory(remoteCache); Query<Transaction> q = qf.create("from sample_bank_account.Transaction where amount > 20"); // Embedded Query using Java Objects QueryFactory qf = org.infinispan.query.Search.getQueryFactory(cache); Query<Transaction> q = qf.create("from org.infinispan.sample.Book where price > 20"); // Execute the query QueryResult<Book> queryResult = q.execute();
クエリーは常に単一のエンティティータイプをターゲットにし、単一のキャッシュの内容に対して評価されます。複数のキャッシュでクエリーを実行したり、複数のエンティティータイプ (結合) を対象とするクエリーを作成したりすることは、サポートされていません。
クエリーの実行と結果のフェッチは、Query
オブジェクトの execute()
メソッドを呼び出すのと同じくらい簡単です。実行後に、同じインスタンスで execute()
を呼び出すと、クエリーを再実行します。
ページネーション
Query.maxResults(int maxResults)
を使用して、返される結果の数を制限することができます。これを Query.startOffset(long startOffset)
と組み合わせて使用すると、結果セットのページネーションを実現できます。
// sorted by year and match all books that have "clustering" in their title // and return the third page of 10 results Query<Book> query = queryFactory.create("FROM org.infinispan.sample.Book WHERE title like '%clustering%' ORDER BY year").startOffset(20).maxResults(10)
ヒット数
QueryResult
オブジェクトには、ページネーションパラメーターに関係なく、クエリーの結果の合計数を返すための .hitCount()
メソッドがあります。ヒット数は、パフォーマンス上の理由から、インデックス付きクエリーでのみ使用できます。
反復
Query
オブジェクトには、結果を遅延して取得するための .iterator()
メソッドがあります。使用後に閉じる必要がある CloseableIterator
のインスタンスを返します。
リモートクエリーの反復サポートは現在制限されています。反復する前に、最初にすべてのエントリーをクライアントにフェッチするためです。
名前付きクエリーパラメーター
実行ごとに新しい Query オブジェクトを作成する代わりに、実行前に実際の値に置き換えることができる名前付きパラメーターをクエリーに含めることができます。これにより、クエリーを 1 度定義し、複数回効率的に実行できます。パラメーターは、Operator の右側でのみ使用でき、通常の定数値ではなく、org.infinispan.query.dsl.Expression.param(String paramName)
メソッドによって生成されたオブジェクトを Operator に提供することで、クエリーの作成時に定義されます。パラメーターが定義されたら、以下の例に示すように Query.setParameter(parameterName, value)
または Query.setParameters(parameterMap)
のいずれかを呼び出すことで設定できます。
QueryFactory queryFactory = Search.getQueryFactory(cache); // Defining a query to search for various authors and publication years Query<Book> query = queryFactory.create("SELECT title FROM org.infinispan.sample.Book WHERE author = :authorName AND publicationYear = :publicationYear").build(); // Set actual parameter values query.setParameter("authorName", "Doe"); query.setParameter("publicationYear", 2010); // Execute the query List<Book> found = query.execute.list();
または、実際のパラメーター値のマップを指定して、複数のパラメーターを一度に設定することもできます。
複数の名前付きパラメーターを一度に設定する
Map<String, Object> parameterMap = new HashMap<>(); parameterMap.put("authorName", "Doe"); parameterMap.put("publicationYear", 2010); query.setParameters(parameterMap);
クエリーの解析、検証、および実行計画の作業の大部分は、パラメーターでのクエリーの最初の実行時に実行されます。この作業は後続の実行時には繰り返し行われないため、クエリーパラメーターではなく定数値を使用した同様のクエリーの場合よりもパフォーマンスが向上します。
4.2.2. Ickle クエリー言語パーサー構文
Ickle クエリー言語は、フルテキスト用のいくつかのエクステンションを持つ JPQL クエリー言語の小さなサブセットです。
パーサー構文には、以下のような重要なルールがあります。
- 空白は重要ではありません。
- フィールド名ではワイルドカードはサポートされません。
- デフォルトのフィールドがないため、フィールド名またはパスは必ず指定する必要があります。
-
&&
および||
は、フルテキストと JPA 述語の両方で、AND
またはOR
の代わりに使用できます。 -
!
はNOT
の代わりに使用できます。 -
足りないブール値 Operator は
OR
として解釈されます。 - 文字列の用語は、一重引用符または二重引用符で囲む必要があります。
- ファジー性とブースティングは任意の順序で受け入れられず、常にファジー性が最初になります。
-
<>
の代わりに!=
が許可されます。 -
ブーディングは、
>
、>=
、<
、⇐
Operator には適用できません。同じ結果を達成するために範囲を使用することができます。
Operator のフィルタリング
Ickle はインデックス化されたフィールドとインデックス化されていないフィールドの両方に使用できる多くの Operator のフィルタリングをサポートします。
Operator | 説明 | 例 |
---|---|---|
in | 左のオペランドが引数として指定された値のコレクションからの要素のいずれかと等しいことを確認します。 | FROM Book WHERE isbn IN ('ZZ', 'X1234') |
like | (文字列として想定される) 左側の引数が、JPA ルールに準拠するワイルドカードパターンと一致することを確認します。 | FROM Book WHERE title LIKE '%Java%' |
= | 左側の引数が指定の値と完全に一致することを確認します。 | FROM Book WHERE name = 'Programming Java' |
!= | 左側の引数が指定の値とは異なることを確認します。 | FROM Book WHERE language != 'English' |
> | 左側の引数が指定の値よりも大きいことを確認します。 | FROM Book WHERE price > 20 |
>= | 左側の引数が指定の値以上であることを確認します。 | FROM Book WHERE price >= 20 |
< | 左側の引数が指定の値未満であることを確認します。 | FROM Book WHERE year < 2012 |
⇐ | 左側の引数が指定の値以下であることを確認します。 | FROM Book WHERE price ⇐ 50 |
between | 左側の引数が指定された範囲の制限の間にあることを確認します。 | FROM Book WHERE price BETWEEN 50 AND 100 |
ブール値の条件
以下の例では、複数の属性条件を論理結合 (and
) および非結合 (or
) 演算子と組み合わせて、より複雑な条件を作成する方法を示しています。ブール値演算子のよく知られる演算子の優先順位ルールがここで適用されるため、Operator の順序は関連性がありません。ここで、or
が最初に呼び出された場合でも、and
Operator の優先順位は or
よりも高くなります。
# match all books that have "Data Grid" in their title # or have an author named "Manik" and their description contains "clustering" FROM org.infinispan.sample.Book WHERE title LIKE '%Data Grid%' OR author.name = 'Manik' AND description like '%clustering%'
ブール値の否定は論理演算子の中で優先され、次の単純な属性条件にのみ適用されます。
# match all books that do not have "Data Grid" in their title and are authored by "Manik" FROM org.infinispan.sample.Book WHERE title != 'Data Grid' AND author.name = 'Manik'
ネストされた条件
論理演算子の優先順位の変更は、括弧を使用して行います。
# match all books that have an author named "Manik" and their title contains # "Data Grid" or their description contains "clustering" FROM org.infinispan.sample.Book WHERE author.name = 'Manik' AND ( title like '%Data Grid%' OR description like '% clustering%')
属性の選択
一部のユースケースでは、属性のごく一部のみがアプリケーションによって実際に使用されている場合、特にドメインエンティティーにエンティティーが埋め込まれている場合、ドメインオブジェクト全体を返すのはやり過ぎです。クエリー言語を使用すると、プロジェクションを返す属性 (または属性パス) のサブセットを指定できます。展開が使用される場合、QueryResult.list()
はドメインエンティティー全体を返しませんが、Object[]
の List
(プロジェクト化された属性に対応する配列) を返します。
# match all books that have "Data Grid" in their title or description # and return only their title and publication year SELECT title, publicationYear FROM org.infinispan.sample.Book WHERE title like '%Data Grid%' OR description like '%Data Grid%'
ソート
1 つ以上の属性または属性パスに基づいて結果の順序は ORDER BY
句で行われます。複数の並べ替え基準が指定されている場合は、順序によって優先順位が決まります。
# match all books that have "Data Grid" in their title or description # and return them sorted by the publication year and title FROM org.infinispan.sample.Book WHERE title like '%Data Grid%' ORDER BY publicationYear DESC, title ASC
グループ化およびアグリゲーション
Data Grid には、グループ化フィールドのセットに従ってクエリー結果をグループ化し、各グループに分類される値のセットに集計関数を適用することにより、各グループからの結果の集計を構築する機能があります。グループ化と集計は、プロジェクションクエリー (SELECT 句に 1 つ以上のフィールドがあるクエリー) にのみ適用できます。
サポートされる集約は avg
、sum
、count
、max
および min
です。
グループ化フィールドセットは GROUP BY
句で指定し、グループ化フィールドの定義に使用される順番は関係ありません。プロジェクションで選択されたすべてのフィールドは、グループ化フィールドであるか、以下で説明するグループ化関数の 1 つを使用して集約される必要があります。Projection フィールドは集約され、同時にグループ化に使用できます。グループ化フィールドのみを選択し、集計フィールドは選択しないクエリーは有効です。例: ブックマークは作成者別にグループ化し、それらをカウントします。
SELECT author, COUNT(title) FROM org.infinispan.sample.Book WHERE title LIKE '%engine%' GROUP BY author
選択したすべてのフィールドに集計関数が適用され、グループ化にフィールドが使用されないプロジェクションクエリーが許可されます。この場合、集計は、単一のグローバルグループが存在するかのようにグローバルに計算されます。
集約
以下の集約関数をフィールドに適用できます。
表4.3 インデックスのマージ属性
集約関数 | 説明 |
---|---|
|
一連の数字の平均を計算します。許可される値は、 |
|
null 以外の行の数をカウントし、 |
|
見つかった最も大きな値を返します。許可される値は |
|
見つかった最小値を返します。許可される値は |
|
数字のセットの合計を計算します。null 以外の値がない場合、結果は代わりに |
表4.4 テーブル合計戻り値のタイプ
フィールドタイプ | 戻り値のタイプ |
---|---|
Integral (BigInteger 以外) | Long |
Float または Double | double |
BigInteger | BigInteger |
BigDecimal | BigDecimal |
グループ化および集計を使用したクエリーの評価
集計クエリーには、通常のクエリーのようにフィルター条件を含めることができます。フィルタリングは、グループ化操作の前後の 2 つのステージで実行できます。グループ化操作の実行前に groupBy()
メソッドを起動する前に定義されたフィルター条件はすべて、キャッシュエントリーに直接 (最終的な展開ではなく) キャッシュエントリーに適用されます。これらのフィルター条件は、照会されたエンティティータイプの任意のフィールドを参照でき、グループ化ステージの入力となるデータセットを制限することを目的としています。groupBy()
メソッドの呼び出し後に定義されたフィルター条件はすべて、展開およびグループ化操作の結果が展開されます。このフィルター条件は、groupBy()
フィールドまたは集約されたフィールドのいずれかを参照できます。select 句で指定されていない集約フィールドを参照することは許可されています。ただし、非集計フィールドと非グループ化フィールドを参照することは禁止されています。このフェーズでフィルタリングすると、プロパティーに基づいてグループの数が減ります。通常のクエリーと同様に、並べ替えも指定できます。順序付け操作は、グループ化操作後に実行され、groupBy()
フィールドまたは集約されたフィールドのいずれかを参照できます。
4.2.3. フルテキスト検索の使用
Fuzzy クエリー
ファジークエリー add ~
を整数とともに実行するには、用語の後に使用される用語からの距離を表します。たとえば、以下のようになります。
FROM sample_bank_account.Transaction WHERE description : 'cofee'~2
範囲クエリー
以下の例に示すように、範囲クエリーを実行するには、中括弧のペア内で指定の境界を定義します。
FROM sample_bank_account.Transaction WHERE amount : [20 to 50]
フレーズクエリー
次の例に示すように、単語のグループは引用符で囲むことで検索できます。
FROM sample_bank_account.Transaction WHERE description : 'bus fare'
近接クエリー
特定の距離内で 2 つの用語を検索して近接クエリーを実行するには、フレーズの後に距離とともに ~
を追加します。たとえば、以下の例では、キャンセル と fee という単語が 3 個以上ありません。
FROM sample_bank_account.Transaction WHERE description : 'canceling fee'~3
ワイルドカードクエリー
"text" または "test" を検索するには、単一文字のワイルドカード検索 ?
を使用します。
FROM sample_bank_account.Transaction where description : 'te?t'
"test"、"tests"、"tester を検索するには 、マルチ文字のワイルドカード検索 *
を使用します。
FROM sample_bank_account.Transaction where description : 'test*'
正規表現クエリー
正規表現クエリーは、/
の間のパターンを指定することで実行できます。Ickle は Lucene の正規表現構文を使用しているため、単語 moat
または boat
を検索するには、以下を使用できます。
FROM sample_library.Book where title : /[mb]oat/
クエリーのブースト
用語は、指定のクエリーにおける耐障害性を高めるために ^
を追加し、条件を強化できます。たとえば、ビールとビールとの関連性が 3 倍高いビールとワインを含むタイトルを検索するには、次のように使用できます。
FROM sample_library.Book WHERE title : beer^3 OR wine
4.3. 組み込みクエリー
データグリッドをライブラリーとしてカスタムアプリケーションに追加する場合は、埋め込みクエリーを使用します。
埋め込みクエリーでは、Protobuf マッピングは必要ありません。インデックス作成とクエリーは、どちらも Java オブジェクト上で実行されます。
4.3.1. 埋め込みクエリーの例
books と呼ばれる Data Grid キャッシュに Book
インスタンスを保存します。Book
インスタンスはインデックス化されるため、キャッシュ設定のインデックスを有効にします。
infinispan.xml
<distributed-cache name="books"> <indexing path="${user.home}/index"> <indexed-entities> <indexed-entity>org.infinispan.sample.Book</indexed-entity> </indexed-entities> </indexing> </distributed-cache>
キャッシュを取得します。
import org.infinispan.Cache; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; EmbeddedCacheManager manager = new DefaultCacheManager("infinispan.xml"); Cache<String, Book> cache = manager.getCache("books");
各 Book
は、次の例のように定義されます。インデックスを作成するプロパティーを選択する必要があります。プロパティーごとに、Hibernate Search プロジェクトで定義されたアノテーションを使用して高度なインデックスオプションを任意で選択できます。
Book.java
package org.infinispan.sample; import java.time.LocalDate; import java.util.HashSet; import java.util.Set; import org.hibernate.search.mapper.pojo.mapping.definition.annotation.*; // Annotate values with @Indexed to add them to indexes // Annotate each fields according to how you want to index it @Indexed public class Book { @FullTextField String title; @FullTextField String description; @KeywordField String isbn; @GenericField LocalDate publicationDate; @IndexedEmbedded Set<Author> authors = new HashSet<Author>(); }
Author.java
package org.infinispan.sample; import org.hibernate.search.mapper.pojo.mapping.definition.annotation.FullTextField; public class Author { @FullTextField String name; @FullTextField String surname; }
Data Grid Cache
に複数の Book
インスタンスを保存したとすると、次の例のように、一致するフィールドを検索できます。
QueryExample.java
// Get the query factory from the cache QueryFactory queryFactory = org.infinispan.query.Search.getQueryFactory(cache); // Create an Ickle query that performs a full-text search using the ':' operator on the 'title' and 'authors.name' fields // You can perform full-text search only on indexed caches Query<Book> fullTextQuery = queryFactory.create("FROM org.infinispan.sample.Book b WHERE b.title:'infinispan' AND b.authors.name:'sanne'"); // Use the '=' operator to query fields in caches that are indexed or not // Non full-text operators apply only to fields that are not analyzed Query<Book> exactMatchQuery=queryFactory.create("FROM org.infinispan.sample.Book b WHERE b.isbn = '12345678' AND b.authors.name : 'sanne'"); // You can use full-text and non-full text operators in the same query Query<Book> query=queryFactory.create("FROM org.infinispan.sample.Book b where b.authors.name : 'Stephen' and b.description : (+'dark' -'tower')"); // Get the results List<Book> found=query.execute().list();
4.3.2. マッピングエンティティー
Data Grid は、エンティティーレベルでインデックス作成の詳細な設定を定義するため Hibernate Search の API に依存します。この設定には、アノテーションが付けられたフィールド、使用するアナライザー、ネストされたオブジェクトのマッピング方法などが含まれます。詳細なドキュメントは the Hibernate Search manual を参照してください。
@DocumentId
Hibernate Search とは異なり、@DocumentId
を使用してフィールドを識別子としてマーク付けすると、Data Grid は値を保存するために使用されるキーになります。すべての @Indexed
オブジェクトの識別子は、値を保存するために使用されるキーになります。@Transformable
、カスタム型、およびカスタム FieldBridge
実装の組み合わせを使用して、キーのインデックス化方法をカスタマイズできます。
@Transformable keys
各値のキーはインデックス化する必要があり、キーインスタンスを String
で変換する必要があります。Data Grid には、共通のプリミティブをエンコードするためのデフォルトの変換ルーチンが含まれていますが、カスタムキーを使用するには org.infinispan.query.Transformer
の実装を提供する必要があります。
アノテーションを使用したキートランスフォーマーの登録
キークラスに org.infinispan.query.Transformable
のアノテーションを付け、カスタムトランスフォーマー実装が自動的に選択されます。
@Transformable(transformer = CustomTransformer.class) public class CustomKey { ... } public class CustomTransformer implements Transformer { @Override public Object fromString(String s) { ... return new CustomKey(...); } @Override public String toString(Object customType) { CustomKey ck = (CustomKey) customType; return ... } }
キャッシュインデックス設定を介したキートランスフォーマーの登録
埋め込みおよびサーバー設定の両方で、key-transformers
xml 要素を使用します。
<replicated-cache name="test"> <indexing auto-config="true"> <key-transformers> <key-transformer key="com.mycompany.CustomKey" transformer="com.mycompany.CustomTransformer"/> </key-transformers> </indexing> </replicated-cache>
または、Java 設定 API(組み込みモード) を使用します。
ConfigurationBuilder builder = ... builder.indexing().enable() .addKeyTransformer(CustomKey.class, CustomTransformer.class);
プログラムによるマッピング
アノテーションを使用してエンティティーをインデックスにマップする代わりに、プログラムで設定することもできます。
次の例では、グリッドに格納され、クラスにアノテーションを付けなくても 2 つのプロパティーで検索可能にするオブジェクト Author
をマップします。
import org.apache.lucene.search.Query; import org.hibernate.search.cfg.Environment; import org.hibernate.search.cfg.SearchMapping; import org.hibernate.search.query.dsl.QueryBuilder; import org.infinispan.Cache; import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.cache.Index; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.query.CacheQuery; import org.infinispan.query.Search; import org.infinispan.query.SearchManager; import java.io.IOException; import java.lang.annotation.ElementType; import java.util.Properties; SearchMapping mapping = new SearchMapping(); mapping.entity(Author.class).indexed() .property("name", ElementType.METHOD).field() .property("surname", ElementType.METHOD).field(); Properties properties = new Properties(); properties.put(Environment.MODEL_MAPPING, mapping); properties.put("hibernate.search.[other options]", "[...]"); Configuration infinispanConfiguration = new ConfigurationBuilder() .indexing().index(Index.NONE) .withProperties(properties) .build(); DefaultCacheManager cacheManager = new DefaultCacheManager(infinispanConfiguration); Cache<Long, Author> cache = cacheManager.getCache(); SearchManager sm = Search.getSearchManager(cache); Author author = new Author(1, "Manik", "Surtani"); cache.put(author.getId(), author); QueryBuilder qb = sm.buildQueryBuilderForClass(Author.class).get(); Query q = qb.keyword().onField("name").matching("Manik").createQuery(); CacheQuery cq = sm.getQuery(q, Author.class); assert cq.getResultSize() == 1;
4.4. リモートクエリー
クライアントからアクセスする Data Grid Sever クラスターを設定する場合は、リモートクエリーを使用します。
リモートクエリーを実行するには、キャッシュ内のデータで、ネットワーク上の送信とストレージ両方のエンコーディングとして Google Protocol Buffers を使用する必要があります。さらに、リモートクエリーでは、データ構造とインデックス要素を定義するために Protobuf スキーマ (.proto
ファイル) が必要です。
リモートクエリーで Protobuf を使用する利点は、言語に依存せず、Hot Rod Java クライアントだけでなく、REST、C ++、C#、および Node.js クライアントでも機能することです。
4.4.1. リモートクエリーの例
Book
と呼ばれるオブジェクトは、books と呼ばれる Data Grid キャッシュに保存されます。Book インスタンスはインデックス化されるため、キャッシュのインデックスを有効にします。
infinispan.xml
<replicated-cache name="books"> <indexing> <indexed-entities> <indexed-entity>book_sample.Book</indexed-entity> </indexed-entities> </indexing> </replicated-cache>
または、キャッシュにインデックスが付けられていない場合は、<encoding>
を application/x-protostream
として設定して、ストレージがクエリー可能であることを確認します。
infinispan.xml
<replicated-cache name="books"> <encoding media-type="application/x-protostream"/> </replicated-cache>
各 Book
は以下の例のように定義されます。@Protofield
アノテーションを使用してメッセージフィールドを識別し、フィールドの @ProtoDoc
アノテーションを使用してインデックス属性を設定します。
Book.java
import org.infinispan.protostream.annotations.ProtoDoc; import org.infinispan.protostream.annotations.ProtoFactory; import org.infinispan.protostream.annotations.ProtoField; @ProtoDoc("@Indexed") public class Book { @ProtoDoc("@Field(index=Index.YES, analyze = Analyze.YES, store = Store.NO)") @ProtoField(number = 1) final String title; @ProtoDoc("@Field(index=Index.YES, analyze = Analyze.YES, store = Store.NO)") @ProtoField(number = 2) final String description; @ProtoDoc("@Field(index=Index.YES, analyze = Analyze.YES, store = Store.NO)") @ProtoField(number = 3, defaultValue = "0") final int publicationYear; @ProtoFactory Book(String title, String description, int publicationYear) { this.title = title; this.description = description; this.publicationYear = publicationYear; } // public Getter methods omitted for brevity }
コンパイル時に、前述の例のアノテーションは、Book
インスタンスの読み取り、書き込み、およびクエリーに必要なアーティファクトを生成します。この生成を有効にするには、空のコンストラクターまたはインターフェイスを使用して新しく作成されたクラスで @AutoProtoSchemaBuilder
アノテーションを使用します。
RemoteQueryInitializer.java
import org.infinispan.protostream.SerializationContextInitializer; import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder; @AutoProtoSchemaBuilder( includeClasses = { Book.class }, schemaFileName = "book.proto", schemaFilePath = "proto/", schemaPackageName = "book_sample") public interface RemoteQueryInitializer extends SerializationContextInitializer { }
コンパイル後、ファイル book.proto
ファイルが、アノテーションが付いたインターフェイスの実装 RemoteQueryInitializerImpl.java
とともに、設定された schemaFilePath
に作成されます。この具体的なクラスは Hot Rod クライアントコードで直接使用して、シリアル化コンテキストを初期化します。
すべてを 1 つにまとめます。
RemoteQuery.java
package org.infinispan; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCacheManager; import org.infinispan.client.hotrod.Search; import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; import org.infinispan.query.dsl.Query; import org.infinispan.query.dsl.QueryFactory; import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants; public class RemoteQuery { public static void main(String[] args) throws Exception { ConfigurationBuilder clientBuilder = new ConfigurationBuilder(); // RemoteQueryInitializerImpl is generated clientBuilder.addServer().host("127.0.0.1").port(11222) .security().authentication().username("user").password("user") .addContextInitializers(new RemoteQueryInitializerImpl()); RemoteCacheManager remoteCacheManager = new RemoteCacheManager(clientBuilder.build()); // Grab the generated protobuf schema and registers in the server. Path proto = Paths.get(RemoteQuery.class.getClassLoader() .getResource("proto/book.proto").toURI()); String protoBufCacheName = ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME; remoteCacheManager.getCache(protoBufCacheName).put("book.proto", Files.readString(proto)); // Obtain the 'books' remote cache RemoteCache<Object, Object> remoteCache = remoteCacheManager.getCache("books"); // Add some Books Book book1 = new Book("Infinispan in Action", "Learn Infinispan with using it", 2015); Book book2 = new Book("Cloud-Native Applications with Java and Quarkus", "Build robust and reliable cloud applications", 2019); remoteCache.put(1, book1); remoteCache.put(2, book2); // Execute a full-text query QueryFactory queryFactory = Search.getQueryFactory(remoteCache); Query<Book> query = queryFactory.create("FROM book_sample.Book WHERE title:'java'"); List<Book> list = query.execute().list(); // Voila! We have our book back from the cache! } }
4.4.2. Protobuf スキーマの登録
protobuf エンティティーをクエリーするには、クライアントおよびサーバーに、Protobuf スキーマ (.proto
ファイル) のエンティティーに関連するメタデータを提供する必要があります。
記述子はサーバーの専用の ___protobuf_metadata
キャッシュに保存されます。このキャッシュのキーと値はどちらもプレーンテキストの文字列です。したがって、新しいスキーマの登録は、スキーマ名をキーとして、スキーマファイル自体を値として使用して、このキャッシュに対して put()
操作を実行するのと同じくらい簡単です。
キャッシュが承認を使用する場合は、__protobuf_metadata
キャッシュにエントリーを追加するのに、CREATE
パーミッションが必要です。デフォルトの承認設定を使用する場合は、ユーザーに少なくとも deployer
ロールを割り当てます。
または、Data Grid CLI、Data Grid Console、REST エンドポイント /rest/v2/schemas
、または JMX 経由の ProtobufMetadataManager
MBean で schema
コマンドを使用できます。
キャッシュに対してインデックスが有効になっている場合でも、 Protobuf スキーマドキュメントアノテーション (@ProtoDoc)
内の @Indexed
および @Field
を使用してインデックスする必要のあるフィールドを指定しない限り、Protobuf でエンコードされたエントリーのフィールドはインデックス付けされません。
4.4.3. 分析
分析は、入力データを、インデックスを作成してクエリーできる 1 つ以上の用語に変換するプロセスです。Embedded Query のマッピングは、Lucene ベースのアナライザーのセットをサポートする Hibernate Search アノテーション を介して行われますが、クライアントサーバーモードでは、アナライザー定義はプラットフォームに依存しない方法で宣言されます。
デフォルトのアナライザー
Data Grid は、以下のようにリモートクエリーに対してデフォルトのアナライザーのセットを提供します。
定義 | 説明 |
---|---|
| テキストフィールドをトークンに分割し、空白と句読点を区切り文字として扱います。 |
| 非文字で区切り、すべての文字を小文字に変換することにより、入力ストリームをトークン化します。空白と非文字は破棄されます。 |
| テキストストリームを空白で分割し、空白以外の文字のシーケンスをトークンとして返します。 |
| テキストフィールド全体を単一トークンとして扱います。 |
| SnowballPorter フィルターを使用して英語の単語を語幹にします。 |
| デフォルトでサイズ 3 つのグラムである n-gram トークンを生成します。 |
|
テキストフィールドを |
これらのアナライザー定義は Apache Lucene をベースとし、as-is で提供されます。tokenizers、filters、および CharFilters に関する詳細は、適切な Lucene のドキュメントを参照してください。
アナライザー定義の使用
アナライザー定義を使用するには、.proto
スキーマファイルで名前でそれらを参照します。
-
Analyze.YES
属性を追加して、プロパティーが分析されていることを示します。 -
@Analyzer
アノテーションで アナライザー定義を指定します。
以下は、参照されたアナライザー定義の例になります。
/* @Indexed */ message TestEntity { /* @Field(store = Store.YES, analyze = Analyze.YES, analyzer = @Analyzer(definition = "keyword")) */ optional string id = 1; /* @Field(store = Store.YES, analyze = Analyze.YES, analyzer = @Analyzer(definition = "simple")) */ optional string name = 2; }
@ProtoField
アノテーションが付けられた Java クラスを使用する場合、宣言は以下のようになります。
@ProtoDoc("@Field(store = Store.YES, analyze = Analyze.YES, analyzer = @Analyzer(definition = \"keyword\"))") @ProtoField(number = 1) final String id; @ProtoDoc("@Field(store = Store.YES, analyze = Analyze.YES, analyzer = @Analyzer(definition = \"simple\"))") @ProtoField(number = 2) final String description;
カスタムアナライザー定義の作成
カスタムアナライザー定義が必要な場合は、以下を行います。
-
JAR
ファイルにパッケージ化されたProgrammaticSearchMappingProvider
インターフェイスの実装を作成します。 -
JAR
のMETA-INF/services/
ディレクトリーにorg.infinispan.query.spi.ProgrammaticSearchMappingProvider
という名前のファイルを指定します。このファイルには、実装の完全修飾クラス名が含まれている必要があります。 JAR
を Data Grid Server インストールのlib/
ディレクトリーにコピーします。重要JAR は、起動時に Data Grid Server で利用できる必要があります。実行しているサーバーに追加することはできません。
以下は、ProgrammaticSearchMappingProvider
インターフェイスの実装例です。
import org.apache.lucene.analysis.core.LowerCaseFilterFactory; import org.apache.lucene.analysis.core.StopFilterFactory; import org.apache.lucene.analysis.standard.StandardFilterFactory; import org.apache.lucene.analysis.standard.StandardTokenizerFactory; import org.hibernate.search.cfg.SearchMapping; import org.infinispan.Cache; import org.infinispan.query.spi.ProgrammaticSearchMappingProvider; public final class MyAnalyzerProvider implements ProgrammaticSearchMappingProvider { @Override public void defineMappings(Cache cache, SearchMapping searchMapping) { searchMapping .analyzerDef("standard-with-stop", StandardTokenizerFactory.class) .filter(StandardFilterFactory.class) .filter(LowerCaseFilterFactory.class) .filter(StopFilterFactory.class); } }
4.5. 継続的なクエリー
継続的なクエリーにより、アプリケーションはクエリーフィルターに現在一致したエントリーを受信するリスナーを登録し、さらにキャッシュ操作の結果としてクエリーされたデータセットへの変更を継続的に通知できます。これには、セットに結合された値の着信一致、更新された一致、変更されて引き続き一致する一致値、およびセットを離れた値の発信一致が含まれます。継続的なクエリーを使用することにより、アプリケーションは、変更を検出するために同じクエリーを繰り返し実行する代わりに、イベントの安定したストリームを受信し、リソースがより効率的に使用されるようになります。たとえば、以下のユースケースすべてで、継続的なクエリーを使用できます。
-
Person エンティティーに
age
プロパティーがあり、ユーザーアプリケーションによって更新される 18〜25 歳の人を返す。 - $2000 を超えるすべてのトランザクションを返す。
- F1 レーサーのラップスピードが 1:45.00 秒未満だったすべての時間を返す (キャッシュにラップエントリーが含まれていて、レース中にラップがライブ入力されていると仮定)。
4.5.1. 連続クエリー実行
継続的クエリーは、以下の場合に通知されるリスナーを使用します。
-
エントリーは、
Join
イベントによって表される指定のクエリーの一致を開始します。 -
一致するエントリーが更新され、
Update
vent によって表されるクエリーの一致が継続されます。 -
エントリーは、
Leave
イベントで表されるクエリーの一致を停止します。
クライアントが継続的なクエリーリスナーを登録すると、すぐにクエリーに一致する結果の受信を開始します (上記のように Join
イベントとして受信された)。さらに、通常は作成、変更、削除、または有効期限イベントを生成するキャッシュ操作の結果として、他のエントリーが Join
イベントとしてクエリーの一致を開始したとき、または Leave
イベントとしてクエリーの一致を停止したときに、後続の通知を受信します。更新されたキャッシュエントリーは、操作の前後でエントリーがクエリーフィルターに一致する場合、Update
イベントを生成します。要約すると、リスナーが Join
、Update
、または Leave
イベントを受信するかどうかを決定するために使用されるロジックは次のとおりです。
- 古い値と新しい値の両方に対するクエリーが false と評価された場合、イベントは抑制されます。
-
古い値に対するクエリーが false と評価され、新しい値に対するクエリーが true と評価された場合、
Join
イベントが送信されます。 -
古い値と新しい値の両方のクエリーが true と評価されると、
Update
イベントが送信されます。 -
古い値に対するクエリーが true と評価され、新しい値に対するクエリーが false と評価された場合、
Leave
イベントが送信されます。 -
古い値に対するクエリーが true と評価され、エントリーが削除または期限切れになると、
Leave
イベントが送信されます。
継続的なクエリーは、グループ化、集約、およびソート操作以外のすべてのクエリー機能を使用できます。
4.5.2. 継続的なクエリーの作成
継続的なクエリーを作成するには、以下を実行します。
- クエリーオブジェクトを作成します。
適切なメソッドを呼び出して、キャッシュの ContinuousQuery(
org.infinispan.query.api.continuous.ContinuousQuery
オブジェクト) を取得します。-
リモートモードの
org.infinispan.client.hotrod.Search.getContinuousQuery(RemoteCache<K, V> cache)
-
組み込みモードの
org.infinispan.query.Search.getContinuousQuery(Cache<K, V> cache)
-
リモートモードの
-
以下のように、クエリーと継続的なクエリーリスナー (
org.infinispan.query.api.continuous.ContinuousQueryListener
) を登録します。
continuousQuery.addContinuousQueryListener(query, listener);
以下の例は、埋め込みモードの単純な継続的なクエリーのユースケースを示しています。
継続的クエリーの登録
import org.infinispan.query.api.continuous.ContinuousQuery; import org.infinispan.query.api.continuous.ContinuousQueryListener; import org.infinispan.query.Search; import org.infinispan.query.dsl.QueryFactory; import org.infinispan.query.dsl.Query; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; [...] // We have a cache of Persons Cache<Integer, Person> cache = ... // We begin by creating a ContinuousQuery instance on the cache ContinuousQuery<Integer, Person> continuousQuery = Search.getContinuousQuery(cache); // Define our query. In this case we will be looking for any Person instances under 21 years of age. QueryFactory queryFactory = Search.getQueryFactory(cache); Query query = queryFactory.create("FROM Person p WHERE p.age < 21"); final Map<Integer, Person> matches = new ConcurrentHashMap<Integer, Person>(); // Define the ContinuousQueryListener ContinuousQueryListener<Integer, Person> listener = new ContinuousQueryListener<Integer, Person>() { @Override public void resultJoining(Integer key, Person value) { matches.put(key, value); } @Override public void resultUpdated(Integer key, Person value) { // we do not process this event } @Override public void resultLeaving(Integer key) { matches.remove(key); } }; // Add the listener and the query continuousQuery.addContinuousQueryListener(query, listener); [...] // Remove the listener to stop receiving notifications continuousQuery.removeContinuousQueryListener(listener);
21 歳未満の Person インスタンスがキャッシュに追加されると、リスナーによって受信され、 matches
マップに配置されます。これらのエントリーがキャッシュから削除されるか、年齢が 21 歳以上に変更されると、それらは matches
から削除されます。
4.5.3. 継続的なクエリーの削除
クエリーのそれ以上の実行を停止するには、リスナーを単に削除します。
continuousQuery.removeContinuousQueryListener(listener);
4.5.4. 継続的なクエリーパフォーマンス
継続的なクエリーは、アプリケーションに一定の更新ストリームを提供するように設計されており、特に幅広いクエリーに対して非常に多くのイベントが生成される可能性があります。イベントごとに新規の一時的なメモリー割り当てが行われます。この動作によりメモリーが不足し、クエリーが適切に設計されていない場合、OutOfMemoryErrors
(特にリモートモード) が発生する可能性があります。このような問題を防ぐために、一致するエントリーの数と各一致のサイズの両方の観点から、各クエリーが必要な最小限の情報をキャプチャーし (プロジェクションを使用して興味深いプロパティーをキャプチャできます)、各 ContinuousQueryListener
が受信したすべてのイベントをブロックせずにすばやく処理し、リッスンするキャッシュから一致する新しいイベントの生成につながるアクションの実行を回避するように設計されていることが強く推奨されます。
4.6. クエリーの統計
Data Grid は、Search
エントリーポイントを介してインデックス化およびクエリー統計を公開します。これは、以下のようにプログラムで使用できます。
// Statistics for the local cluster member SearchStatistics statistics = Search.getSearchStatistics(cache); // Consolidated statistics for the whole cluster CompletionStage<SearchStatisticsSnapshot> statistics = Search.getClusteredSearchStatistics(cache)
REST による統計
Data Grid Server は、REST エンドポイントを介してインデックスおよびクエリー統計も公開します。
GET /v2/caches/{cacheName}/search/stats
の呼び出しを使用します。
4.7. クエリーパフォーマンスチューニング
Data Grid はクエリーの統計を公開し、設定可能な属性を提供するため、クエリーを監視およびチューニングして最適なパフォーマンスを実現できます。
インデックスの使用状況の統計値の確認
キャッシュにインデックスを付けると、クエリーのパフォーマンスが向上します。ただし、スキーマ内のすべてのフィールドにアノテーションが付けられていない場合など、クエリーでインデックスが部分的にしか使用されない場合があります。
クエリーの各タイプの実行にかかる時間を確認して、クエリーのパフォーマンスのチューニングを開始します。クエリーが遅いと思われる場合は、クエリーがキャッシュのインデックスを使用していること、およびすべてのエンティティーとフィールドマッピングにインデックスが付けられていることを確認する必要があります。
インデックス化パフォーマンス
インデックス処理は、Data Grid クラスターの書き込みスループットを低下させる可能性があります。commit-interval
属性は、メモリーにバッファーリングされたインデックスの変更がインデックスストレージにフラッシュされ、コミットが実行される間隔をミリ秒単位で定義します。
この操作にはコストがかかるため、小さすぎる間隔の設定は避けてください。デフォルトは 1000 ミリ秒 (1 秒) です。
パフォーマンスのクエリー
refresh-interval
属性は、インデックスリーダーの更新の間隔 (ミリ秒単位) を定義します。
デフォルト値は 0
で、キャッシュに書き込まれるとすぐにクエリーのデータを返します。
値が 0
より大きいと、クエリー結果が古くなりますが、特に書き込みが多いシナリオでは、スループットが大幅に向上します。書き込まれた直後にクエリーで返されるデータが必要ない場合は、更新間隔を調整してクエリーのパフォーマンスを向上させる必要があります。
第5章 クラスター化されたロック
クラスター化されたロックは、Data Grid クラスターのノード間で分散され、共有されるデータ構造です。クラスター化されたロックにより、ノード間で同期されるコードを実行できます。
5.1. ロック API
Data Grid は、埋め込みモードで Data Grid を使用するときに、クラスター上でコードを同時に実行できる ClusteredLock
API を提供します。
API は以下で設定されます。
-
ClusteredLock
は、クラスター化されたロックを実装するメソッドを公開します。 -
ClusteredLockManager
は、クラスター化されたロックの定義、設定、取得、および削除を行うメソッドを公開します。 -
EmbeddedClusteredLockManagerFactory
は、ClusteredLockManager
の実装を初期化します。
所有権
Data Grid は、クラスター内のすべてのノードがロックを使用できるように、NODE
所有権をサポートします。
再入可能性
Data Grid のクラスター化ロックは、再入可能ではないため、クラスター内のすべてのノードがロックを取得できますが、ロックを作成したノードのみがロックを解放することができます。
同じ所有者に対して 2 つの連続したロック呼び出しが送信された場合、最初の呼び出しが使用可能であればロックを取得し、2 番目の呼び出しはブロックされます。
5.2. クラスター化されたロックの使用
アプリケーションに埋め込まれた Data Grid でクラスター化されたロックを使用する方法について説明します。
前提条件
-
infinispan-clustered-lock
依存関係をpom.xml
に追加します。
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-clustered-lock</artifactId> </dependency>
手順
-
キャッシュマネージャーから
ClusteredLockManager
インターフェイスを初期化します。このインターフェイスは、クラスター化されたロックの定義、取得、および削除を行うエントリーポイントです。 - クラスター化されたロックごとに一意の名前を指定します。
-
lock.tryLock(1, TimeUnit.SECONDS)
メソッドでロックを取得します。
// Set up a clustered Cache Manager. GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder(); // Configure the cache mode, in this case it is distributed and synchronous. ConfigurationBuilder builder = new ConfigurationBuilder(); builder.clustering().cacheMode(CacheMode.DIST_SYNC); // Initialize a new default Cache Manager. DefaultCacheManager cm = new DefaultCacheManager(global.build(), builder.build()); // Initialize a Clustered Lock Manager. ClusteredLockManager clm1 = EmbeddedClusteredLockManagerFactory.from(cm); // Define a clustered lock named 'lock'. clm1.defineLock("lock"); // Get a lock from each node in the cluster. ClusteredLock lock = clm1.get("lock"); AtomicInteger counter = new AtomicInteger(0); // Acquire the lock as follows. // Each 'lock.tryLock(1, TimeUnit.SECONDS)' method attempts to acquire the lock. // If the lock is not available, the method waits for the timeout period to elapse. When the lock is acquired, other calls to acquire the lock are blocked until the lock is released. CompletableFuture<Boolean> call1 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> { if (r) { System.out.println("lock is acquired by the call 1"); lock.unlock().whenComplete((nil, ex2) -> { System.out.println("lock is released by the call 1"); counter.incrementAndGet(); }); } }); CompletableFuture<Boolean> call2 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> { if (r) { System.out.println("lock is acquired by the call 2"); lock.unlock().whenComplete((nil, ex2) -> { System.out.println("lock is released by the call 2"); counter.incrementAndGet(); }); } }); CompletableFuture<Boolean> call3 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> { if (r) { System.out.println("lock is acquired by the call 3"); lock.unlock().whenComplete((nil, ex2) -> { System.out.println("lock is released by the call 3"); counter.incrementAndGet(); }); } }); CompletableFuture.allOf(call1, call2, call3).whenComplete((r, ex) -> { // Print the value of the counter. System.out.println("Value of the counter is " + counter.get()); // Stop the Cache Manager. cm.stop(); });
5.3. ロックの内部キャッシュの設定
クラスター化されたロックマネージャーには、ロック状態を格納する内部キャッシュが含まれます。内部キャッシュは、宣言的またはプログラムのいずれかに設定できます。
手順
-
クラスター化されたロックの状態を保存するクラスター内のノード数を定義します。デフォルト値は
-1
で、値をすべてのノードに複製します。 キャッシュの信頼性に以下のいずれかの値を指定します。これは、クラスターがパーティションに分割するか、複数のノードが残った場合にクラスター化ロックがどのように動作するかを制御します。
-
AVAILABLE
: 任意のパーティションのノードが、ロックで同時に操作することができます。 -
CONSISTENT
: 大多数のパーティションに属するノードのみが、ロック上で動作できます。これはデフォルト値です。 プログラムによる設定
import org.infinispan.lock.configuration.ClusteredLockManagerConfiguration; import org.infinispan.lock.configuration.ClusteredLockManagerConfigurationBuilder; import org.infinispan.lock.configuration.Reliability; ... GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder(); final ClusteredLockManagerConfiguration config = global.addModule(ClusteredLockManagerConfigurationBuilder.class).numOwner(2).reliability(Reliability.AVAILABLE).create(); DefaultCacheManager cm = new DefaultCacheManager(global.build()); ClusteredLockManager clm1 = EmbeddedClusteredLockManagerFactory.from(cm); clm1.defineLock("lock");
宣言型設定
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:12.1 https://infinispan.org/schemas/infinispan-config-12.1.xsd" xmlns="urn:infinispan:config:12.1"> <cache-container default-cache="default"> <transport/> <local-cache name="default"> <locking concurrency-level="100" acquire-timeout="1000"/> </local-cache> <clustered-locks xmlns="urn:infinispan:config:clustered-locks:12.1" num-owners = "3" reliability="AVAILABLE"> <clustered-lock name="lock1" /> <clustered-lock name="lock2" /> </clustered-locks> </cache-container> <!-- Cache configuration goes here. --> </infinispan>
-
第6章 クラスター化カウンター
クラスター化されたカウンター は、Data Grid クラスターのすべてのノードで分散され、共有されるカウンターです。カウンターは異なる整合性レベル (strong および weak) を持つことができます。
strong/weak と一貫性のあるカウンターには個別のインターフェイスがありますが、どちらもその値の更新をサポートし、現在の値を返し、その値が更新されたときにイベントを提供します。このドキュメントでは、ユースケースに最適なものを選択する上で役立つ詳細を以下に示します。
6.1. インストールおよび設定
カウンターの使用を開始するには、Maven の pom.xml
ファイルに依存関係を追加する必要があります。
pom.xml
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-clustered-counter</artifactId> </dependency>
このカウンターは、本書で後述する CounterManager
インターフェイスを介して、Data Grid 設定ファイルまたはオンデマンドを設定できます。EmbeddedCacheManager
の起動時に、起動時に Data Grid 設定ファイルに設定されたカウンターが作成します。これらのカウンターは Eagerly で開始され、すべてのクラスターのノードで利用できます。
configuration.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan> <cache-container ...> <!-- To persist counters, you need to configure the global state. --> <global-state> <!-- Global state configuration goes here. --> </global-state> <!-- Cache configuration goes here. --> <counters xmlns="urn:infinispan:config:counters:12.1" num-owners="3" reliability="CONSISTENT"> <strong-counter name="c1" initial-value="1" storage="PERSISTENT"/> <strong-counter name="c2" initial-value="2" storage="VOLATILE"> <lower-bound value="0"/> </strong-counter> <strong-counter name="c3" initial-value="3" storage="PERSISTENT"> <upper-bound value="5"/> </strong-counter> <strong-counter name="c4" initial-value="4" storage="VOLATILE"> <lower-bound value="0"/> <upper-bound value="10"/> </strong-counter> <weak-counter name="c5" initial-value="5" storage="PERSISTENT" concurrency-level="1"/> </counters> </cache-container> </infinispan>
または、プログラムを使用して GlobalConfigurationBuilder
で以下を行います。
GlobalConfigurationBuilder globalConfigurationBuilder = ...; CounterManagerConfigurationBuilder builder = globalConfigurationBuilder.addModule(CounterManagerConfigurationBuilder.class); builder.numOwner(3).reliability(Reliability.CONSISTENT); builder.addStrongCounter().name("c1").initialValue(1).storage(Storage.PERSISTENT); builder.addStrongCounter().name("c2").initialValue(2).lowerBound(0).storage(Storage.VOLATILE); builder.addStrongCounter().name("c3").initialValue(3).upperBound(5).storage(Storage.PERSISTENT); builder.addStrongCounter().name("c4").initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE); builder.addWeakCounter().name("c5").initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT);
一方、このカウンターは、EmbeddedCacheManager
を初期化した後にいつでも設定することができます。
CounterManager manager = ...; manager.defineCounter("c1", CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).initialValue(1).storage(Storage.PERSISTENT).build()); manager.defineCounter("c2", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(2).lowerBound(0).storage(Storage.VOLATILE).build()); manager.defineCounter("c3", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(3).upperBound(5).storage(Storage.PERSISTENT).build()); manager.defineCounter("c4", CounterConfiguration.builder(CounterType.BOUNDED_STRONG).initialValue(4).lowerBound(0).upperBound(10).storage(Storage.VOLATILE).build()); manager.defineCounter("c2", CounterConfiguration.builder(CounterType.WEAK).initialValue(5).concurrencyLevel(1).storage(Storage.PERSISTENT).build());
CounterConfiguration
は変更できず、再利用できます。
カウンターが正常に設定されていると、defineCounter()
メソッドは true
を返します。そうでない場合は、true
を返します。ただし、設定が無効な場合は、メソッドによって CounterConfigurationException
が発生します。カウンターがすでに定義されているかを調べるには、isDefined()
メソッドを使用します。
CounterManager manager = ... if (!manager.isDefined("someCounter")) { manager.define("someCounter", ...); }
クラスターごとの属性:
-
num-owners
: クラスター全体で保持するカウンターのコピー数を設定します。数値が小さいほど更新操作は高速になりますが、サポートされるサーバークラッシュの数は少なくなります。正の値である必要があり、デフォルト値は2
です。 reliability
: ネットワークパーティションでカウンターの更新動作を設定します。デフォルト値はAVAILABLE
で、有効な値は次のとおりです。-
AVAILABLE
: すべてのパーティションはカウンター値の読み取りと更新が可能です。 -
CONSISTENT
: プライマリーパーティション (ノードの大多数) のみがカウンター値の読み取りと更新が可能です。残りのパーティションは、その値の読み取りのみ可能です。
-
カウンターごとの属性:
-
initial-value
[common]: カウンターの初期値を設定します。デフォルトは0
(ゼロ) です。 storage
[common]: クラスターのシャットダウンおよび再起動時のカウンターの動作を設定します。デフォルト値はVOLATILE
で、有効な値は次のとおりです。-
VOLATILE
: カウンターの値はメモリーでのみ利用できます。この値は、クラスターがシャットダウンすると失われます。 -
PERSISTENT
: カウンターの値はプライベートおよびローカル永続ストアに保存されます。この値は、クラスターがシャットダウンされたときに保持され、再起動後に復元されます。
-
オンデマンドおよび VOLATILE
カウンターは、クラスターのシャットダウン後にその値と設定を失います。再起動後に再度定義する必要があります。
-
lower-bound
[strong]: 強力な一貫性のあるカウンターの下限を設定します。デフォルト値はLong.MIN_VALUE
です。 -
upper-bound
[strong]: 強力な一貫性のあるカウンターの上限を設定します。デフォルト値はLong.MAX_VALUE
です。
lower-bound
も upper-bound
も設定されていない場合は、強力なカウンターは無制限として設定されます。
initial-value
は、lower-bound
以上 upper-bound
以下である必要があります。
-
concurrency-level
[weak]: 同時更新の数を設定します。正の値である必要があり、デフォルト値は16
です。
6.1.1. カウンター名の一覧表示
定義されたすべてのカウンターを一覧表示するには、CounterManager.getCounterNames()
メソッドは、クラスター全体で作成されたすべてのカウンター名のコレクションを返します。
6.2. CounterManager
インターフェイス
CounterManager
インターフェイスは、カウンターを定義、取得、および削除するエントリーポイントです。
埋め込みデプロイメント
CounterManager
は EmbeddedCacheManager
の作成を自動的にリッスンし、EmbeddedCacheManager
ごとのインスタンスの登録を続行します。カウンター状態を保存し、デフォルトのカウンターの設定に必要なキャッシュを開始します。
CounterManager
の取得は、以下の例のように EmbeddedCounterManagerFactory.asCounterManager(EmbeddedCacheManager)
を呼び出すだけです。
// create or obtain your EmbeddedCacheManager EmbeddedCacheManager manager = ...; // retrieve the CounterManager CounterManager counterManager = EmbeddedCounterManagerFactory.asCounterManager(manager);
サーバーデプロイメント
Hot Rod クライアントの場合、CounterManager
は RemoteCacheManager に登録されており、以下のように取得できます。
// create or obtain your RemoteCacheManager RemoteCacheManager manager = ...; // retrieve the CounterManager CounterManager counterManager = RemoteCounterManagerFactory.asCounterManager(manager);
6.2.1. CounterManager を介したカウンターの削除
Strong/WeakCounter
と CounterManager
でカウンターを削除するのに違いがあります。CounterManager.remove(String)
は、クラスターからカウンター値を削除し、ローカルカウンターインスタンスのカウンターに登録されているすべてのリスナーを削除します。さらに、カウンターインスタンスは再利用可能ではなくなり、無効な結果が返される可能性があります。
一方で、Strong/WeakCounter
を削除するとカウンター値のみが削除されます。インスタンスは引き続き再利用でき、リスナーは引き続き動作します。
削除後にアクセスされると、カウンターは再作成されます。
6.3. カウンター
カウンターは、strong (StrongCounter
) または weak(WeakCounter
) になり、いずれも名前で識別されます。各インターフェイスには特定のインターフェイスがありますが、ロジック (つまり各操作により CompletableFuture
が返される) を共有しているため、更新イベントが返され、初期値にリセットできます。
非同期 API を使用しない場合は、sync()
メソッドを介して同期カウンターを返すことができます。API は同じですが、CompletableFuture
の戻り値はありません。
以下のメソッドは、両方のインターフェイスに共通しています。
String getName(); CompletableFuture<Long> getValue(); CompletableFuture<Void> reset(); <T extends CounterListener> Handle<T> addListener(T listener); CounterConfiguration getConfiguration(); CompletableFuture<Void> remove(); SyncStrongCounter sync(); //SyncWeakCounter for WeakCounter
-
getName()
はカウンター名 (identifier) を返します。 -
getValue()
は現在のカウンターの値を返します。 -
reset()
により、カウンターの値を初期値にリセットできます。 -
reset()
はリスナーを登録し、更新イベントを受信します。詳細については、通知およびイベント セクションをご覧ください。 -
getConfiguration()
はカウンターによって使用される設定を返します。 -
remove()
はクラスターからカウンター値を削除します。インスタンスは引き続き使用でき、リスナーが保持されます。 -
sync()
は同期カウンターを作成します。
削除後にアクセスされると、カウンターは再作成されます。
6.3.1. StrongCounter
インターフェイス: 一貫性または境界が明確になります。
strong カウンターは、Data Grid キャッシュに保存されている単一のキーを使用して、必要な一貫性を提供します。すべての更新は、その値を更新するためにキーロックの下で実行されます。一方、読み取りはロックを取得し、現在の値を読み取ります。さらに、このスキームではカウンター値をバインドでき、比較および設定/スワップなどのアトミック操作を提供できます。
StrongCounter
は、getStrongCounter()
メソッドを使用して CounterManager
から取得することができます。たとえば、以下のようになります。
CounterManager counterManager = ... StrongCounter aCounter = counterManager.getStrongCounter("my-counter");
すべての操作は単一のキーに到達するため、StrongCounter
は競合レートが高くなります。
StrongCounter
インターフェイスでは、以下のメソッドを追加します。
default CompletableFuture<Long> incrementAndGet() { return addAndGet(1L); } default CompletableFuture<Long> decrementAndGet() { return addAndGet(-1L); } CompletableFuture<Long> addAndGet(long delta); CompletableFuture<Boolean> compareAndSet(long expect, long update); CompletableFuture<Long> compareAndSwap(long expect, long update);
-
incrementAndGet()
はカウンターを 1 つずつ増分し、新しい値を返します。 -
decrementAndGet()
は、1 つずつカウンターをデクリメントし、新しい値を返します。 -
addAndGet()
は、delta をカウンターの値に追加し、新しい値を返します。 -
compareAndSet()
およびcompareAndSwap()
は、現在の値が想定される場合にカウンターの値を設定します。
CompletableFuture
が完了すると、操作は完了とみなされます。
compare-and-set と compare-and-swap の相違点は、操作に成功した場合に、compare-and-set は true を返しますが、compare-and-swap は前の値をか返すことです。戻り値が期待値と同じ場合は、compare-and-swap が正常になります。
6.3.1.1. バインドされた StrongCounter
バインドされている場合、上記の更新メソッドはすべて、下限または上限に達すると CounterOutOfBoundsException
を出力します。例外には、どちら側にバインドが到達したかを確認するための次のメソッドがあります。
public boolean isUpperBoundReached(); public boolean isLowerBoundReached();
6.3.1.2. ユースケース
強力なカウンターは、次の使用例に適しています。
- 各更新後にカウンターの値が必要な場合 (例: クラスター単位の ID ジェネレーターまたはシーケンス)
- バインドされたカウンターが必要な場合は (例: レートリミッター)
6.3.1.3. 使用例
StrongCounter counter = counterManager.getStrongCounter("unbounded_counter"); // incrementing the counter System.out.println("new value is " + counter.incrementAndGet().get()); // decrement the counter's value by 100 using the functional API counter.addAndGet(-100).thenApply(v -> { System.out.println("new value is " + v); return null; }).get(); // alternative, you can do some work while the counter is updated CompletableFuture<Long> f = counter.addAndGet(10); // ... do some work ... System.out.println("new value is " + f.get()); // and then, check the current value System.out.println("current value is " + counter.getValue().get()); // finally, reset to initial value counter.reset().get(); System.out.println("current value is " + counter.getValue().get()); // or set to a new value if zero System.out.println("compare and set succeeded? " + counter.compareAndSet(0, 1));
以下に、バインドされたカウンターを使用する別の例を示します。
StrongCounter counter = counterManager.getStrongCounter("bounded_counter"); // incrementing the counter try { System.out.println("new value is " + counter.addAndGet(100).get()); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof CounterOutOfBoundsException) { if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) { System.out.println("ops, upper bound reached."); } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) { System.out.println("ops, lower bound reached."); } } } // now using the functional API counter.addAndGet(-100).handle((v, throwable) -> { if (throwable != null) { Throwable cause = throwable.getCause(); if (cause instanceof CounterOutOfBoundsException) { if (((CounterOutOfBoundsException) cause).isUpperBoundReached()) { System.out.println("ops, upper bound reached."); } else if (((CounterOutOfBoundsException) cause).isLowerBoundReached()) { System.out.println("ops, lower bound reached."); } } return null; } System.out.println("new value is " + v); return null; }).get();
Compare-and-set と Compare-and-swap の比較例:
StrongCounter counter = counterManager.getStrongCounter("my-counter"); long oldValue, newValue; do { oldValue = counter.getValue().get(); newValue = someLogic(oldValue); } while (!counter.compareAndSet(oldValue, newValue).get());
compare-and-swap では、呼び出しカウンターの呼び出し (counter.getValue()
) が 1 つ保存されます。
StrongCounter counter = counterManager.getStrongCounter("my-counter"); long oldValue = counter.getValue().get(); long currentValue, newValue; do { currentValue = oldValue; newValue = someLogic(oldValue); } while ((oldValue = counter.compareAndSwap(oldValue, newValue).get()) != currentValue);
6.3.2. WeakCounter
インターフェイス: 速度が必要な場合
WeakCounter
は、カウンターの値を Data Grid キャッシュの複数のキーに保存します作成されたキーの数は concurrency-level
属性によって設定されます。各キーはカウンターの値の一部の状態を保存し、同時に更新できます。StrongCounter
よりも優れた点は、キャッシュの競合率が低いことです。一方、値の読み取りはよりコストが高く、バインドは許可されません。
リセット操作は注意して行う必要があります。これは アトミックではなく、中間値を生成します。これらの値は、読み取り操作および登録されたリスナーによって確認できます。
WeakCounter
は、getWeakCounter()
メソッドを使用して CounterManager
から取得できます。たとえば、以下のようになります。
CounterManager counterManager = ... StrongCounter aCounter = counterManager.getWeakCounter("my-counter);
6.3.2.1. weak カウンターインターフェイス
WeakCounter
は、以下のメソッドを追加します。
default CompletableFuture<Void> increment() { return add(1L); } default CompletableFuture<Void> decrement() { return add(-1L); } CompletableFuture<Void> add(long delta);
これらは `StrongCounter のメソッドと似ていますが、新しい値は返されません。
6.3.2.2. ユースケース
weak カウンターは、更新操作の結果が必要ない場合やカウンターの値があまり必要でないユースケースに最適です。統計の収集は、このようなユースケースの良い例になります。
6.3.2.3. 例
以下では、弱いカウンターの使用例を示します。
WeakCounter counter = counterManager.getWeakCounter("my_counter"); // increment the counter and check its result counter.increment().get(); System.out.println("current value is " + counter.getValue()); CompletableFuture<Void> f = counter.add(-100); //do some work f.get(); //wait until finished System.out.println("current value is " + counter.getValue().get()); //using the functional API counter.reset().whenComplete((aVoid, throwable) -> System.out.println("Reset done " + (throwable == null ? "successfully" : "unsuccessfully"))).get(); System.out.println("current value is " + counter.getValue().get());
6.4. 通知およびイベント
strong カウンターと weak カウンターの両方が、更新イベントを受信するためにリスナーをサポートします。リスナーは CounterListener
を実装する必要があり、これを以下の方法で登録できます。
<T extends CounterListener> Handle<T> addListener(T listener);
CounterListener
には以下のインターフェイスがあります。
public interface CounterListener { void onUpdate(CounterEvent entry); }
返される Handle
オブジェクトには、CounterListener
が必要なくなったときに削除するという主な目的があります。また、処理している CounterListener
インスタンスにアクセスできます。これには、以下のインターフェイスがあります。
public interface Handle<T extends CounterListener> { T getCounterListener(); void remove(); }
最後に、CounterEvent
には、以前と現在の値と状態があります。これには、以下のインターフェイスがあります。
public interface CounterEvent { long getOldValue(); State getOldState(); long getNewValue(); State getNewState(); }
状態は、非有界である strong カウンターおよび weak カウンターでは常に State.VALID
になります。State.LOWER_BOUND_REACHED
および State.UPPER_BOUND_REACHED
は有界である strong カウンターのみに有効です。
weak カウンター reset()
操作は、中間値で複数の通知をトリガーします。
第7章 CDI 拡張機能の使用
Data Grid は、CDI (Contexts and Dependency Injection) プログラミングモデルと統合し、以下を可能にするエクステンションを提供します。
- CDI Bean および Java EE コンポーネントにキャッシュを設定し、インジェクトします。
- キャッシュマネージャーを設定します。
- キャッシュおよびキャッシュマネージャーレベルのイベントを受信します。
- JCache アノテーションを使用してデータストレージおよび取得を制御します。
7.1. CDI 依存関係
以下の依存関係のいずれかで pom.xml
を更新し、プロジェクトに Data Grid CDI エクステンションを追加します。
埋め込み (Library) モード
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-cdi-embedded</artifactId> </dependency>
サーバーモード
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-cdi-remote</artifactId> </dependency>
7.2. 組み込みキャッシュのインジェクト
組み込みキャッシュをインジェクトするために CDI Bean を設定します。
手順
キャッシュ修飾子アノテーションを作成します。
... import javax.inject.Qualifier; @Qualifier @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface GreetingCache { 1 }
- 1
@GreetingCache
修飾子を作成します。
キャッシュ設定を定義するプロデューサーメソッドを追加します。
... import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.cdi.ConfigureCache; import javax.enterprise.inject.Produces; public class Config { @ConfigureCache("mygreetingcache") 1 @GreetingCache 2 @Produces public Configuration greetingCacheConfiguration() { return new ConfigurationBuilder() .memory() .size(1000) .build(); } }
必要に応じて、クラスター化されたキャッシュマネージャーを作成するプロデューサーメソッドを追加します。
... package org.infinispan.configuration.global.GlobalConfigurationBuilder; public class Config { @GreetingCache 1 @Produces @ApplicationScoped 2 public EmbeddedCacheManager defaultClusteredCacheManager() { 3 return new DefaultCacheManager( new GlobalConfigurationBuilder().transport().defaultTransport().build(); } }
注記キャッシュマネージャーは、ヘビーウェイトオブジェクトです。アプリケーションで複数のキャッシュマネージャーを実行すると、パフォーマンスが低下する可能性があります。複数のキャッシュを挿入する場合は、各キャッシュの修飾子をキャッシュマネージャープロデューサーメソッドに追加するか、修飾子を追加しないでください。
@GreetingCache
修飾子をキャッシュインジェクションポイントに追加します。... import javax.inject.Inject; public class GreetingService { @Inject @GreetingCache private Cache<String, String> cache; public String greet(String user) { String cachedValue = cache.get(user); if (cachedValue == null) { cachedValue = "Hello " + user; cache.put(user, cachedValue); } return cachedValue; } }
7.3. リモートキャッシュの注入
リモートキャッシュを注入するために CDI Bean を設定します。
手順
キャッシュ修飾子アノテーションを作成します。
@Remote("mygreetingcache") 1 @Qualifier @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RemoteGreetingCache { 2 }
キャッシュインジェクションポイントに
@RemoteGreetingCache
修飾子を追加します。public class GreetingService { @Inject @RemoteGreetingCache private RemoteCache<String, String> cache; public String greet(String user) { String cachedValue = cache.get(user); if (cachedValue == null) { cachedValue = "Hello " + user; cache.put(user, cachedValue); } return cachedValue; } }
リモートキャッシュをインジェクトするためのヒント
修飾子を使用せずにリモートキャッシュをインジェクトできます。
... @Inject @Remote("greetingCache") private RemoteCache<String, String> cache;
複数の Data Grid クラスターがある場合は、クラスターごとに個別のリモートキャッシュマネージャープロデューサーを作成できます。
... import javax.enterprise.context.ApplicationScoped; public class Config { @RemoteGreetingCache @Produces @ApplicationScoped 1 public ConfigurationBuilder builder = new ConfigurationBuilder(); 2 builder.addServer().host("localhost").port(11222); return new RemoteCacheManager(builder.build()); } }
7.4. JCache キャッシングアノテーション
JCache アーティファクトがクラスパスにある場合、以下の JCache キャッシングアノテーションを CDI 管理 Bean で使用できます。
@CacheResult
- メソッド呼び出しの結果をキャッシュします。
@CachePut
- メソッドパラメーターをキャッシュします。
@CacheRemoveEntry
- キャッシュからエントリーを削除します。
@CacheRemoveAll
- キャッシュからすべてのエントリーを削除します。
Target type: これらの JCache キャッシングアノテーションはメソッドでのみ使用できます。
JCache キャッシュアノテーションを使用するには、アプリケーションの beans.xml
ファイルでインターセプターを宣言します。
管理対象環境 (アプリケーションサーバー)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd" version="1.2" bean-discovery-mode="annotated"> <interceptors> <class>org.infinispan.jcache.annotation.InjectedCacheResultInterceptor</class> <class>org.infinispan.jcache.annotation.InjectedCachePutInterceptor</class> <class>org.infinispan.jcache.annotation.InjectedCacheRemoveEntryInterceptor</class> <class>org.infinispan.jcache.annotation.InjectedCacheRemoveAllInterceptor</class> </interceptors> </beans>
管理対象外環境 (スタンドアロン)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd" version="1.2" bean-discovery-mode="annotated"> <interceptors> <class>org.infinispan.jcache.annotation.CacheResultInterceptor</class> <class>org.infinispan.jcache.annotation.CachePutInterceptor</class> <class>org.infinispan.jcache.annotation.CacheRemoveEntryInterceptor</class> <class>org.infinispan.jcache.annotation.CacheRemoveAllInterceptor</class> </interceptors> </beans>
JCache キャッシングアノテーションの例
以下の例は、@CacheResult
アノテーションが GreetingService.greet()
メソッドの結果をキャッシュする方法を示しています。
import javax.cache.interceptor.CacheResult; public class GreetingService { @CacheResult public String greet(String user) { return "Hello" + user; } }
JCache アノテーションを使用すると、デフォルトのキャッシュは、アノテーションが付けられたメソッドの完全修飾名をパラメータータイプで使用します。例を以下に示します。org.infinispan.example.GreetingService.greet(java.lang.String)
デフォルト以外のキャッシュを使用するには、以下の例のように、cacheName
属性を使用してキャッシュ名を指定します。
@CacheResult(cacheName = "greeting-cache")
7.5. キャッシュおよびキャッシュマネージャーイベントの受信
CDI イベントを使用して、キャッシュおよびキャッシュマネージャーレベルのイベントを受信します。
-
以下の例のように
@Observes
アノテーションを使用します。
import javax.enterprise.event.Observes; import org.infinispan.notifications.cachemanagerlistener.event.CacheStartedEvent; import org.infinispan.notifications.cachelistener.event.*; public class GreetingService { // Cache level events private void entryRemovedFromCache(@Observes CacheEntryCreatedEvent event) { ... } // Cache manager level events private void cacheStarted(@Observes CacheStartedEvent event) { ... } }
第8章 ロックおよび同時実行
Data Grid は、マルチバージョン同時実行制御 (MVCC) を利用します (MVCC)。これは、リレーショナルデータベースや他のデータストアでよく使用される同時実行スキームです。MVCC には、粗粒度の Java 同期や、共有データにアクセスするための JDK ロックに比べて、次のような多くの利点があります。
- 同時リーダーとライターの許可
- リーダーとライターが互いにブロックしない
- 書き込みスキューを検出して処理できる
- 内部ロックのストライピングが可能
8.1. 実装の詳細のロック
Data Grid の MVCC 実装では、ロックと同期が最小限に抑えられており、可能な限り compare-and-swap などのロックフリー技術やロックフリーのデータ構造などに重点を置いています。これにより、マルチ CPU 環境とマルチコア環境の最適化に役立ちます。
特に、Data Grid の MVCC 実装はリーダーに対して高度に最適化されています。リーダースレッドは、エントリーの明示的なロックを取得せず、代わりに問題のエントリーを直接読み込みます。
一方、ライターは、書き込みロックを取得する必要があります。これにより、エントリーごとに 1 つの同時書き込みのみが保証されるため、同時ライターはキューイングしてエントリーを変更することになります。
同時読み取りを可能にするため、ライターはエントリーを MVCCEntry
でラップして、変更する予定のエントリーのコピーを作成します。このコピーは、同時リーダーが部分的に変更された状態を認識できないようにします。書き込みが完了したら、MVCCEntry.commit()
はデータコンテナーへの変更をフラッシュし、後続のリーダーに変更内容が反映されます。
8.1.1. クラスター化されたキャッシュおよびロック
Data Grid クラスターでは、プライマリー所有者ノードがキーをロックします。
非トランザクションキャッシュの場合、Data Grid は書き込み操作をキーのプライマリー所有者に転送して、ロックを試行できるようにします。次に、Data Grid は、他の所有者に書き込み操作を転送するか、キーをロックできない場合は例外を出力します。
操作が条件付きで、プライマリー所有者で失敗した場合、Data Grid はこれを他の所有者には転送しません。
トランザクションキャッシュの場合、プライマリーの所有者は楽観的および悲観的ロックモードでキーをロックできます。Data Grid は、トランザクション間の同時読み取りを制御する異なる分離レベルもサポートします。
8.1.2. LockManager
LockManager
は、書き込み用にエントリーをロックするコンポーネントです。LockManager
は、LockContainer
を使用して、ロックを検索、保持、作成します。LockContainers
には、ロックストライピングをサポートするものと、エントリーごとに 1 つのロックをサポートするものの 2 つの大まかな特徴があります。
8.1.3. ロックストライピング
ロックストライピングでは、固定サイズの共有ロックコレクションをキャッシュ全体に使用する必要があり、ロックはエントリーのキーのハッシュコードに基づいてエントリーに割り当てられます。JDK の ConcurrentHashMap
がロックを割り当てる方法と同様に、これにより、関連性のない可能性のあるエントリーが同じロックによってブロックされる代わりに、拡張性の高い固定オーバーヘッドのロックメカニズムが可能になります。
別の方法は、ロックストライピングを無効にすることです。これは、エントリーごとに 新しい ロックが作成されることを意味します。このアプローチでは、スループットが高くなる 可能性 がありますが、追加のメモリー使用量やガベージコレクションのチャーンなどのコストがかかります。
異なるキーのロックが同じロックストライプになってしまうとデッドロックが発生する可能性があるため、ロックストライピングはデフォルトで無効になっています。
ロックストライピングで使用される共有ロックコレクションのサイズは、<locking />
設定要素の concurrencyLevel
属性を使用して調整できます。
設定例:
<locking striping="false|true"/>
または、以下を実行します。
new ConfigurationBuilder().locking().useLockStriping(false|true);
8.1.4. 同時実行レベル
この同時実行レベルは、ストライプロックコンテナーのサイズを決定する他に、DataContainer
の内部など、関連する JDK ConcurrentHashMap
ベースのコレクションを調整するためにも使用されます。このパラメーターは、Data Grid でもまったく同じ方法で使用されているため、同時実行レベルの詳細については、JDK ConcurrentHashMap
Javadocs を参照してください。
設定例:
<locking concurrency-level="32"/>
または、以下を実行します。
new ConfigurationBuilder().locking().concurrencyLevel(32);
8.1.5. ロックタイムアウト
ロックタイムアウトは、競合するロックを待つ時間 (ミリ秒単位) を指定します。
設定例:
<locking acquire-timeout="10000"/>
または、以下を実行します。
new ConfigurationBuilder().locking().lockAcquisitionTimeout(10000); //alternatively new ConfigurationBuilder().locking().lockAcquisitionTimeout(10, TimeUnit.SECONDS);
8.1.6. 一貫性
(すべての所有者がロックされているのとは対照的に) 単一の所有者がロックされるという事実により、次の一貫性の保証が失われることはありません。キー K
がノード {A, B}
に対してハッシュ化され、トランザクション TX1
が、たとえば、A
上の K
のロックを取得したとします。別のトランザクション TX2
が B
(またはその他のノード) 上で開始され、TX2
が K
のロックを試みる場合、ロックがすでに TX1
によって保持されているため、タイムアウトでロックに失敗します。理由は、キー K
のロックがトランザクションの発生場所に関係なく、常に、確定的に、クラスターの同じノードで取得されるからです。
8.2. データのバージョン管理
Data Grid は、simple と external の 2 つの形式のデータバージョン管理をサポートします。simple バージョン管理は、書き込みスキューチェックのトランザクションキャッシュで使用されます。
external バージョン管理は、Data Grid を Hibernate で使用する場合など、Data Grid 内のデータバージョン管理の外部ソースをカプセル化するために使用され、そのデータバージョン情報をデータベースから直接取得します。
このスキームでは、バージョンに渡すメカニズムが必要になり、オーバーロードされたバージョン put()
および putForExternalRead()
が、AdvancedCache
で提供され、外部データバージョンを取り込みます。その後、これは InvocationContext
に保管され、コミット時にエントリーに適用されます。
external バージョン管理の場合、書き込みスキューチェックは実行できず、実行されません。
第9章 トランザクション
Data Grid は、JTA 準拠のトランザクションを使用し、参加するように設定できます。
または、トランザクションのサポートが無効になっている場合は、JDBC 呼び出しで自動コミットを使用する場合と同等になります。ここでは、すべての変更後に変更がレプリケートされる可能性があります (レプリケーションが有効な場合)。
すべてのキャッシュ操作で Data Grid は以下を行います。
- スレッドに関連する現在の トランザクション を取得します。
- トランザクションのコミットまたはロールバック時に通知されるように、XAResource をトランザクションマネージャーに登録します (登録されていない場合)。
これを実行するには、キャッシュに環境の TransactionManager への参照を提供する必要があります。これは通常、TransactionManagerLookup インターフェイスの実装のクラス名を使用してキャッシュを設定することで行います。キャッシュが起動すると、このクラスのインスタンスを作成し、TransactionManager
への参照を返す getTransactionManager()
メソッドを呼び出します。
Data Grid には複数のトランザクションマネージャールックアップクラスが同梱されます。
トランザクションマネージャールックアップの実装
- EmbeddedTransactionManagerLookup: これは、他の実装が利用できない場合に、埋め込みモードのみに使用する必要がある基本的なトランザクションマネージャーを提供します。この実装は、同時トランザクションおよびリカバリーでは、重大な制限があります。
-
JBossStandaloneJTAManagerLookup: スタンドアロン環境、または JBoss AS 7 以前、および WildFly 8、9、10 で Data Grid を実行している場合、トランザクションマネージャーのデフォルトとしてこれを選択します。このトランザクションは、
EmbeddedTransactionManager
の不足をすべて解消する JBoss Transactions をベースとした本格的なトランザクションマネージャーです。 - WildflyTransactionManagerLookup: WildFly 11 以降で Data Grid を実行している場合は、トランザクションマネージャーのデフォルトとしてこれを選択します。
-
GenericTransactionManagerLookup: これは、最も一般的な Java EE アプリケーションサーバーでトランザクションマネージャーを見つけるルックアップクラスです。トランザクションマネージャーが見つからない場合は、
EmbeddedTransactionManager
がデフォルトの設定になります。
警告: DummyTransactionManagerLookup
は 9.0 で非推奨となり、今後削除される予定です。代わりに EmbeddedTransactionManagerLookup
を使用してください。
初期化すると、TransactionManager
は Cache
自体から取得することもできます。
//the cache must have a transactionManagerLookupClass defined Cache cache = cacheManager.getCache(); //equivalent with calling TransactionManagerLookup.getTransactionManager(); TransactionManager tm = cache.getAdvancedCache().getTransactionManager();
9.1. トランザクションの設定
トランザクションはキャッシュレベルで設定されます。以下はトランザクションの動作に影響する設定と、各設定属性の簡単な説明になります。
<locking isolation="READ_COMMITTED"/> <transaction locking="OPTIMISTIC" auto-commit="true" complete-timeout="60000" mode="NONE" notifications="true" reaper-interval="30000" recovery-cache="__recoveryInfoCacheName__" stop-timeout="30000" transaction-manager-lookup="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"/>
プログラムを使用する場合
ConfigurationBuilder builder = new ConfigurationBuilder(); builder.locking() .isolationLevel(IsolationLevel.READ_COMMITTED); builder.transaction() .lockingMode(LockingMode.OPTIMISTIC) .autoCommit(true) .completedTxTimeout(60000) .transactionMode(TransactionMode.NON_TRANSACTIONAL) .useSynchronization(false) .notifications(true) .reaperWakeUpInterval(30000) .cacheStopTimeout(30000) .transactionManagerLookup(new GenericTransactionManagerLookup()) .recovery() .enabled(false) .recoveryInfoCacheName("__recoveryInfoCacheName__");
-
isolation
- 分離レベルを設定します。詳細は、分離レベル を参照してください。デフォルトはREPEATABLE_READ
です。 -
locking
- キャッシュが楽観的または悲観的ロックを使用するかどうかを設定します。詳細は、トランザクションのロック を参照してください。デフォルトはOPTIMISTIC
です。 -
auto-commit
: 有効にすると、ユーザーは 1 回の操作でトランザクションを手動で開始する必要はありません。トランザクションは自動的に起動およびコミットされます。デフォルトはtrue
です。 -
complete-timeout
- 完了したトランザクションに関する情報を保持する期間 (ミリ秒単位)。デフォルトは60000
です。 mode
: キャッシュがトランザクションかどうかを設定します。デフォルトはNONE
です。利用可能なオプションは以下のとおりです。-
NONE
- 非トランザクションキャッシュ -
FULL_XA
- リカバリーが有効になっている XA トランザクションキャッシュリカバリーの詳細は、トランザクションリカバリー を参照してください。 -
NON_DURABLE_XA
- リカバリーが無効になっている XA トランザクションキャッシュ。 -
NON_XA
- XA の代わりに 同期化 を介して統合されたトランザクションキャッシュ。詳細は、同期の登録 のセクションを参照してください。 -
BATCH
- バッチを使用して操作をグループ化するトランザクションキャッシュ。詳細は バッチ処理 のセクションを参照してください。
-
-
notifications
- キャッシュリスナーのトランザクションイベントを有効/無効にします。デフォルトはtrue
です。 -
reaper-interval
- トランザクション完了情報をクリーンアップするスレッドが開始する間隔 (ミリ秒単位)。デフォルトは30000
です。 -
recovery-cache
- リカバリー情報を保存するキャッシュ名を設定します。リカバリーの詳細は、トランザクションリカバリー を参照してください。デフォルトはrecoveryInfoCacheName
です。 -
stop-timeout
- キャッシュの停止時に進行中のトランザクションを待機する時間 (ミリ秒単位)。デフォルトは30000
です。 -
transaction-manager-lookup
-javax.transaction.TransactionManager
への参照を検索するクラスの完全修飾クラス名を設定します。デフォルトはorg.infinispan.transaction.lookup.GenericTransactionManagerLookup
です。
2 フェーズコミット (2PC) が Data Grid に実装される方法、およびロックが取得される方法についての詳細は、以下のセクションを参照してください。設定の詳細については、Configuration reference を参照してください。
9.2. 分離レベル
Data Grid は、READ_COMMITTED および REPEATABLE_READ の 2 つの分離レベルを提供します。
これらの分離レベルは、リーダーが同時書き込みを確認するタイミングを決定し、MVCCEntry
の異なるサブクラスを使用して内部的に実装されます。MVCCEntry では、状態がデータコンテナーにコミットされる方法が異なります。
以下は、Data Grid のコンテキストの READ_COMMITTED
および REPEATABLE_READ
の違いを理解する上で役立つ詳細な例です。READ_COMMITTED
の場合、同じキーで連続して 2 つの読み取り呼び出しを行うと、キーが別のトランザクションによって更新され、2 つ目の読み取りによって新しい更新値が返されることがあります。
Thread1: tx1.begin() Thread1: cache.get(k) // returns v Thread2: tx2.begin() Thread2: cache.get(k) // returns v Thread2: cache.put(k, v2) Thread2: tx2.commit() Thread1: cache.get(k) // returns v2! Thread1: tx1.commit()
REPEATABLE_READ
では、最終 get は引き続き v
を返します。そのため、トランザクション内で同じキーを複数回取得する場合は、REPEATABLE_READ
を使用する必要があります。
ただし、読み取りロックが REPEATABLE_READ
に対しても取得されないため、この現象が発生する可能性があります。
cache.get("A") // returns 1 cache.get("B") // returns 1 Thread1: tx1.begin() Thread1: cache.put("A", 2) Thread1: cache.put("B", 2) Thread2: tx2.begin() Thread2: cache.get("A") // returns 1 Thread1: tx1.commit() Thread2: cache.get("B") // returns 2 Thread2: tx2.commit()
9.3. トランザクションのロック
9.3.1. 悲観的なトランザクションキャッシュ
ロック取得の観点では、悲観的トランザクションはキーの書き込み時にキーのロックを取得します。
- ロック要求がプライマリー所有者に送信されます (明示的なロック要求または操作のいずれか)。
プライマリーの所有者はロックの取得を試みます。
- 成功した場合は、正の応答が返されます。
- そうでない場合は、負の応答が送信され、トランザクションはロールバックされます。
たとえば、以下のようになります。
transactionManager.begin(); cache.put(k1,v1); //k1 is locked. cache.remove(k2); //k2 is locked when this returns transactionManager.commit();
cache.put(k1,v1)
が返されると、k1
はロックされ、クラスター内のどこかで実行中の他のトランザクションは、これに書き込むことができません。k1
の読み取りは引き続き可能です。トランザクションの完了時に k1
のロックが解放されます (コミットまたはロールバック)。
条件付き操作の場合、検証はオリジネーターで実行されます。
9.3.2. 楽観的トランザクションキャッシュ
楽観的トランザクションロックはトランザクションの準備時に取得され、トランザクションのコミット (またはロールバック) まで保持されます。これは、書き込みでローカルロックを取得し、準備中にクラスターのロックが取得される 5.0 デフォルトロックモデルとは異なります。
- 準備はすべての所有者に送信されます。
プライマリーの所有者は、必要なロックの取得を試みます。
- ロックに成功すると、書き込みのスキューチェックが実行されます。
- 書き込みスキューチェックが成功した場合 (または無効化された場合) は、正の応答を送信します。
- それ以外の場合は、負の応答が送信され、トランザクションはロールバックされます。
たとえば、以下のようになります。
transactionManager.begin(); cache.put(k1,v1); cache.remove(k2); transactionManager.commit(); //at prepare time, K1 and K2 is locked until committed/rolled back.
条件付きコマンドの場合、検証は引き続きオリジネーターで実行されます。
9.3.3. 悲観的または楽観的トランザクションのどちらが必要か
ユースケースの観点からは、同時に実行されている複数のトランザクション間で多くの競合が ない 場合は、楽観的トランザクションを使用する必要があります。これは、読み取り時と、コミット時 (書き込みスキューチェックが有効) の間でデータが変更された場合に、楽観的トランザクションがロールバックするためです。
一方、キーでの競合が多く、トランザクションのロールバックがあまり望ましくない場合は、悲観的トランザクションの方が適している可能性があります。悲観的トランザクションは、その性質上、よりコストがかかります。各書き込み操作ではロックの取得に RPC が関係する可能性があります。
9.4. スキューの書き込み
書き込みスキューは、2 つのトランザクションが独立して同時に同じキーの読み取りと書き込みを行うときに発生します。書き込みスキューの結果、両方のトランザクションは同じキーに対して更新を正常にコミットしますが、値は異なります。
Data Grid は、書き込みスキューチェックを自動的に実行し、楽観的トランザクションで REPEATABLE_READ
分離レベルのデータの一貫性を確保します。これにより、Data Grid はトランザクションの 1 つを検出し、ロールバックできます。
LOCAL
モードで動作する場合、書き込みスキューの確認は Java オブジェクト参照に依存して違いを比較します。これにより、書き込みスキューをチェックするための信頼性の高い技術が提供されます。
9.4.1. 悲観的トランザクションでのキーへの書き込みロックの強制
悲観的トランザクションでの書き込みスキューを回避するには、Flag.FORCE_WRITE_LOCK
で読み取り時にキーをロックします。
-
トランザクション以外のキャッシュでは、
Flag.FORCE_WRITE_LOCK
は動作しません。get()
呼び出しは、キーの値を読み取りますが、ロックをリモートで取得しません。 -
Flag.FORCE_WRITE_LOCK
は、同じトランザクションでエンティティーが後で更新されるトランザクションと併用する必要があります。
Flag.FORCE_WRITE_LOCK
の例については、以下のコードスニペットを比較してください。
// begin the transaction if (!cache.getAdvancedCache().lock(key)) { // abort the transaction because the key was not locked } else { cache.get(key); cache.put(key, value); // commit the transaction }
// begin the transaction try { // throws an exception if the key is not locked. cache.getAdvancedCache().withFlags(Flag.FORCE_WRITE_LOCK).get(key); cache.put(key, value); } catch (CacheException e) { // mark the transaction rollback-only } // commit or rollback the transaction
9.5. 例外への対処
CacheException (またはそのサブクラス) が JTA トランザクションの範囲内のキャッシュメソッドによって出力される場合、トランザクションは自動的にロールバックに対してマークされます。
9.6. 同期の登録
デフォルトでは、Data Grid は XAResource を介して、分散トランザクションの最初のクラス参加者として登録します。トランザクションの参加者として Data Grid が必要ではなく、ライフサイクル (準備、完了) によってのみ通知される状況があります (例: Data Grid が Hibernate で 2 次レベルキャッシュとして使用される場合など)。
Data Grid は、同期 を介したトランザクションのエンリストを許可します。これを有効にするには、NON_XA
トランザクションモードを使用します。
Synchronization
には、TransactionManager
が 1PC で 2PC を最適化できるという利点があります。この場合、他の 1 つのリソースのみがそのトランザクションにエンリストされます (last resource commit optimization)。つまり、Hibernate 2 次キャッシュ: Data Grid がコミット時よりも XAResource
として TransactionManager
に登録する場合、TransactionManager
は 2 つの XAResource
(キャッシュとデータベース) を認識し、この最適化を行いません。2 つのリソース間で調整する必要があるため、tx ログをディスクに書き込む必要があります。一方、Data Grid を Synchronization
として登録すると、TransactionManager
はディスクへのログの書き込みを省略します (パフォーマンスが向上)。
9.7. バッチ処理
バッチ処理は、トランザクションの原子性といくつかの特性を許可しますが、本格的な JTA または XA 機能は許可しません。多くの場合、バッチ処理は本格的なトランザクションよりもはるかに軽量で安価です。
一般的には、トランザクションの参加者のみが Data Grid クラスターである場合に、バッチ処理 API を使用する必要があります。反対に、トランザクションに複数のシステムが必要な場合に、(TransactionManager
に関連する)JTA トランザクションを使用する必要があります。たとえば、トランザクションの Hello world! を考慮すると、ある銀行口座から別の銀行口座にお金を転送します。両方の口座が Data Grid 内に保存されている場合は、バッチ処理を使用できます。ある口座がデータベースにあり、もう 1 つの口座が Data Grid の場合は、分散トランザクションが必要になります。
バッチ処理を使用するためにトランザクションマネージャーを定義する必要はありません。
9.7.1. API
バッチ処理を使用するようにキャッシュを設定したら、Cache
で startBatch()
と endBatch()
を呼び出して使用します。例:
Cache cache = cacheManager.getCache(); // not using a batch cache.put("key", "value"); // will replicate immediately // using a batch cache.startBatch(); cache.put("k1", "value"); cache.put("k2", "value"); cache.put("k2", "value"); cache.endBatch(true); // This will now replicate the modifications since the batch was started. // a new batch cache.startBatch(); cache.put("k1", "value"); cache.put("k2", "value"); cache.put("k3", "value"); cache.endBatch(false); // This will "discard" changes made in the batch
9.7.2. バッチ処理と JTA
裏では、バッチ機能が JTA トランザクションを開始し、そのスコープ内のすべての呼び出しがそれに関連付けられます。これには、内部 TransactionManager
実装が非常に簡単な (例: リカバリーなし) を使用します。バッチ処理では、以下を取得します。
- 呼び出し中に取得したロックはバッチが完了するまで保持されます。
- 変更はすべて、バッチ完了プロセスの一部として、クラスター内でバッチ内に複製されます。バッチの各更新のレプリケーションチャット数を減らします。
- 同期のレプリケーションまたは無効化が使用された場合は、レプリケーション/無効化の失敗により、バッチがロールバックされます。
- すべてのトランザクション関連の設定は、バッチ処理にも適用されます。
9.8. トランザクションリカバリー
リカバリーは XA トランザクションの機能であり、リソースの不測の事態、場合によってはトランザクションマネージャーの障害を対処し、それに応じてそのような状況から回復します。
9.8.1. リカバリーを使用するタイミング
外部データベースに保存されたアカウントから Data Grid に保管されたアカウントに転送される分散トランザクションについて考えてみましょう。TransactionManager.commit()
が呼び出されると、両方のリソースが正常に完了します (第 1 フェーズ)。コミット (第 2) フェーズでは、データベースは、トランザクションマネージャーからコミットリクエストを受け取る前に、Data Grid の変更を問題なく適用します。この時点では、システムが一貫性のない状態です。お金は外部データベースのア口座から取得されますが、まだ Data Grid には表示されません (ロックは 2 フェーズコミットプロトコルの 2 番目のフェーズでのみリリースされます)。リカバリーはこの状況に対応することで、データベースと Data Grid の両方のデータが一貫した状態で終了します。
9.8.2. 仕組み
リカバリーはトランザクションマネージャーによって調整されます。トランザクションマネージャーは Data Grid と連携して、手動による介入が必要な未確定のトランザクションのリストを決定し、システム管理者に (電子メール、ログアラートなどを介して) 通知します。このプロセスはトランザクションマネージャーに固有のものですが、通常トランザクションマネージャーで設定が必要になります。
未確定のトランザクション ID を把握すると、システム管理者は Data Grid クラスターに接続し、トランザクションのコミットを再生したり、ロールバックを強制できるようになりました。Data Grid は、この JMX ツールを提供します。これは、トランザクションのリカバリーおよび調整セクション で広範囲に説明されています。
9.8.3. リカバリーの設定
Data Grid では、リカバリーはデフォルトでは有効になっていません。無効にすると、TransactionManager
は Data Grid と動作しないため、インダウト状態のトランザクションを決定できません。トランザクションの設定 セクションでは、その設定を有効にする方法を示しています。
注記: recovery-cache
属性は必須ではなく、キャッシュごとに設定されます。
リカバリーが機能するには、完全な XA トランザクションが必要であるため、mode
を FULL_XA
に設定する必要があります。
9.8.3.1. JMX サポートの有効化
リカバリー JMX サポートの管理に JMX を使用できるようにするには、明示的に有効にする必要があります。
9.8.4. リカバリーキャッシュ
未確定のトランザクションを追跡し、それらに応答できるようにするために、Data Grid は将来の使用のためにすべてのトランザクション状態をキャッシュします。この状態は、未確定のトランザクションに対してのみ保持され、コミット/ロールバックフェーズが完了した後、正常に完了したトランザクションに対しては削除されます。
この未確定のトランザクションデータはローカルキャッシュ内に保持されます。これにより、データが大きくなりすぎた場合に、キャッシュローダーを介してこの情報をディスクにスワップするように設定できます。このキャッシュは、recovery-cache
設定属性を介して指定できます。指定のない場合は、Data Grid がローカルキャッシュを設定します。
リカバリーが有効になっているすべての Data Grid キャッシュ間で同じリカバリーキャッシュを共有することは可能です (必須ではありません)。デフォルトのリカバリーキャッシュが上書きされた場合、指定のリカバリーキャッシュは、キャッシュ自体が使用するものとは異なるトランザクションマネージャーを返す TransactionManagerLookup を使用する必要があります。
9.8.5. トランザクションマネージャーとの統合
これはトランザクションマネージャーに固有のものですが、通常トランザクションマネージャーは XAResource.recover()
を呼び出すために XAResource
実装への参照が必要になります。Data Grid XAResource
の以下の API への参照を取得するには、以下を行います。
XAResource xar = cache.getAdvancedCache().getXAResource();
トランザクションを実行するプロセスとは異なるプロセスで復元を実行することが一般的です。
9.8.6. 調整
トランザクションマネージャーは、システム管理者に未確定のトランザクションについて独自の方法で通知します。この段階では、システム管理者がトランザクションの XID(バイトアレイ) を把握していることを前提としています。
通常のリカバリーフローは以下のとおりです。
- ステップ 1: システム管理者は、JMX を介して Data Grid サーバーに接続し、未確定のトランザクションを一覧表示します。以下のイメージは、未確定のトランザクションを持つ Data Grid ノードに接続する JConsole を示しています。
図9.1 未確定のトランザクションの表示
未確定の各トランザクションのステータスが表示されます (この例では PREPARED です)。status フィールドに複数の要素が存在する可能性があります。たとえば、トランザクションが特定ノードでコミットされていても、それらのノードでコミットされない場合は PREPARED および COMMITTED です。
- ステップ 2: システム管理者は、トランザクションマネージャーから受け取った XID を数字で表した Data Grid 内部 ID に視覚的にマッピングします。XID(バイトアレイ) は、JMX ツール (JConsole など) に渡して Data Grid 側で再アセンブルされるため、このステップが必要です。
- ステップ 3: システム管理者は、内部 ID に基づいて、対応する jmx 操作を介してトランザクションのコミット/ロールバックを強制的に実行します。以下のイメージは、内部 ID に基づいてトランザクションのコミットを強制することで取得します。
図9.2 コミットの強制
上記のすべての JMX 操作は、トランザクションの発信場所に関係なく、任意のノードで実行できます。
9.8.6.1. XID に基づくコミット/ロールバックの強制
未確定のトランザクションのコミット/ロールバックの強制を行う XID ベースの JMX 操作も使用できます。これらのメソッドはトランザクションに関連する番号ではなく、XID を記述する byte[] アレイを受け取ります (前述のステップ 2 で説明)。これらは、たとえば、特定の未確定トランザクションの自動完了ジョブを設定する場合に役立ちます。このプロセスはトランザクションマネージャーのリカバリーにプラグインされ、トランザクションマネージャーの XID オブジェクトにアクセスできます。
第10章 Grid でのコードの実行
キャッシュの主な利点は、マシン全体でもキーで値を迅速に検索できることです。実際、この理由だけで、おそらく多くのユーザーが Data Grid を使用しています。ただし、Data Grid には、すぐには明らかにならない多くの利点があります。通常、Data Grid はマシンのクラスターで使用されるため、ユーザーのニーズのワークロードを実行するためにクラスター全体を利用するのに役立つ機能もあります。
このセクションでは、埋め込みキャッシュを使用したグリッドでのコードの実行についてのみ説明します。リモートキャッシュを使用している場合は、リモートグリッドでのコードの実行に関する詳細を確認する必要があります。
10.1. クラスターエグゼキューター
マシンのグループがあるため、それらすべてでコードを実行するためにそれらの結合された計算能力を活用することは理にかなっています。キャッシュマネージャーには、クラスター内で任意のコードを実行できる優れたユーティリティーが付属しています。この機能にはキャッシュを使用する必要はありません。この クラスターエグゼキューター は、EmbeddedCacheManager
で executor() を呼び出すことで取得できます。このエグゼキュータは、クラスター設定と非クラスター設定の両方で取得できます。
ClusterExecutor は、コードがキャッシュ内のデータに依存しないコードを実行するために特別に設計されており、代わりに、ユーザーがクラスター内でコードを簡単に実行できるようにする方法として使用されます。
このマネージャーは、Java 8 を使用して特別に構築されており、機能的な API を念頭に置いているため、すべてのメソッドは機能的なインターフェイスを引数として取ります。また、これらの引数は他のノードに送信されるため、シリアライズする必要があります。ラムダがすぐに Serializable になるような策を使用しています。つまり、引数に Serializable と実際の引数タイプ (つまり、Runnable または Function) の両方を実装させることです。JRE は、呼び出す方法を決定する際に最も具体的なクラスを選択するため、ラムダは常にシリアライズ可能です。また、Externalizer を使用してメッセージサイズをさらに減らすこともできます。
マネージャーはデフォルトで、指定されたコマンドを、送信元のノードを含むクラスター内のすべてのノードに送信します。セクションで説明されているように、filterTargets
メソッドを使用して、タスクが実行するノードを制御できます。
10.1.1. 実行ノードのフィルタリング
コマンドを実行するノードを制限できます。たとえば、同じラック内のマシンでのみ計算を実行したい場合があります。または、ローカルサイトで 1 回、別のサイトで操作を再実行することもできます。クラスターエグゼキューターは、同じマシン、ラック、またはサイトレベルのスコープで要求を送信するノードを制限できます。
SameRack.java
EmbeddedCacheManager manager = ...; manager.executor().filterTargets(ClusterExecutionPolicy.SAME_RACK).submit(...)
このトポロジーベースフィルタリングを使用するには、サーバーヒントを介してトポロジー対応のコンシステントハッシュを有効にする必要があります。
ノードの Address
に基づいて述部を使用してフィルタリングすることもできます。これは任意で、以前のコードスニペットでトポロジーベースのフィルタリングと組み合わせることもできます。
また、実行対象と見なすことができるノードを除外する Predicate
を使用して、任意の方法でターゲットノードを選択することもできます。これは同時に Topology フィルタリングと組み合わせて、クラスター内でコードを実行する場所をより詳細に制御できるようにすることもできます。
Predicate.java
EmbeddedCacheManager manager = ...; // Just filter manager.executor().filterTargets(a -> a.equals(..)).submit(...) // Filter only those in the desired topology manager.executor().filterTargets(ClusterExecutionPolicy.SAME_SITE, a -> a.equals(..)).submit(...)
10.1.2. Timeout
クラスターエグゼキューターを使用すると、呼び出しごとにタイムアウトを設定できます。デフォルトは、Transport Configuration で設定された分散同期のタイムアウトになります。このタイムアウトは、クラスター化されたキャッシュマネージャーとクラスター化されていないキャッシュマネージャーの両方で機能します。タイムアウトの期限が切れると、エグゼキューターがタスクを実行しているスレッドを中断する場合と中断しない場合があります。ただし、タイムアウトが発生すると、Consumer
または Future
は TimeoutException
を渡して完了します。この値は、timeout メソッドを使用し、希望の期間を指定して上書きできます。
10.1.3. 単一ノードの提出
クラスターエグゼキューターは、すべてのノードにコマンドを送信する代わりに、単一ノード送信モードで実行することもできます。代わりに、通常はコマンドを受信するノードの 1 つを選択し、1 つだけに送信します。それぞれの送信は、別のノードを使用してタスクが実行される可能性があります。これは、ClusterExecutor が実装する java.util.concurrent.Executor
として ClusterExecutor を使用するのが非常に便利です。
SingleNode.java
EmbeddedCacheManager manager = ...; manager.executor().singleNodeSubmission().submit(...)
10.1.3.1. Failover
シングルノード送信で実行する場合は、コマンドを再試行することにより、特定のコマンドの処理中に例外が発生した場合にクラスターエグゼキューターが処理できるようにすることが望ましい場合があります。これが発生すると、クラスターエグゼキューターは単一のノードを再度選択し、任意のフェイルオーバー試行までコマンドを再実行します。選択したノードは、トポロジーまたは述部のチェックをパスするノードである可能性があることに注意してください。フェイルオーバーは、上書きされた singleNodeSubmission メソッドを呼び出すことで有効になります。指定されたコマンドは、コマンドが例外なく完了するか、送信の合計量が指定されたフェイルオーバーカウントと等しくなるまで、単一のノードに再送信されます。
10.1.4. 例: PI アプローチ
この例は、ClusterExecutor を使用して PI の値を見積もる方法を示しています。
Pi 近似は、クラスターエグゼキューターを介した並列分散実行から大きな利点を得ることができます。正方形の面積は Sa = 4r2 であり、円の面積は Ca=pi*r2 であることを思い出してください。2 つ目の式からの r2 を置き換えると、pi = 4 * Ca/S になります。ここで、正方形に非常に多くのダーツを射ることができると仮定して、射ったダーツの総数に対して円の中に入ったダーツの割合を取ると、Ca/Sa の値が近似します。pi = 4 * Ca/Sa であるため、pi の近似値を簡単に導き出すことができます。ダーツを多く撃つほど、より良い近似が得られます。以下の例では、10 億本のダーツを撃ちますが、それらを連続して撃つのではなく、Data Grid クラスター全体でダーツ射撃の作業を並列化します。これは 1 のクラスターで正常に機能しますが、遅くなることに注意してください。
public class PiAppx { public static void main (String [] arg){ EmbeddedCacheManager cacheManager = .. boolean isCluster = .. int numPoints = 1_000_000_000; int numServers = isCluster ? cacheManager.getMembers().size() : 1; int numberPerWorker = numPoints / numServers; ClusterExecutor clusterExecutor = cacheManager.executor(); long start = System.currentTimeMillis(); // We receive results concurrently - need to handle that AtomicLong countCircle = new AtomicLong(); CompletableFuture<Void> fut = clusterExecutor.submitConsumer(m -> { int insideCircleCount = 0; for (int i = 0; i < numberPerWorker; i++) { double x = Math.random(); double y = Math.random(); if (insideCircle(x, y)) insideCircleCount++; } return insideCircleCount; }, (address, count, throwable) -> { if (throwable != null) { throwable.printStackTrace(); System.out.println("Address: " + address + " encountered an error: " + throwable); } else { countCircle.getAndAdd(count); } }); fut.whenComplete((v, t) -> { // This is invoked after all nodes have responded with a value or exception if (t != null) { t.printStackTrace(); System.out.println("Exception encountered while waiting:" + t); } else { double appxPi = 4.0 * countCircle.get() / numPoints; System.out.println("Distributed PI appx is " + appxPi + " using " + numServers + " node(s), completed in " + (System.currentTimeMillis() - start) + " ms"); } }); // May have to sleep here to keep alive if no user threads left } private static boolean insideCircle(double x, double y) { return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2)) <= Math.pow(0.5, 2); } }
第11章 ストリーム
結果を生成するために、キャッシュ内のサブセットまたはすべてのデータを処理したい場合があります。これにより、マップの削減が可能になります。Data Grid を使用すると、ユーザーは非常によく似た操作を実行できますが、標準の JRE API を使用して実行できます。Java 8 では、ユーザーがデータに対して処理を細かく反復するのではなく、コレクションで機能スタイルの操作を可能にする ストリーム の概念が導入されました。ストリーム操作は、MapReduce と似た方法で実装できます。MapReduce と同様、キャッシュ全体で処理を実行できますが、非常に大きなデータセットになりますが、効率的な方法になります。
ストリームは、クラスタートポロジーの変更に自動的に調整されるため、キャッシュに存在するデータを扱う場合に推奨される方法です。
また、エントリーの反復方法を制御できるため、クラスター全体ですべての操作を同時に実行する場合は、分散されたキャッシュで操作をより効率的に実行できます。
ストリームは、stream メソッドまたは parallelStream メソッドを呼び出して、Cache から返される entrySet、keySet、または values コレクションから取得されます。
11.1. 一般的なストリーム操作
本セクションでは、使用している基礎となるキャッシュの種類に関係なく、さまざまなオプションを説明します。
11.2. キーのフィルタリング
特定のキーのサブセットでのみ動作するようにストリームをフィルターできます。これは、CacheStream
で filterKeys メソッドを呼び出して実行できます。これは常に述部 フィルター で使用する必要があります。述部がすべてのキーを保持する場合はより高速になります。
AdvancedCache
インターフェイスに精通している場合には、この keyFilter よりも getAll を使用する理由が示唆される可能性があります。エントリーをそのまま必要とし、それらすべてをローカルノードのメモリーに必要とする場合、getAll を使用することにはいくつかの小さな利点 (ほとんどの場合ペイロードが小さい) があります。ただし、これらの要素で処理を行う必要がある場合は、分散並列処理とスレッド並列処理の両方を無料で取得できるため、ストリームをお勧めします。
11.3. セグメントベースのフィルタリング
これは高度な機能で、Data Grid セグメントおよびハッシュ技術の深い知識でのみ使用する必要があります。これらのセグメントベースのフィルタリングは、データを個別の呼び出しに分割する必要がある場合に便利です。これは、Apache Spark などの他のツールと統合する際に便利です。
このオプションは、レプリケートされたキャッシュと分散されたキャッシュでのみサポートされます。これにより、ユーザーは KeyPartitioner によって決定されるタイミングでデータのサブセットで操作できます。このセグメントは、CacheStream
で filterKeySegments メソッドを呼び出してフィルタリングできます。これは、キーフィルターの後に、中間操作が実行される前に適用されます。
11.4. ローカル/無効化
ローカルキャッシュまたは無効化キャッシュで使用されるストリームは、通常のコレクションでストリームを使用する場合とまったく同じように使用できます。Data Grid は、必要に応じてすべての変換をバックグラウンドで処理し、より興味深いすべてのオプション (つまり storeAsBinary およびキャッシュローダー) で機能します。ストリーム操作が実行されるノードにローカルデータのみが使用されます。たとえば、無効化はローカルエントリーのみを使用します。
11.5. 例
以下のコードはキャッシュを取得し、値に "JBoss" の文字列が含まれるすべてのキャッシュエントリーを持つマップを返します。
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(e -> e.getValue().contains("JBoss")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
11.6. 配布/複製/散在
これは、ストリームがストライドになるところです。ストリーム操作が実行されると、関連データを持つ各ノードにさまざまな中間操作と端末操作が送信されます。これにより、データを所有するノードで中間値を処理し、最終結果を元のノードにのみ送信し、パフォーマンスが向上します。
11.6.1. 再ハッシュ対応
内部的にはデータがセグメント化され、各ノードはプライマリー所有者として所有するデータでのみ操作を実行します。これにより、セグメントが各ノードで等量のデータを提供するのに十分な粒度であると仮定して、データを均等に処理できます。
分散キャッシュを使用する場合には、新規ノードが加わったり、残ったりすると、データをノード間で再シャッフルすることができます。分散ストリームはこのデータの再シャッフルを自動的に処理するため、ノードがクラスターを離れたり、クラスターに参加したりするときの監視について心配する必要はありません。シャッフルされたエントリーは 2 回処理される可能性があり、重複処理の量を制限するために、キーレベルまたはセグメントレベル (端末操作に応じて) で処理されたエントリーを追跡します。
ストリームで再ハッシュ認識を無効にすることは可能ですが、推奨されません。これは、再ハッシュが発生したときに、リクエストがデータのサブセットの確認を処理できる場合に限り考慮する必要があります。これは、CacheStream.disableRehashAware() を呼び出すことで実行できます。再ハッシュが発生しない場合、ほとんどの操作のパフォーマンスの向上は、完全に無視できます。唯一の例外は、処理されたキーを追跡する必要がないため、使用するメモリーが少ない iterator と forEach です。
自分が何をしているかを本当に理解していない限り、再ハッシュ認識を無効にすることを再考してください。
11.6.2. シリアル化
操作は他のノード全体に送信されるため、Data Grid マーシャリングでシリアライズできる必要があります。これにより、他のノードに操作を送信できます。
最も簡単な方法は、CacheStream インスタンスを使用し、通常どおりラムダを使用することです。Data Grid は、さまざまな Stream 中間メソッドおよび端末メソッドをすべて上書きして、引数の Serializable バージョン (つまり SerializableFunction、SerializablePredicate など) を取ります。これらのメソッドは CacheStream にあります。これは、ここ で定義されている最も具体的な方法を選択するための仕様に依存しています。
上記の例では、Collector
を使用してすべての結果を Map
に収集しました。ただし、Collector クラスは Serializable インスタンスを生成しません。そのため、これらを使用する必要がある場合は、2 つの方法があります。
1 つのオプションとして、Supplier<Collector>
の指定を可能にする CacheCollectors クラスを使用します。このインスタンスは、シリアライズされていない Collector
を提供するために、Collectors を使用することができます。
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(e -> e.getValue().contains("Jboss")) .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
または、CacheCollectors の使用を回避し、代わりに Supplier<Collector>
を取得するオーバーロードされた collect
メソッドを使用できます。オーバーロードされた collect
メソッドは CacheStream
インターフェイスでしか利用できません。
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(e -> e.getValue().contains("Jboss")) .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
ただし、Cache
および CacheStream
インターフェイスを使用できない場合は、Serializable
引数を使用できないため、ラムダを複数インターフェイスをキャストすることで、ラムダを Serializable
に手動でキャストする必要があります。優れた方法ではありませんが、設定することは可能です。
Map<Object, String> jbossValues = map.entrySet().stream() .filter((Serializable & Predicate<Map.Entry<Object, String>>) e -> e.getValue().contains("Jboss")) .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
推奨され最も高性能な方法は、最小限のペイロードを提供するために、AdvancedExternalizer
を使用することです。残念ながら、これは、高度なエクスターナライザーが事前にクラスを定義する必要があるため、ラムダを使用できないことを意味します。
以下に示すように、高度なエクスターナライザーを使用できます。
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(new ContainsFilter("Jboss")) .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); class ContainsFilter implements Predicate<Map.Entry<Object, String>> { private final String target; ContainsFilter(String target) { this.target = target; } @Override public boolean test(Map.Entry<Object, String> e) { return e.getValue().contains(target); } } class JbossFilterExternalizer implements AdvancedExternalizer<ContainsFilter> { @Override public Set<Class<? extends ContainsFilter>> getTypeClasses() { return Util.asSet(ContainsFilter.class); } @Override public Integer getId() { return CUSTOM_ID; } @Override public void writeObject(ObjectOutput output, ContainsFilter object) throws IOException { output.writeUTF(object.target); } @Override public ContainsFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException { return new ContainsFilter(input.readUTF()); } }
コレクターサプライヤーに高度なエクスターナライザーを使用して、ペイロードサイズをさらに減らすこともできます。
Map<Object, String> map = (Map<Object, String>) cache.entrySet().stream() .filter(new ContainsFilter("Jboss")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); class ToMapCollectorSupplier<K, U> implements Supplier<Collector<Map.Entry<K, U>, ?, Map<K, U>>> { static final ToMapCollectorSupplier INSTANCE = new ToMapCollectorSupplier(); private ToMapCollectorSupplier() { } @Override public Collector<Map.Entry<K, U>, ?, Map<K, U>> get() { return Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue); } } class ToMapCollectorSupplierExternalizer implements AdvancedExternalizer<ToMapCollectorSupplier> { @Override public Set<Class<? extends ToMapCollectorSupplier>> getTypeClasses() { return Util.asSet(ToMapCollectorSupplier.class); } @Override public Integer getId() { return CUSTOM_ID; } @Override public void writeObject(ObjectOutput output, ToMapCollectorSupplier object) throws IOException { } @Override public ToMapCollectorSupplier readObject(ObjectInput input) throws IOException, ClassNotFoundException { return ToMapCollectorSupplier.INSTANCE; } }
11.7. 並列計算
分散ストリームは、デフォルトではできるだけ並列処理を試みます。エンドユーザーはこれを制御でき、実際にはオプションのいずれかを制御する必要があります。これらのストリームを並列化する方法は 2 つあります。
各ノードにローカル キャッシュコレクションからストリームを作成している場合、エンドユーザーは stream または parallelStream メソッドの呼び出しのいずれかを選択できます。並列ストリームが選択されたかどうかに応じて、各ノードに対してローカルで複数のスレッドが有効になります。再ハッシュ対応の iterator や forEach オペレーションなどの一部のオペレーションは、常にローカルで順次ストリームを使用することに注意してください。これは、並行ストリームをローカルに許可するように、ある時点で強化できます。
ローカルの並列処理を使用する場合は、計算が高速にかかる多数のエントリーや操作が必要になるため注意が必要です。また、ユーザーが forEach
で並列ストリームを使用する場合、これは通常は計算オペレーションに予約されている共有プールで実行されるため、アクションをブロックしないようにする必要があることに注意してください。
リモートリクエスト 複数のノードがある場合に、リモート要求をすべて同時に処理するか、一度に 1 つずつ処理するかを制御することが望ましい場合があります。デフォルトでは、iterator 以外のすべての端末オペレーションは同時リクエストを実行します。iterator は、ローカルノードでのメモリー使用量全体を減らす方法であり、実際に実行する連続要求のみを実行します。
ユーザーがこのデフォルトを変更したい場合は、CacheStream
で sequentialDistribution または parallelDistribution メソッドを呼び出して実行できます。
11.8. タスクのタイムアウト
操作リクエストのタイムアウト値を設定できます。このタイムアウトはリモートリクエストのタイムアウトにのみ使用され、リクエストごとに使用されます。前者はローカル実行がタイムアウトしないことを意味し、後者は上記のようなフェイルオーバーシナリオがある場合、後続のリクエストにはそれぞれ新しいタイムアウトがあることを意味します。タイムアウトを指定しないと、レプリケーションのタイムアウトをデフォルトのタイムアウトとして使用します。以下を実行することで、タスクでタイムアウトを設定できます。
CacheStream<Map.Entry<Object, String>> stream = cache.entrySet().stream(); stream.timeout(1, TimeUnit.MINUTES);
詳細は、java ドキュメントの timeout を確認してください。
11.9. 注入
Stream には、forEach と呼ばれる端末オペレーションがあり、データに副次的な影響を与える操作を実行できます。この場合、このストリームをサポートする Cache
への参照を取得することが推奨されます。Consumer
が CacheAware インターフェイスを実装する場合は、Consumer
インターフェイスからの accept メソッドの前に injectCache
メソッドが呼び出されます。
11.10. 分散ストリームの実行
分散ストリームの実行は、マップの削減に非常に似ています。ここでは、ゼロを多数の中間操作 (マップ、フィルターなど) に送信し、1 つの端末オペレーションが各種ノードに送信します。オペレーションは、基本的に次のようになります。
- 必要なセグメントは、どのノードが指定のセグメントのプライマリー所有者であるかによってグループ化されます。
リクエストが生成され、処理すべきセグメントを含む中間および端末オペレーションが含まれる各リモートノードに送信されます。
- 端末オペレーションは、必要に応じてローカルで実行されます。
- 各リモートノードはこの要求を受け取り、オペレーションを実行し、その後に応答を戻します。
- その後、ローカルノードが、ローカル応答とリモート応答を収集し、オペレーション自体に必要な削減を実行します。
- その後、最終的な縮小応答がユーザーに返されます
ほとんどの場合、オペレーションはすべて各リモートノードに完全に適用されるため、すべてのオペレーションは完全に分散されます。通常、複数のノードからの結果を減らすために、最後のオペレーションまたは関連するものだけが再適用される場合があります。重要な点の 1 つは、実際にはシリアライズする必要がないことに注意してください。これは、希望の部分であるものが最後に送信された最後の値になります (さまざまなオペレーションの例外は以下に強調表示されます)。
端末オペレーターの分散結果の縮小 以下の段落では各種の端末オペレーターの分散処理方法を説明します。これらのいくつかは、最終結果の代わりに中間値をシリアル化可能にする必要があるという点で特別です。
- allMatch noneMatch anyMatch
- allMatch オペレーションは各ノードで実行され、すべての結果が論理的に結合されて適切な値を取得します。noneMatch オペレーションおよび anyMatch オペレーションは、論理的または代わりに使用します。これらのメソッドは早期終了もサポートしており、最終結果が判明するとリモート操作とローカル操作を停止します。
- collect
- collect メソッドは、いくつかの追加手順を実行できるという点で興味深いものです。リモートノードは、結果に対して最終 finisher を実行せず、代わりに完全に結合された結果を送り返すことを除いて、すべてを通常どおり実行します。次に、ローカルスレッドは、リモートとローカルの結果を値に 結合 し、最終的に終了します。ここで覚えておくべき重要な点は、最終的な値はシリアル化可能である必要はなく、supplier メソッドおよび combiner メソッドから生成された値である必要があるということです。
- count
- count メソッドは、各ノードから番号を一緒に追加します。
- findAny findFirst
- findAny オペレーションは、最初に見つかった値 (リモートノードからのものかローカル) を返します。これは、値が見つかると他の値を処理しないという点で、早期終了をサポートすることに注意してください。findFirst メソッドは、ソートされた中間オペレーションが必要になるため特別なものです。これは、例外 セクションで説明されています。
- max min
- max メソッドおよび min メソッドは、各ノードの各最小値または最大値を見つけ、最終的にノード間の最小値または最大値のみが返されるようにローカルで実行されます。
- reduce
- さまざまな reduce メソッド 1、2、3 は、アキュムレーターが実行可能な量の結果のシリアライズを最終的に行います。次に、ローカルとリモートの結果をローカルでまとめて累積してから、指定した場合は組み合わせます。これは、組み合わせた値がシリアライズ可能である必要がないことを意味する点に注意してください。
11.11. キーベースの再ハッシュ対応 Operator
iterator、spliterator、および forEach は、リハッシュ認識が、セグメントだけでなくセグメントごとに処理されたキーを追跡する必要がある点で、他のターミナル operator とは異なります。これは、クラスターメンバーシップが変更された場合でも、1 回だけ (iterator と spliterator) または 1 回以上の (forEach) の動作を保証するためです。
リモートノードで呼び出されると iterator
および spliterator
オペレーターは、エントリーの再バッチを返します。この場合、次のバッチは最後に使用された後にのみ送信されます。このバッチ処理は、ある時点のメモリー内のエントリー数を制限するために行われます。ユーザーノードは、処理したキーを保持し、特定のセグメントが完了すると、それらのキーをメモリーから解放します。そのため、iterator メソッドには順次処理が優先されることがあるため、すべてのノードからではなく、セグメントキーのサブセットのみがメモリーに保持されます。
forEach()
メソッドはバッチを返しますが、キーの処理が少なくともバッチ処理された後に、キーのバッチを返します。これにより、送信元ノードはどの鍵がすでに処理されているかを把握して、同じエントリーを再処理する可能性を減らすことができます。ただし、これはノードが予期せずダウンした場合に、少なくとも 1 回の動作を要する可能性があることを意味します。この場合、そのノードはバッチを処理していてまだ完了していない可能性があり、処理されたが完了したバッチに含まれていないエントリーは、再ハッシュ失敗オペレーションが発生したときに再度実行されます。ノードを追加しても、すべての応答を受け取るまで、再ハッシュフェイルオーバーが発生しないため、この問題は発生しません。
これらのオペレーションのバッチサイズは両方とも、CacheStream
で distributedBatchSize メソッドを呼び出して設定できる値と同じ値で制御されます。この値はデフォルトで、状態遷移で設定された chunkSize
に設定されます。残念ながら、この値は、メモリー使用量とパフォーマンスと少なくとも 1 回のトレードオフであり、マイレージは異なる場合があります。
レプリケートされた分散キャッシュでの iterator
の使用
ノードが分散ストリームに要求されたすべてのセグメントのプライマリーまたはバックアップ所有者である場合、Data Grid は iterator
または spliterator
の端末操作をローカルで実行します。これにより、リモートの反復がリソース集約型であるためにパフォーマンスが最適化されます。
この最適化は、レプリケートされたキャッシュと分散キャッシュの両方に適用されます。ただし、Data Grid は、shared
および write-behind
の両方が有効なキャッシュストアを使用する場合にリモートで反復を実行します。この場合は、リモートで反復を行うことで一貫性が確保されます。
11.12. 中間オペレーションの例外
特別な例外を持つ中間オペレーションがあります。これらは、skip、peek、ソートされた 12 および distinct です。これらの方法はすべて、正確さを保証するためにストリーム処理に埋め込まれたある種の人為的な iterator を備えています。これらは以下のように文書化されています。このオペレーションにより、パフォーマンスが低下する可能性があります。
- スキップ
- 中間スキップオペレーションまで人為的な iterator が埋め込まれています。結果はローカルに格納され、適切な要素量をスキップできます。
- ソート済み
- 警告: この操作には、ローカルノード上のメモリーのすべてのエントリーが必要です。人為的な iterator は、中間のソートされたオペレーションまで埋め込まれます。すべての結果がローカルでソートされます。要素のバッチを返す分散ソートを計画することは可能ですが、これはまだ実装されていません。
- 一意
- 警告: この操作には、ローカルノード上のメモリーのすべて、またはほぼすべてのエントリーが必要です。各リモートノードで distinct が実行され、人為的な iterator がそれらの distinct 値を返します。そして最後に、これらの結果はすべて、個別のオペレーションが実行されます。
残りの中間オペレーションは、期待通りに完全に配布されます。
11.13. 例
単語数
単語数は使いすぎると、map/reduc パラダイムの典型的な例になります。Data Grid ノードに key → sentence が保存されていると仮定します。キーは文字列であり、各文も文字列であり、使用可能なすべての文のすべての単語の出現をカウントする必要があります。このような分散タスクの実装は、以下のように定義できます。
public class WordCountExample { /** * In this example replace c1 and c2 with * real Cache references * * @param args */ public static void main(String[] args) { Cache<String, String> c1 = ...; Cache<String, String> c2 = ...; c1.put("1", "Hello world here I am"); c2.put("2", "Infinispan rules the world"); c1.put("3", "JUDCon is in Boston"); c2.put("4", "JBoss World is in Boston as well"); c1.put("12","JBoss Application Server"); c2.put("15", "Hello world"); c1.put("14", "Infinispan community"); c2.put("15", "Hello world"); c1.put("111", "Infinispan open source"); c2.put("112", "Boston is close to Toronto"); c1.put("113", "Toronto is a capital of Ontario"); c2.put("114", "JUDCon is cool"); c1.put("211", "JBoss World is awesome"); c2.put("212", "JBoss rules"); c1.put("213", "JBoss division of RedHat "); c2.put("214", "RedHat community"); Map<String, Long> wordCountMap = c1.entrySet().parallelStream() .map(e -> e.getValue().split("\\s")) .flatMap(Arrays::stream) .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())); } }
この場合、前述の例から単語数を簡単に実行できます。
ただし、例で最も頻繁に使用される単語を見つけたい場合はどうすればよいでしょうか。このケースについて少し考えてみると、最初にすべての単語をカウントしてローカルで利用できるようにする必要があることに気付くでしょう。そのため、実際にはいくつかのオプションがあります。
コレクターでフィニッシャーを使用できます。これは、すべての結果が収集された後にユーザースレッドで呼び出されます。前の例からいくつかの冗長な行が削除されました。
public class WordCountExample { public static void main(String[] args) { // Lines removed String mostFrequentWord = c1.entrySet().parallelStream() .map(e -> e.getValue().split("\\s")) .flatMap(Arrays::stream) .collect(() -> Collectors.collectingAndThen( Collectors.groupingBy(Function.identity(), Collectors.counting()), wordCountMap -> { String mostFrequent = null; long maxCount = 0; for (Map.Entry<String, Long> e : wordCountMap.entrySet()) { int count = e.getValue().intValue(); if (count > maxCount) { maxCount = count; mostFrequent = e.getKey(); } } return mostFrequent; })); }
残念ながら、最後のステップは単一のスレッドでのみ実行されるため、単語が多い場合は非常に遅くなる可能性があります。これを Streams で並列化するもう 1 つの方法があります。
前述したように、処理後にローカルノードに含まれるため、実際にはマップ結果でストリームを使用することができました。そのため、結果に並列ストリームを使用できます。
public class WordFrequencyExample { public static void main(String[] args) { // Lines removed Map<String, Long> wordCount = c1.entrySet().parallelStream() .map(e -> e.getValue().split("\\s")) .flatMap(Arrays::stream) .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())); Optional<Map.Entry<String, Long>> mostFrequent = wordCount.entrySet().parallelStream().reduce( (e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);
これにより、最も頻繁に発生する要素を計算する際に、すべてのコアをローカルで利用できるようになります。
特定のエントリーの削除
分散ストリームは、ライブ先のデータを変更する方法として使用することもできます。たとえば、特定の単語が含まれるキャッシュのエントリーをすべて削除します。
public class RemoveBadWords { public static void main(String[] args) { // Lines removed String word = .. c1.entrySet().parallelStream() .filter(e -> e.getValue().contains(word)) .forEach((c, e) -> c.remove(e.getKey()));
シリアル化されているものとそうでないものを注意深く記録すると、ラムダによって取得されるときに、オペレーションとともに単語のみが他のノードにシリアル化されることがわかります。ただし、実際に節約できるのは、キャッシュ操作がプライマリー所有者に対して実行されるため、これらの値をキャッシュから削除するために必要なネットワークトラフィックの量が削減されることです。各ノードで呼び出されたときにキャッシュを BiConsumer に渡す特別な BiConsumer メソッドのオーバーライドを提供するため、キャッシュはラムダによって取得されません。
この方法で forEach
コマンドを使用する際に留意すべきことの 1 つは、基になるストリームがロックを取得しないことです。キャッシュの削除操作は自然にロックを取得しますが、値はストリームが見たものから変更されている可能性があります。つまり、ストリームがエントリーを読み取った後にエントリーが変更された可能性がありますが、削除によって実際に削除されました。
LockedStream
と呼ばれる新しいバリアントを具体的に追加しました。
他の多くの例
Streams
API は JRE ツールであり、それを使用するための例がたくさんあります。操作は何らかの方法でシリアル化可能である必要があることを覚えておいてください。
第12章 JCache (JSR-107) API
Data Grid は JCache 1.0 API (JSR-107) の実装を提供します。JCache は、一時 Java オブジェクトをメモリーにキャッシュするための標準 Java API を指定します。Java オブジェクトをキャッシュすると、取得にコストがかかるデータ (DB や Web サービスなど) や計算が難しいデータを使用することで発生するボトルネックを回避するのに役立ちます。これらのタイプのオブジェクトをメモリーにキャッシュすると、コストのかかるラウンドトリップや再計算を行う代わりに、メモリーから直接データを取得することで、アプリケーションのパフォーマンスを高速化できます。本書では、仕様の Data Grid 実装で JCache を使用する方法と、API の主要な側面が説明されています。
12.1. 組み込みキャッシュの作成
前提条件
-
cache-api
がクラスパスにあることを確認します。 以下の依存関係を
pom.xml
に追加します。<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-jcache</artifactId> </dependency>
手順
- 以下のように、デフォルトの JCache API 設定を使用する組み込みキャッシュを作成します。
import javax.cache.*; import javax.cache.configuration.*; // Retrieve the system wide cache manager CacheManager cacheManager = Caching.getCachingProvider().getCacheManager(); // Define a named cache with default JCache configuration Cache<String, String> cache = cacheManager.createCache("namedCache", new MutableConfiguration<String, String>());
12.1.1. 組み込みキャッシュの設定
-
以下のように、カスタム Data Grid 設定の URI を
CachingProvider.getCacheManager(URI)
呼び出しに渡します。
import java.net.URI; import javax.cache.*; import javax.cache.configuration.*; // Load configuration from an absolute filesystem path URI uri = URI.create("file:///path/to/infinispan.xml"); // Load configuration from a classpath resource // URI uri = this.getClass().getClassLoader().getResource("infinispan.xml").toURI(); // Create a cache manager using the above configuration CacheManager cacheManager = Caching.getCachingProvider().getCacheManager(uri, this.getClass().getClassLoader(), null);
デフォルトでは、JCache API はデータを storeByValue
として保存するように指定しているため、キャッシュへの操作以外のオブジェクト状態の変更は、キャッシュに保存されているオブジェクトに影響を与えません。Data Grid はこれまで、シリアル化/マーシャリングを使用してこれを実装し、コピーを作成してキャッシュに保存しており、その方法は仕様に準拠しています。したがって、Data Grid でデフォルトの JCache 設定を使用する場合、保存されるデータはマーシャリング可能である必要があります。
または、(Data Grid または JDK Collections が機能するのと同じように) 参照によってデータを格納するように JCache を設定することもできます。これを行うには、次のコマンドを実行します。
Cache<String, String> cache = cacheManager.createCache("namedCache", new MutableConfiguration<String, String>().setStoreByValue(false));
12.2. リモートキャッシュの作成
前提条件
-
cache-api
がクラスパスにあることを確認します。 以下の依存関係を
pom.xml
に追加します。<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-jcache-remote</artifactId> </dependency>
手順
- リモート Data Grid サーバーでキャッシュを作成し、以下のようにデフォルトの JCache API 設定を使用します。
import javax.cache.*; import javax.cache.configuration.*; // Retrieve the system wide cache manager via org.infinispan.jcache.remote.JCachingProvider CacheManager cacheManager = Caching.getCachingProvider("org.infinispan.jcache.remote.JCachingProvider").getCacheManager(); // Define a named cache with default JCache configuration Cache<String, String> cache = cacheManager.createCache("remoteNamedCache", new MutableConfiguration<String, String>());
12.2.1. リモートキャッシュの設定
Hot Rod 設定ファイルには、リモートキャッシュのカスタマイズに使用できる infinispan.client.hotrod.cache.*
プロパティーが含まれます。
-
以下のように、
hotrod-client.properties
ファイルの URI をCachingProvider.getCacheManager(URI)
呼び出しに渡します。
import javax.cache.*; import javax.cache.configuration.*; // Load configuration from an absolute filesystem path URI uri = URI.create("file:///path/to/hotrod-client.properties"); // Load configuration from a classpath resource // URI uri = this.getClass().getClassLoader().getResource("hotrod-client.properties").toURI(); // Retrieve the system wide cache manager via org.infinispan.jcache.remote.JCachingProvider CacheManager cacheManager = Caching.getCachingProvider("org.infinispan.jcache.remote.JCachingProvider") .getCacheManager(uri, this.getClass().getClassLoader(), null);
12.3. データの保管および取得
JCache の API が java.util.Map または java.util.concurrent.ConcurrentMap のいずれも拡張していないにもかかわらず、キー/値の API を提供してデータを格納および取得します。
import javax.cache.*; import javax.cache.configuration.*; CacheManager cacheManager = Caching.getCachingProvider().getCacheManager(); Cache<String, String> cache = cacheManager.createCache("namedCache", new MutableConfiguration<String, String>()); cache.put("hello", "world"); // Notice that javax.cache.Cache.put(K) returns void! String value = cache.get("hello"); // Returns "world"
標準の java.util.Map とは異なり、javax.cache.Cache には put と getAndPut と呼ばれる 2 つの基本的な put メソッドが含まれています。前者は void
を返しますが、後者はキーに関連付けられた以前の値を返します。そのため、JCache の java.util.Map.put(K) に相当するものは javax.cache.Cache.getAndPut(K) になります。
JCache API はスタンドアロンキャッシングのみを対象としていますが、永続ストアにプラグインすることができ、クラスターリングまたは分散を念頭に置いて設計されています。javax.cache.Cache が 2 つの put メソッドを提供する理由は、標準の java.util.Map put 呼び出しにより以前の値を計算するためです。永続ストアが使用されている場合、またはキャッシュが分散されている場合、前の値を返すことはコストのかかる操作になる可能性があり、多くの場合、ユーザーは戻り値を使用せずに標準の java.util.Map.put(K) を呼び出します。そのため、JCache ユーザーは戻り値が関連するかどうかについて考慮する必要があります。この場合、javax.cache.Cache.getAndPut(K) を呼び出す必要があります。それ以外の場合は、java.util.Map.put(K, V) を呼び出すことができ、以前の値を返すようなコストのかかる操作が返されなくなります。
12.4. java.util.concurrent.ConcurrentMap と javax.cache.Cache APIs の比較
ここでは、java.util.concurrent.ConcurrentMap および javax.cache.Cache API によって提供されるデータ操作 API を簡単に比較します。
操作 | java.util.concurrent.ConcurrentMap<K, V> | javax.cache.Cache<K, V> |
---|---|---|
保存して返さない | 該当なし |
|
保存して以前の値を返す |
|
|
存在しない場合は保存する |
|
|
取得 |
|
|
存在する場合は削除 |
|
|
以前の値を削除して返す |
|
|
条件の削除 |
|
|
存在する場合は置き換え |
|
|
以前の値を置き換えて返す |
|
|
条件の置き換え |
|
|
2 つの API を比較すると、可能であれば、JCache が以前の値を返さないようにして、コストのかかるネットワークまたは IO 操作を実行するオペレーションを回避していることがわかります。これは、JCache API の設計における最も重要な原則です。実際、java.util.concurrent.ConcurrentMap には存在するが、分散キャッシュでの計算にコストがかかる可能性があるため、javax.cache.Cache には存在しない一連のオペレーションがあります。唯一の例外は、キャッシュの内容を反復処理することです。
操作 | java.util.concurrent.ConcurrentMap<K, V> | javax.cache.Cache<K, V> |
---|---|---|
キャッシュのサイズを計算する |
| 該当なし |
キャッシュのすべてのキーを返す |
| 該当なし |
キャッシュのすべての値を返す |
| 該当なし |
キャッシュ内のすべてのエントリーを返す |
| 該当なし |
キャッシュを繰り返し処理する |
keySet、value、または entrySet で |
|
12.5. JCache インスタンスのクラスターリング
Data Grid JCache 実装は仕様を越え、標準 API を使用してクラスターキャッシュを使用できるようになります。次のようにキャッシュを複製するように設定された Data Grid 設定ファイルがあるとします。
infinispan.xml
<infinispan> <cache-container default-cache="namedCache"> <transport cluster="jcache-cluster" /> <replicated-cache name="namedCache" /> </cache-container> </infinispan>
このコードを使用して、キャッシュのクラスターを作成できます。
import javax.cache.*; import java.net.URI; // For multiple cache managers to be constructed with the standard JCache API // and live in the same JVM, either their names, or their classloaders, must // be different. // This example shows how to force their classloaders to be different. // An alternative method would have been to duplicate the XML file and give // it a different name, but this results in unnecessary file duplication. ClassLoader tccl = Thread.currentThread().getContextClassLoader(); CacheManager cacheManager1 = Caching.getCachingProvider().getCacheManager( URI.create("infinispan-jcache-cluster.xml"), new TestClassLoader(tccl)); CacheManager cacheManager2 = Caching.getCachingProvider().getCacheManager( URI.create("infinispan-jcache-cluster.xml"), new TestClassLoader(tccl)); Cache<String, String> cache1 = cacheManager1.getCache("namedCache"); Cache<String, String> cache2 = cacheManager2.getCache("namedCache"); cache1.put("hello", "world"); String value = cache2.get("hello"); // Returns "world" if clustering is working // -- public static class TestClassLoader extends ClassLoader { public TestClassLoader(ClassLoader parent) { super(parent); } }
第13章 マルチマップキャッシュ
MutimapCache は、各キーに複数の値を含めることができる値にキーをマップする Data Grid キャッシュの一種です。
13.1. インストールと設定
pom.xml
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-multimap</artifactId> </dependency>
13.2. MultimapCache API
MultimapCache API は、Multimap キャッシュと対話する複数のメソッドを公開します。これらのメソッドは、ほとんどの場合、ノンブロッキングです。詳細については、制限 を参照してください。
public interface MultimapCache<K, V> { CompletableFuture<Optional<CacheEntry<K, Collection<V>>>> getEntry(K key); CompletableFuture<Void> remove(SerializablePredicate<? super V> p); CompletableFuture<Void> put(K key, V value); CompletableFuture<Collection<V>> get(K key); CompletableFuture<Boolean> remove(K key); CompletableFuture<Boolean> remove(K key, V value); CompletableFuture<Void> remove(Predicate<? super V> p); CompletableFuture<Boolean> containsKey(K key); CompletableFuture<Boolean> containsValue(V value); CompletableFuture<Boolean> containsEntry(K key, V value); CompletableFuture<Long> size(); boolean supportsDuplicates(); }
CompletableFuture<Void> put(K key, V value)
キーと値のペアをマルチマップキャッシュに配置します。
MultimapCache<String, String> multimapCache = ...; multimapCache.put("girlNames", "marie") .thenCompose(r1 -> multimapCache.put("girlNames", "oihana")) .thenCompose(r3 -> multimapCache.get("girlNames")) .thenAccept(names -> { if(names.contains("marie")) System.out.println("Marie is a girl name"); if(names.contains("oihana")) System.out.println("Oihana is a girl name"); });
このコードの出力は以下のようになります。
Marie is a girl name Oihana is a girl name
CompletableFuture<Collection<V>> get(K key)
存在する場合、このマルチマップキャッシュ内のキーに関連付けられた値のビューコレクションを返す非同期。取得したコレクションへの変更は、このマルチマップキャッシュの値を変更しません。このメソッドは空のコレクションを返すと、キーが見つからないことを意味します。
CompletableFuture<Boolean> remove(K key)
キーに関連付けられたエントリーがマルチマップキャッシュに存在する場合は、それを削除する非同期。
CompletableFuture<Boolean> remove(K key, V value)
キーと値のペアが存在する場合は、マルチマップキャッシュから削除する非同期。
CompletableFuture<Void> remove(Predicate<? super V> p)
非同期メソッド。指定の述語に一致するすべての値を削除します。
CompletableFuture<Boolean> containsKey(K key)
このマルチマップにキーが含まれる場合に true を返す非同期。
CompletableFuture<Boolean> containsValue(V value)
このマルチマップに少なくとも 1 つのキーの値が含まれている場合に true を返す非同期。
CompletableFuture<Boolean> containsEntry(K key, V value)
このマルチマップに値を持つキーと値のペアが 1 つ以上含まれている場合、true を返す非同期。
CompletableFuture<Long> size()
マルチマップキャッシュ内のキーと値のペアの数を返す非同期。明確な数のキーは返されません。
boolean supportsDuplicates()
マルチマップキャッシュが重複をサポートする場合は true を返す非同期。これは、マルチマップのコンテンツが'a' → ['1', '1', '2']になる可能性があることを意味します。重複はまだサポートされていないため、今のところ、このメソッドは常に false を返します。指定された値の存在は、'equals' および`hashcode' method'のコントラクトによって決定されます。
13.3. マルチマップキャッシュの作成
現在、MultimapCache は通常のキャッシュとして設定されます。これは、コードまたは XML 設定のいずれかで実行できます。[configure a cache]へのセクションリンクで、通常のキャッシュを設定する方法を参照してください。
13.3.1. 組み込みモード
// create or obtain your EmbeddedCacheManager EmbeddedCacheManager cm = ... ; // create or obtain a MultimapCacheManager passing the EmbeddedCacheManager MultimapCacheManager multimapCacheManager = EmbeddedMultimapCacheManagerFactory.from(cm); // define the configuration for the multimap cache multimapCacheManager.defineConfiguration(multimapCacheName, c.build()); // get the multimap cache multimapCache = multimapCacheManager.get(multimapCacheName);
13.4. 制限事項
ほとんどの場合、Multimap キャッシュは通常のキャッシュとして動作しますが、以下のように現在のバージョンにはいくつかの制限があります。
13.4.1. 重複のサポート
重複はまだサポートされていません。これは、マルチマップに重複したキーと値のペアが含まれていないことを意味します。put メソッドが呼び出されるたびに、キーと値のペアがすでに存在する場合、このキーと値のペアは追加されません。Multimap にキーと値のペアがすでに存在しているかどうかを確認するために使用されるメソッドは equals
および hashcode
です。
13.4.2. エビクション
現時点では、エビクションはキーと値のペアごとではなく、キーごとに機能します。これは、キーがエビクトされるたびに、キーに関連付けられているすべての値も削除されることを意味します。
13.4.3. トランザクション
暗黙的なトランザクションは、自動コミットによってサポートされ、すべてのメソッドは非ブロッキングです。ほとんどの場合、明示的なトランザクションはブロックせずに機能します。ブロックするメソッドは size
、containsEntry
、および remove(Predicate<? super V> p)
です。
第14章 Red Hat JBoss EAP のデータグリッドモジュール
Red Hat JBoss EAP にデプロイされたアプリケーション内で Data Grid を使用するには、以下を実行する Data Grid モジュールをインストールする必要があります。
- WAR または EAR ファイルに Data Grid JAR ファイルをパッケージ化せずにアプリケーションをデプロイできます。
- Red Hat JBoss EAP にバンドルされているバージョンとは独立した Data Grid を使用できるようにします。
Red Hat JBoss EAP(EAP)7.4 では、アプリケーションは Data Grid を個別にインストールすることなく、infinispan
サブシステムを直接処理できます。EAP 7.4 GA がリリースされた後では、Data Grid はダウンロード用の EAP モジュールを提供しなくなります。
14.1. Data Grid モジュールのインストール
Red Hat JBoss EAP の Data Grid モジュールをダウンロードしてインストールします。
前提条件
- JDK 8 以降。
- 既存の Red Hat JBoss EAP インストール
手順
- Red Hat カスタマーポータルにログインします。
- Data Grid ソフトウェアダウンロード からモジュールの ZIP アーカイブをダウンロードします。
ZIP アーカイブを抽出し、
modules
の内容を Red Hat JBoss EAP インストールのmodules
ディレクトリーにコピーして、結果の構造を取得します。$EAP_HOME/modules/system/add-ons/rhdg/org/infinispan/rhdg-8.2
14.2. Data Grid モジュールを使用するためのアプリケーションの設定
Red Hat JBoss EAP 用の DataGrid モジュールをインストールした後、Data Grid 機能を使用するようにアプリケーションを設定します。
手順
-
プロジェクトの
pom.xml
ファイルで、必要な Data Grid の依存関係を提供されているものとしてマークします。 -
適切な
MANIFEST.MF
ファイルを生成するようにアーティファクトアーカイバを設定します。
pom.xml
<dependencies> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-cachestore-jdbc</artifactId> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-war-plugin</artifactId> <configuration> <archive> <manifestEntries> <Dependencies>org.infinispan:rhdg-8.2 services</Dependencies> </manifestEntries> </archive> </configuration> </plugin> </plugins> </build>
Data Grid 機能は、以下のようにアプリケーションのマニフェストにエントリーとして追加できる単一のモジュール org.infinispan
としてパッケージ化されます。
MANIFEST.MF
Manifest-Version: 1.0 Dependencies: org.infinispan:rhdg-8.2 services
AWS の依存関係
S3_PING などの AWS 依存関係が必要な場合は、アプリケーションのマニフェストに以下のモジュールを追加します。
Manifest-Version: 1.0 Dependencies: com.amazonaws.aws-java-sdk:rhdg-8.2 services
第15章 Red Hat JBoss Web Server から Red Hat Data Grid への HTTP セッションの外部化
org.apache.catalina.Manager
インターフェイスを使用して、HTTP セッションデータを JBoss Web Server デプロイメントから Data Grid Server クラスターに外部化して、高可用性を実現します。
15.1. Tomcat セッションクライアントのインストール
Tomcat セッションクライアントをインストールし、Red Hat JBoss Web Server アプリケーションから Red Hat Data Grid に HTTP セッションを外部化します。
手順
-
Data Grid Software Downloads から
redhat-datagrid-8.1.1-tomcat<$version>-session-client.zip
アーカイブをダウンロードします。 - アーカイブをローカルのファイルシステムに展開します。
-
展開したアーカイブから
$CATALINA_HOME/lib
にlib/
ディレクトリーの内容をコピーします。
15.2. セッションマネジャーの設定
セッションマネージャーの HotRodManager
クラスを設定し、Tomcat セッションクライアントが Red Hat Data Grid Server に接続し、データをリモートキャッシュに保存する方法を定義します。
前提条件
- Tomcat セッションクライアントをインストールします。
- 1 つ以上の Data Grid Server インスタンスをインストールします。
- HTTP セッションデータを保存するテンプレートとして使用する Data Grid Server にキャッシュを作成します。
手順
-
$CATALINA_HOME/conf/context.xml
または/WEB-INF/context.xml
を開いて編集します。 -
org.wildfly.clustering.tomcat.hotrod.HotRodManager
をclassName
プロパティーの値として指定します。 -
configurationName
プロパティーで、テンプレートとして使用するキャッシュの名前を指定します。 -
HotRodManager
クラスの他の設定プロパティーを必要に応じて定義します。 infinispan.client.hotrod.
接頭辞なしで Hot Rod クライアント設定プロパティーを設定します。-
server_list
プロパティーで Data Grid Server ノードの一覧を指定します。 -
auth_username
およびauth_password
プロパティーで Data Grid のクレデンシャルを指定します。
-
- 必要に応じて、Tomcat セッションマネージャーの一般的な属性を指定します。
-
context.xml
を保存して閉じます。
設定例
<Manager className="org.wildfly.clustering.tomcat.hotrod.HotRodManager" configurationName="mycache" persistenceStrategy="FINE" maxActiveSessions="100" server_list="192.0.2.0:11222;192.0.2.0:11223;192.0.2.0:11224" protocol_version="2.9" auth_username="admin" auth_password="changeme" auth_realm="default" sasl_mechanism="DIGEST-MD5" auth_server_name="infinispan"/>
検証
Tomcat セッションクライアントがリモートキャッシュにデータを保存することを確認するには、以下を行います。
任意のブラウザーで Data Grid コンソールを開きます。
デフォルトでは、コンソールは
http://127.0.0.1:11222/console/
で利用できます。- Tomcat セッションクライアントが、デプロイされたアプリケーションごとにキャッシュを作成することを確認します。
15.2.1. Hot Rod マネージャー設定プロパティー
以下の表は、HotRodManager
クラスの設定プロパティーをリストおよび説明しています。
プロパティー | 説明 |
---|---|
|
|
| HTTP セッションデータを保存するテンプレートとして使用する Data Grid Server のリモートキャッシュを指定します。 |
| セッションをキャッシュのエントリーにマップする方法を定義します。
|
| キャッシュに保存するセッションの最大数を定義します。デフォルトは最大値なし (制限なし) です。 |
第16章 カスタムインターセプター
カスタムインターセプターは Data Grid で非推奨となり、今後のバージョンで削除されます。
カスタムインターセプターは、キャッシュへの変更に影響を与えたり、それに応答したりできるようにすることで、Data Grid を拡張する方法です。要素が追加/削除/更新されること、またはトランザクションがコミットされることが、このような変更の例としてあげられます。
16.1. カスタムインターセプターの宣言的追加
カスタムのインターセプターは、名前付きキャッシュごとに追加できます。これは、名前の付いた各キャッシュに独自のインターセプタースタックがあるためです。以下の xml スニペットは、カスタムインターセプターを追加する方法を示しています。
<local-cache name="cacheWithCustomInterceptors"> <!-- Define custom interceptors. --> <!-- Custom interceptors should extend org.infinispan.interceptors.BaseCustomAsyncInterceptor --> <custom-interceptors> <interceptor position="FIRST" class="com.mycompany.CustomInterceptor1"> <property name="attributeOne">value1</property> <property name="attributeTwo">value2</property> </interceptor> <interceptor position="LAST" class="com.mycompany.CustomInterceptor2"/> <interceptor index="3" class="com.mycompany.CustomInterceptor1"/> <interceptor before="org.infinispanpan.interceptors.CallInterceptor" class="com.mycompany.CustomInterceptor2"/> <interceptor after="org.infinispanpan.interceptors.CallInterceptor" class="com.mycompany.CustomInterceptor1"/> </custom-interceptors> </local-cache>
16.2. プログラムによるカスタムインターセプターの追加
そのためには、AdvancedCache
への参照を取得する必要があります。これは、以下のように実行できます。
CacheManager cm = getCacheManager();//magic Cache aCache = cm.getCache("aName"); AdvancedCache advCache = aCache.getAdvancedCache();
次に、addInterceptor() メソッドの 1 つを使用して、実際のインターセプターを追加する必要があります。詳細は、AdvancedCache
javadoc を参照してください。
16.3. カスタムインターセプターの設計
カスタムインターセプターを作成するときは、次のルールに従う必要があります。
- カスタムインターセプターは、構築を有効にするために、パブリックの空のコンストラクターを宣言する必要があります。
- カスタムインターセプターには、XML 設定で使用されるプロパティータグで定義されたすべてのプロパティーに対するセッターがあります。