Chapter 4. Clustered Caches

Clustered caches store data across multiple Data Grid nodes using JGroups technology as the transport layer to pass data across the network.

4.1. Invalidation Mode

You can use Data Grid in invalidation mode to optimize systems that perform high volumes of read operations. A good example is to use invalidation to prevent lots of database writes when state changes occur.

This cache mode only makes sense if you have another, permanent store for your data such as a database and are only using Data Grid as an optimization in a read-heavy system, to prevent hitting the database for every read. If a cache is configured for invalidation, every time data is changed in a cache, other caches in the cluster receive a message informing them that their data is now stale and should be removed from memory and from any local store.

Figure 4.1. Invalidation mode

Sometimes the application reads a value from the external store and wants to write it to the local cache, without removing it from the other nodes. To do this, it must call Cache.putForExternalRead(key, value) instead of Cache.put(key, value).

Invalidation mode can be used with a shared cache store. A write operation will both update the shared store, and it would remove the stale values from the other nodes' memory. The benefit of this is twofold: network traffic is minimized as invalidation messages are very small compared to replicating the entire value, and also other caches in the cluster look up modified data in a lazy manner, only when needed.

Note

Never use invalidation mode with a local store. The invalidation message will not remove entries in the local store, and some nodes will keep seeing the stale value.

An invalidation cache can also be configured with a special cache loader, ClusterLoader. When ClusterLoader is enabled, read operations that do not find the key on the local node will request it from all the other nodes first, and store it in memory locally. In certain situation it will store stale values, so only use it if you have a high tolerance for stale values.

Invalidation mode can be synchronous or asynchronous. When synchronous, a write blocks until all nodes in the cluster have evicted the stale value. When asynchronous, the originator broadcasts invalidation messages but doesn’t wait for responses. That means other nodes still see the stale value for a while after the write completed on the originator.

Transactions can be used to batch the invalidation messages. Transactions acquire the key lock on the primary owner. To find more about how primary owners are assigned, please read the Key Ownership section.

  • With pessimistic locking, each write triggers a lock message, which is broadcast to all the nodes. During transaction commit, the originator broadcasts a one-phase prepare message (optionally fire-and-forget) which invalidates all affected keys and releases the locks.
  • With optimistic locking, the originator broadcasts a prepare message, a commit message, and an unlock message (optional). Either the one-phase prepare or the unlock message is fire-and-forget, and the last message always releases the locks.

4.2. Replicated Caches

Entries written to a replicated cache on any node will be replicated to all other nodes in the cluster, and can be retrieved locally from any node. Replicated mode provides a quick and easy way to share state across a cluster, however replication practically only performs well in small clusters (under 10 nodes), due to the number of messages needed for a write scaling linearly with the cluster size. Data Grid can be configured to use UDP multicast, which mitigates this problem to some degree.

Each key has a primary owner, which serializes data container updates in order to provide consistency. To find more about how primary owners are assigned, please read the Key Ownership section.

Figure 4.2. Replicated mode

Replicated mode can be synchronous or asynchronous.

  • Synchronous replication blocks the caller (e.g. on a cache.put(key, value)) until the modifications have been replicated successfully to all the nodes in the cluster.
  • Asynchronous replication performs replication in the background, and write operations return immediately. Asynchronous replication is not recommended, because communication errors, or errors that happen on remote nodes are not reported to the caller.

If transactions are enabled, write operations are not replicated through the primary owner.

  • With pessimistic locking, each write triggers a lock message, which is broadcast to all the nodes. During transaction commit, the originator broadcasts a one-phase prepare message and an unlock message (optional). Either the one-phase prepare or the unlock message is fire-and-forget.
  • With optimistic locking, the originator broadcasts a prepare message, a commit message, and an unlock message (optional). Again, either the one-phase prepare or the unlock message is fire-and-forget.

4.3. Distributed Caches

Distribution tries to keep a fixed number of copies of any entry in the cache, configured as numOwners. This allows the cache to scale linearly, storing more data as nodes are added to the cluster.

As nodes join and leave the cluster, there will be times when a key has more or less than numOwners copies. In particular, if numOwners nodes leave in quick succession, some entries will be lost, so we say that a distributed cache tolerates numOwners - 1 node failures.

