Chapter 6. Clustered Locks

Clustered locks are data structures that are distributed and shared across nodes in a Data Grid cluster. Clustered locks allow you to run code that is synchronized between nodes.

6.1. Lock API

Data Grid provides a ClusteredLock API that lets you concurrently execute code on a cluster when using Data Grid in embedded mode.

The API consists of the following:

  • ClusteredLock exposes methods to implement clustered locks.
  • ClusteredLockManager exposes methods to define, configure, retrieve, and remove clustered locks.
  • EmbeddedClusteredLockManagerFactory initializes ClusteredLockManager implementations.

Ownership

Data Grid supports NODE ownership so that all nodes in a cluster can use a lock.

Reentrancy

Data Grid clustered locks are non-reentrant so any node in the cluster can acquire a lock but only the node that creates the lock can release it.

If two consecutive lock calls are sent for the same owner, the first call acquires the lock if it is available and the second call is blocked.

6.2. Using Clustered Locks

Learn how to use clustered locks with Data Grid embedded in your application.

Prerequisites

  • Add the infinispan-clustered-lock dependency to your pom.xml:
<dependency>
   <groupId>org.infinispan</groupId>
   <artifactId>infinispan-clustered-lock</artifactId>
</dependency>

Procedure

  1. Initialize the ClusteredLockManager interface from a Cache Manager. This interface is the entry point for defining, retrieving, and removing clustered locks.
  2. Give a unique name for each clustered lock.
  3. Acquire locks with the lock.tryLock(1, TimeUnit.SECONDS) method.
// Set up a clustered Cache Manager.
GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();

// Configure the cache mode, in this case it is distributed and synchronous.
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.clustering().cacheMode(CacheMode.DIST_SYNC);

// Initialize a new default Cache Manager.
DefaultCacheManager cm = new DefaultCacheManager(global.build(), builder.build());

// Initialize a Clustered Lock Manager.
ClusteredLockManager clm1 = EmbeddedClusteredLockManagerFactory.from(cm);

// Define a clustered lock named 'lock'.
clm1.defineLock("lock");

// Get a lock from each node in the cluster.
ClusteredLock lock = clm1.get("lock");

AtomicInteger counter = new AtomicInteger(0);

// Acquire the lock as follows.
// Each 'lock.tryLock(1, TimeUnit.SECONDS)' method attempts to acquire the lock.
// If the lock is not available, the method waits for the timeout period to elapse. When the lock is acquired, other calls to acquire the lock are blocked until the lock is released.
CompletableFuture<Boolean> call1 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> {
    if (r) {
        System.out.println("lock is acquired by the call 1");
        lock.unlock().whenComplete((nil, ex2) -> {
            System.out.println("lock is released by the call 1");
            counter.incrementAndGet();
        });
    }
});

CompletableFuture<Boolean> call2 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> {
    if (r) {
        System.out.println("lock is acquired by the call 2");
        lock.unlock().whenComplete((nil, ex2) -> {
            System.out.println("lock is released by the call 2");
            counter.incrementAndGet();
        });
    }
});

CompletableFuture<Boolean> call3 = lock.tryLock(1, TimeUnit.SECONDS).whenComplete((r, ex) -> {
    if (r) {
        System.out.println("lock is acquired by the call 3");
        lock.unlock().whenComplete((nil, ex2) -> {
            System.out.println("lock is released by the call 3");
            counter.incrementAndGet();
        });
    }
});

CompletableFuture.allOf(call1, call2, call3).whenComplete((r, ex) -> {
    // Print the value of the counter.
    System.out.println("Value of the counter is " + counter.get());

    // Stop the Cache Manager.
    cm.stop();
});

6.3. Configuring Internal Caches for Locks

Clustered Lock Managers include an internal cache that stores lock state. You can configure the internal cache either declaratively or programmatically.

Procedure

  1. Define the number of nodes in the cluster that store the state of clustered locks. The default value is -1, which replicates the value to all nodes.
  2. Specify one of the following values for the cache reliability, which controls how clustered locks behave when clusters split into partitions or multiple nodes leave:

    • AVAILABLE: Nodes in any partition can concurrently operate on locks.
    • CONSISTENT: Only nodes that belong to the majority partition can operate on locks. This is the default value.
    • Programmatic configuration

      import org.infinispan.lock.configuration.ClusteredLockManagerConfiguration;
      import org.infinispan.lock.configuration.ClusteredLockManagerConfigurationBuilder;
      import org.infinispan.lock.configuration.Reliability;
      ...
      
      GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();
      
      final ClusteredLockManagerConfiguration config = global.addModule(ClusteredLockManagerConfigurationBuilder.class).numOwner(2).reliability(Reliability.AVAILABLE).create();
      
      DefaultCacheManager cm = new DefaultCacheManager(global.build());
      
      ClusteredLockManager clm1 = EmbeddedClusteredLockManagerFactory.from(cm);
      
      clm1.defineLock("lock");
    • Declarative configuration

      <infinispan
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="urn:infinispan:config:14.0 https://infinispan.org/schemas/infinispan-config-14.0.xsd"
              xmlns="urn:infinispan:config:14.0">
      
          <cache-container default-cache="default">
              <transport/>
              <local-cache name="default">
                  <locking concurrency-level="100" acquire-timeout="1000"/>
              </local-cache>
              <clustered-locks xmlns="urn:infinispan:config:clustered-locks:14.0"
                               num-owners = "3"
                               reliability="AVAILABLE">
                  <clustered-lock name="lock1" />
                  <clustered-lock name="lock2" />
              </clustered-locks>
          </cache-container>
          <!-- Cache configuration goes here. -->
      </infinispan>