The number of copies represents a trade-off between performance and durability of data. The more copies you maintain, the lower performance will be, but also the lower the risk of losing data due to server or network failures. Regardless of how many copies are maintained, distribution still scales linearly, and this is key to Data Grid’s scalability.

The owners of a key are split into one primary owner, which coordinates writes to the key, and zero or more backup owners. To find more about how primary and backup owners are assigned, please read the Key Ownership section.

Figure 4.3. Distributed mode

A read operation will request the value from the primary owner, but if it doesn’t respond in a reasonable amount of time, we request the value from the backup owners as well. (The infinispan.stagger.delay system property, in milliseconds, controls the delay between requests.) A read operation may require 0 messages if the key is present in the local cache, or up to 2 * numOwners messages if all the owners are slow.

A write operation will also result in at most 2 * numOwners messages: one message from the originator to the primary owner, numOwners - 1 messages from the primary to the backups, and the corresponding ACK messages.

Note

Cache topology changes may cause retries and additional messages, both for reads and for writes.

Just as replicated mode, distributed mode can also be synchronous or asynchronous. And as in replicated mode, asynchronous replication is not recommended because it can lose updates. In addition to losing updates, asynchronous distributed caches can also see a stale value when a thread writes to a key and then immediately reads the same key.

Transactional distributed caches use the same kinds of messages as transactional replicated caches, except lock/prepare/commit/unlock messages are sent only to the affected nodes (all the nodes that own at least one key affected by the transaction) instead of being broadcast to all the nodes in the cluster. As an optimization, if the transaction writes to a single key and the originator is the primary owner of the key, lock messages are not replicated.

4.3.1. Read consistency

Even with synchronous replication, distributed caches are not linearizable. (For transactional caches, we say they do not support serialization/snapshot isolation.) We can have one thread doing a single put:

cache.get(k) -> v1
cache.put(k, v2)
cache.get(k) -> v2

But another thread might see the values in a different order:

cache.get(k) -> v2
cache.get(k) -> v1

The reason is that read can return the value from any owner, depending on how fast the primary owner replies. The write is not atomic across all the owners — in fact, the primary commits the update only after it receives a confirmation from the backup. While the primary is waiting for the confirmation message from the backup, reads from the backup will see the new value, but reads from the primary will see the old one.

4.3.2. Key Ownership

Distributed caches split entries into a fixed number of segments and assign each segment to a list of owner nodes. Replicated caches do the same, with the exception that every node is an owner.

The first node in the list of owners is the primary owner. The other nodes in the list are backup owners. When the cache topology changes, because a node joins or leaves the cluster, the segment ownership table is broadcast to every node. This allows nodes to locate keys without making multicast requests or maintaining metadata for each key.

The numSegments property configures the number of segments available. However, the number of segments cannot change unless the cluster is restarted.

Likewise the key-to-segment mapping cannot change. Keys must always map to the same segments regardless of cluster topology changes. It is important that the key-to-segment mapping evenly distributes the number of segments allocated to each node while minimizing the number of segments that must move when the cluster topology changes.

You can customize the key-to-segment mapping by configuring a KeyPartitioner or by using the Grouping API.

However, Data Grid provides the following implementations:

SyncConsistentHashFactory

Uses an algorithm based on consistent hashing. Selected by default when server hinting is disabled.

This implementation always assigns keys to the same nodes in every cache as long as the cluster is symmetric. In other words, all caches run on all nodes. This implementation does have some negative points in that the load distribution is slightly uneven. It also moves more segments than strictly necessary on a join or leave.

TopologyAwareSyncConsistentHashFactory
Similar to SyncConsistentHashFactory, but adapted for Server Hinting. Selected by default when server hinting is enabled.
DefaultConsistentHashFactory

Achieves a more even distribution than SyncConsistentHashFactory, but with one disadvantage. The order in which nodes join the cluster determines which nodes own which segments. As a result, keys might be assigned to different nodes in different caches.

Was the default from version 5.2 to version 8.1 with server hinting disabled.

TopologyAwareConsistentHashFactory

Similar to DefaultConsistentHashFactory, but adapted for Server Hinting.

Was the default from version 5.2 to version 8.1 with server hinting enabled.

ReplicatedConsistentHashFactory
Used internally to implement replicated caches. You should never explicitly select this algorithm in a distributed cache.

4.3.2.1. Capacity Factors

Capacity factors allocate segment-to-node mappings based on resources available to nodes.

To configure capacity factors, you specify any non-negative number and the Data Grid hashing algorithm assigns each node a load weighted by its capacity factor (both as a primary owner and as a backup owner).

For example, nodeA has 2x the memory available than nodeB in the same Data Grid cluster. In this case, setting capacityFactor to a value of 2 configures Data Grid to allocate 2x the number of segments to nodeA.

Setting a capacity factor of 0 is possible but is recommended only in cases where nodes are not joined to the cluster long enough to be useful data owners.

4.3.3. Zero Capacity Node

You might need to configure a whole node where the capacity factor is 0 for every cache, user defined caches and internal caches. When defining a zero capacity node, the node won’t hold any data. This is how you declare a zero capacity node:

<cache-container zero-capacity-node="true" />
new GlobalConfigurationBuilder().zeroCapacityNode(true);

However, note that this will be true for distributed caches only. If you are using replicated caches, the node will still keep a copy of the value. Use only distributed caches to make the best use of this feature.

4.3.4. Hashing Configuration

This is how you configure hashing declaratively, via XML:

<distributed-cache name="distributedCache" owners="2" segments="100" capacity-factor="2" />

And this is how you can configure it programmatically, in Java:

Configuration c = new ConfigurationBuilder()
   .clustering()
      .cacheMode(CacheMode.DIST_SYNC)
      .hash()
         .numOwners(2)
         .numSegments(100)
         .capacityFactor(2)
   .build();

4.3.5. Initial cluster size

Data Grid’s very dynamic nature in handling topology changes (i.e. nodes being added / removed at runtime) means that, normally, a node doesn’t wait for the presence of other nodes before starting. While this is very flexible, it might not be suitable for applications which require a specific number of nodes to join the cluster before caches are started. For this reason, you can specify how many nodes should have joined the cluster before proceeding with cache initialization. To do this, use the initialClusterSize and initialClusterTimeout transport properties. The declarative XML configuration:

<transport initial-cluster-size="4" initial-cluster-timeout="30000" />

The programmatic Java configuration:

GlobalConfiguration global = new GlobalConfigurationBuilder()
   .transport()
   .initialClusterSize(4)
   .initialClusterTimeout(30000, TimeUnit.MILLISECONDS)
   .build();

The above configuration will wait for 4 nodes to join the cluster before initialization. If the initial nodes do not appear within the specified timeout, the cache manager will fail to start.

4.3.6. L1 Caching

When L1 is enabled, a node will keep the result of remote reads locally for a short period of time (configurable, 10 minutes by default), and repeated lookups will return the local L1 value instead of asking the owners again.

Figure 4.4. L1 caching

L1 caching is not free though. Enabling it comes at a cost, and this cost is that every entry update must broadcast an invalidation message to all the nodes. L1 entries can be evicted just like any other entry when the the cache is configured with a maximum size. Enabling L1 will improve performance for repeated reads of non-local keys, but it will slow down writes and it will increase memory consumption to some degree.

Is L1 caching right for you? The correct approach is to benchmark your application with and without L1 enabled and see what works best for your access pattern.

4.3.7. Server Hinting

The following topology hints can be specified:

Machine
This is probably the most useful, when multiple JVM instances run on the same node, or even when multiple virtual machines run on the same physical machine.
Rack
In larger clusters, nodes located on the same rack are more likely to experience a hardware or network failure at the same time.
Site
Some clusters may have nodes in multiple physical locations for extra resilience. Note that Cross site replication is another alternative for clusters that need to span two or more data centres.

All of the above are optional. When provided, the distribution algorithm will try to spread the ownership of each segment across as many sites, racks, and machines (in this order) as possible.

4.3.7.1. Configuration

The hints are configured at transport level:

<transport
    cluster="MyCluster"
    machine="LinuxServer01"
    rack="Rack01"
    site="US-WestCoast" />

4.3.8. Key affinity service

In a distributed cache, a key is allocated to a list of nodes with an opaque algorithm. There is no easy way to reverse the computation and generate a key that maps to a particular node. However, we can generate a sequence of (pseudo-)random keys, see what their primary owner is, and hand them out to the application when it needs a key mapping to a particular node.

4.3.8.1. API

Following code snippet depicts how a reference to this service can be obtained and used.

// 1. Obtain a reference to a cache
Cache cache = ...
Address address = cache.getCacheManager().getAddress();

// 2. Create the affinity service
KeyAffinityService keyAffinityService = KeyAffinityServiceFactory.newLocalKeyAffinityService(
      cache,
      new RndKeyGenerator(),
      Executors.newSingleThreadExecutor(),
      100);

// 3. Obtain a key for which the local node is the primary owner
Object localKey = keyAffinityService.getKeyForAddress(address);

// 4. Insert the key in the cache
cache.put(localKey, "yourValue");

The service is started at step 2: after this point it uses the supplied Executor to generate and queue keys. At step 3, we obtain a key from the service, and at step 4 we use it.

4.3.8.2. Lifecycle

KeyAffinityService extends Lifecycle, which allows stopping and (re)starting it:

public interface Lifecycle {
   void start();
   void stop();
}

The service is instantiated through KeyAffinityServiceFactory. All the factory methods have an Executor parameter, that is used for asynchronous key generation (so that it won’t happen in the caller’s thread). It is the user’s responsibility to handle the shutdown of this Executor.

The KeyAffinityService, once started, needs to be explicitly stopped. This stops the background key generation and releases other held resources.

The only situation in which KeyAffinityService stops by itself is when the cache manager with which it was registered is shutdown.

4.3.8.3. Topology changes

When the cache topology changes (i.e. nodes join or leave the cluster), the ownership of the keys generated by the KeyAffinityService might change. The key affinity service keep tracks of these topology changes and doesn’t return keys that would currently map to a different node, but it won’t do anything about keys generated earlier.

As such, applications should treat KeyAffinityService purely as an optimization, and they should not rely on the location of a generated key for correctness.

In particular, applications should not rely on keys generated by KeyAffinityService for the same address to always be located together. Collocation of keys is only provided by the Grouping API.

4.3.8.4. The Grouping API

Complementary to Key affinity service, the grouping API allows you to co-locate a group of entries on the same nodes, but without being able to select the actual nodes.

4.3.8.5. How does it work?

By default, the segment of a key is computed using the key’s hashCode(). If you use the grouping API, Data Grid will compute the segment of the group and use that as the segment of the key. See the Key Ownership section for more details on how segments are then mapped to nodes.

When the group API is in use, it is important that every node can still compute the owners of every key without contacting other nodes. For this reason, the group cannot be specified manually. The group can either be intrinsic to the entry (generated by the key class) or extrinsic (generated by an external function).

4.3.8.6. How do I use the grouping API?

First, you must enable groups. If you are configuring Data Grid programmatically, then call:

Configuration c = new ConfigurationBuilder()
   .clustering().hash().groups().enabled()
   .build();

Or, if you are using XML:

<distributed-cache>
   <groups enabled="true"/>
</distributed-cache>

If you have control of the key class (you can alter the class definition, it’s not part of an unmodifiable library), then we recommend using an intrinsic group. The intrinsic group is specified by adding the @Group annotation to a method. Let’s take a look at an example:

class User {
   ...
   String office;
   ...

   public int hashCode() {
      // Defines the hash for the key, normally used to determine location
      ...
   }

   // Override the location by specifying a group
   // All keys in the same group end up with the same owners
   @Group
   public String getOffice() {
      return office;
   }
   }
}
Note

The group method must return a String

If you don’t have control over the key class, or the determination of the group is an orthogonal concern to the key class, we recommend using an extrinsic group. An extrinsic group is specified by implementing the Grouper interface.

public interface Grouper<T> {
    String computeGroup(T key, String group);

    Class<T> getKeyType();
}

If multiple Grouper classes are configured for the same key type, all of them will be called, receiving the value computed by the previous one. If the key class also has a @Group annotation, the first Grouper will receive the group computed by the annotated method. This allows you even greater control over the group when using an intrinsic group. Let’s take a look at an example Grouper implementation:

public class KXGrouper implements Grouper<String> {

   // The pattern requires a String key, of length 2, where the first character is
   // "k" and the second character is a digit. We take that digit, and perform
   // modular arithmetic on it to assign it to group "0" or group "1".
   private static Pattern kPattern = Pattern.compile("(^k)(<a>\\d</a>)$");

   public String computeGroup(String key, String group) {
      Matcher matcher = kPattern.matcher(key);
      if (matcher.matches()) {
         String g = Integer.parseInt(matcher.group(2)) % 2 + "";
         return g;
      } else {
         return null;
      }
   }

   public Class<String> getKeyType() {
      return String.class;
   }
}

Grouper implementations must be registered explicitly in the cache configuration. If you are configuring Data Grid programmatically:

Configuration c = new ConfigurationBuilder()
   .clustering().hash().groups().enabled().addGrouper(new KXGrouper())
   .build();

Or, if you are using XML:

<distributed-cache>
   <groups enabled="true">
      <grouper class="com.acme.KXGrouper" />
   </groups>
</distributed-cache>

4.3.8.7. Advanced Interface

AdvancedCache has two group-specific methods:

getGroup(groupName)
Retrieves all keys in the cache that belong to a group.
removeGroup(groupName)
Removes all the keys in the cache that belong to a group.

Both methods iterate over the entire data container and store (if present), so they can be slow when a cache contains lots of small groups.

4.4. Asynchronous Communication with Clustered Caches

4.4.1. Asynchronous Communications

All clustered cache modes can be configured to use asynchronous communications with the mode="ASYNC" attribute on the <replicated-cache/>, <distributed-cache>, or <invalidation-cache/> element.

With asynchronous communications, the originator node does not receive any acknowledgement from the other nodes about the status of the operation, so there is no way to check if it succeeded on other nodes.

We do not recommend asynchronous communications in general, as they can cause inconsistencies in the data, and the results are hard to reason about. Nevertheless, sometimes speed is more important than consistency, and the option is available for those cases.

4.4.2. Asynchronous API

The Asynchronous API allows you to use synchronous communications, but without blocking the user thread.

There is one caveat: The asynchronous operations do NOT preserve the program order. If a thread calls cache.putAsync(k, v1); cache.putAsync(k, v2), the final value of k may be either v1 or v2. The advantage over using asynchronous communications is that the final value can’t be v1 on one node and v2 on another.

Note

Prior to version 9.0, the asynchronous API was emulated by borrowing a thread from an internal thread pool and running a blocking operation on that thread.

4.4.3. Return Values in Asynchronous Communication

Because the Cache interface extends java.util.Map, write methods like put(key, value) and remove(key) return the previous value by default.

In some cases, the return value may not be correct:

  1. When using AdvancedCache.withFlags() with Flag.IGNORE_RETURN_VALUE, Flag.SKIP_REMOTE_LOOKUP, or Flag.SKIP_CACHE_LOAD.
  2. When the cache is configured with unreliable-return-values="true".
  3. When using asynchronous communications.
  4. When there are multiple concurrent writes to the same key, and the cache topology changes. The topology change will make Data Grid retry the write operations, and a retried operation’s return value is not reliable.

Transactional caches return the correct previous value in cases 3 and 4. However, transactional caches also have a gotcha: in distributed mode, the read-committed isolation level is implemented as repeatable-read. That means this example of "double-checked locking" won’t work:

Cache cache = ...
TransactionManager tm = ...

tm.begin();
try {
   Integer v1 = cache.get(k);
   // Increment the value
   Integer v2 = cache.put(k, v1 + 1);
   if (Objects.equals(v1, v2) {
      // success
   } else {
      // retry
   }
} finally {
  tm.commit();
}

The correct way to implement this is to use cache.getAdvancedCache().withFlags(Flag.FORCE_WRITE_LOCK).get(k).

In caches with optimistic locking, writes can also return stale previous values. Write skew checks can avoid stale previous values.