Package org.infinispan.scattered




Distributed caches have fixed owners for each key. Operations where the originator is one of the owners require fewer messages (and possibly fewer roundtrips), so let's make the originator always an owner. In case of a node crash, we can retrieve the data by inspecting all nodes. To find the last-written entry when the primary owner has crashed, entries keep write versions in metadata. These versions also determine the order of writes, so we don't have to use locking anymore.


Advantages:

  • faster writes
  • no locking contention


Disadvantages:

  • reads always have to go to the primary (slower reads in small clusters)
  • complex reconciliation during rebalance

Scope of implementation

  • Scattered cache is resilient to a single node failure (equivalent to a distributed cache with 2 owners).
  • Transactional mode is not implemented yet.
  • Functional commands are not fully implemented yet.
  • All other features (state transfer, distributed streams, persistence...) should work.


We need to keep tombstones with versions after entry removal. These tombstones have limited lifespan - we keep them around only until the invalidations are applied on all nodes.

The versions have to grow monotonically; therefore the version counter is not per-entry but per-segment (as the tombstone will eventually be removed, a per-entry version would be lost). The version is implemented by SimpleClusteredVersion and therefore contains the topology id.
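The per-segment counter can be pictured with a minimal sketch. The class names SketchVersion and SegmentVersionCounter are hypothetical and are not the Infinispan classes; the comparison order (topology id first, then counter) is an assumption made for illustration.

```java
import java.util.concurrent.atomic.AtomicLongArray;

/** Hypothetical stand-in for a (topology id, version) pair; not the real SimpleClusteredVersion. */
final class SketchVersion implements Comparable<SketchVersion> {
    final int topologyId;
    final long version;

    SketchVersion(int topologyId, long version) {
        this.topologyId = topologyId;
        this.version = version;
    }

    @Override
    public int compareTo(SketchVersion other) {
        // Assumption: a newer topology wins; within one topology the counter decides.
        int byTopology = Integer.compare(topologyId, other.topologyId);
        return byTopology != 0 ? byTopology : Long.compare(version, other.version);
    }
}

/** Hypothetical counter: one monotonic sequence per segment, shared by all keys in that segment. */
final class SegmentVersionCounter {
    private final AtomicLongArray counters;

    SegmentVersionCounter(int numSegments) {
        this.counters = new AtomicLongArray(numSegments);
    }

    SketchVersion next(int segment, int topologyId) {
        return new SketchVersion(topologyId, counters.incrementAndGet(segment));
    }
}
```

Because the counter belongs to the segment, removing an entry (and later its tombstone) does not reset the sequence that the next write to the same key will draw from.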

Unlike other cache modes, entry commit does not happen in EntryWrappingInterceptor but before replication to the backup in ScatteredDistributionInterceptor (see below for the detailed operation descriptions). As scattered cache synchronizes only on the data container (instead of using locking interceptors), the value in the data container can change between loading it in EntryWrappingInterceptor and committing it. Therefore, for commands that read previous values (according to VisitableCommand.loadType()), the version seen before modification is checked against the actual data-container value, and if it does not match, ConcurrentChangeException is thrown. This is caught in RetryingEntryWrappingInterceptor and the command is retried.
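The check-and-retry idea can be sketched in isolation. This is a minimal model, not the interceptor code: VersionCheckSketch, its nested Entry and ConcurrentChange types, and the use of a ConcurrentHashMap as the data container are all assumptions for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;

final class VersionCheckSketch {
    static final class Entry {
        final String value;
        final long version;
        Entry(String value, long version) { this.value = value; this.version = version; }
    }

    /** Hypothetical stand-in for ConcurrentChangeException. */
    static final class ConcurrentChange extends RuntimeException {}

    private final ConcurrentHashMap<String, Entry> container = new ConcurrentHashMap<>();
    private final AtomicLong versions = new AtomicLong();

    Entry read(String key) {
        return container.get(key);
    }

    /** Commits only if the container still holds the version the command saw when it loaded the entry. */
    void commit(String key, Entry seen, String newValue) {
        container.compute(key, (k, current) -> {
            long seenVersion = seen == null ? 0L : seen.version;
            long currentVersion = current == null ? 0L : current.version;
            if (seenVersion != currentVersion) {
                throw new ConcurrentChange(); // compute() leaves the mapping untouched
            }
            return new Entry(newValue, versions.incrementAndGet());
        });
    }

    /** Retry loop: reload, recompute and try to commit again after a concurrent change. */
    String update(String key, UnaryOperator<String> function) {
        while (true) {
            Entry seen = read(key);
            String newValue = function.apply(seen == null ? null : seen.value);
            try {
                commit(key, seen, newValue);
                return newValue;
            } catch (ConcurrentChange e) {
                // another writer got in between; loop and retry with the fresh value
            }
        }
    }
}
```

Only the short compare-and-store step is synchronized on the container; computing the new value happens outside it, which is why a retry path is needed at all.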

Single entry write (put, getAndPut, putIfAbsent, replace...)

originator == primary owner

  1. Primary increments version for segment
  2. Primary commits entry
  3. Primary picks one node (next member in CH) and sends backup RPC
  4. Backup commits entry
  5. Backup sends RPC response
  6. Primary returns after it gets the response
  7. Primary schedules invalidation of entry with lower versions

Selection of the backup could be random, but keeping it roughly fixed probably reduces overall memory consumption.

Updating the value on the primary before the backup finishes does not change data consistency: if the backup RPC fails in a distributed cache, we can't know whether the backup has committed the entry, and so the value can be published anyway.

originator != primary owner

  1. Origin sends sync RPC to primary owner
  2. Primary increments version for segment
  3. Primary commits entry
  4. Primary returns response with version (+ return value if appropriate)
  5. Origin commits entry
  6. Origin schedules invalidation of entry with lower versions

Invalidation must be scheduled by the origin, because the primary does not know whether the backup committed.
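Both write paths can be modelled in a few lines. The following is an in-memory sketch under simplifying assumptions: a single segment, direct method calls in place of sync RPCs, and hypothetical names (ScatteredWriteSketch, Node, scheduledInvalidations). It only illustrates where the two copies end up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative in-memory model of a scattered write; all names here are hypothetical. */
final class ScatteredWriteSketch {
    static final class Node {
        final Map<String, String> values = new HashMap<>();
        final Map<String, Long> versions = new HashMap<>();

        /** Commits only if the incoming version is newer than what this node holds. */
        void commit(String key, String value, long version) {
            Long current = versions.get(key);
            if (current == null || current < version) {
                values.put(key, value);
                versions.put(key, version);
            }
        }
    }

    final List<Node> members = new ArrayList<>();
    final List<String> scheduledInvalidations = new ArrayList<>();
    private long segmentVersion; // a single segment, for brevity

    ScatteredWriteSketch(int size) {
        for (int i = 0; i < size; i++) {
            members.add(new Node());
        }
    }

    /** Direct method calls stand in for synchronous RPCs and their responses. */
    long write(int origin, int primary, String key, String value) {
        long version = ++segmentVersion;                   // primary increments the segment version
        members.get(primary).commit(key, value, version);  // primary commits
        // The second copy lives on the origin, or on the next member when the origin is the primary.
        int backup = origin == primary ? (primary + 1) % members.size() : origin;
        members.get(backup).commit(key, value, version);
        scheduledInvalidations.add(key + "@" + version);   // lower versions elsewhere are invalidated later
        return version;
    }
}
```

After a write from a different origin, the previous backup still holds the old copy until the scheduled invalidation reaches it, which is why invalidations carry the version.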

Single entry read

originator == primary owner

Just local read

originator != primary owner

Currently this just invokes a sync RPC to the primary owner.

Possible improvement (not implemented yet)

  1. Origin locally loads entry with SKIP_CACHE_LOAD
  2. Origin sends sync RPC including the version to primary
  3. Primary compares version with its own
    1. If version matches, origin gets just successful response and returns locally-loaded value
    2. If version does not match, value + version is sent back

Optional configuration options:
  • Allow reading local values only (if present) - risk of stale reads
  • Store read value locally with expiration (L1 enabled) - as invalidations are broadcast anyway, there's not much overhead with that. This will still require RPC on read (unless stale reads are allowed) but not marshalling the value.
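The version-comparing read proposed above (which the text marks as not implemented) could look roughly like this. VersionedReadSketch and its methods are hypothetical, versions are assumed to be non-negative, and a method call stands in for the sync RPC.

```java
final class VersionedReadSketch {
    static final class Versioned {
        final String value;
        final long version;
        Versioned(String value, long version) { this.value = value; this.version = version; }
    }

    /** Primary side: returns null when the origin's version is current, otherwise the fresh entry. */
    static Versioned checkOnPrimary(Versioned primaryEntry, long originVersion) {
        return primaryEntry.version == originVersion ? null : primaryEntry;
    }

    /** Origin side: the method call stands in for the sync RPC to the primary. */
    static String read(Versioned local, Versioned primaryEntry) {
        // -1 marks "nothing stored locally"; real versions are assumed to be >= 0.
        long localVersion = local == null ? -1L : local.version;
        Versioned fresh = checkOnPrimary(primaryEntry, localVersion);
        return fresh == null ? local.value : fresh.value;
    }
}
```

When the versions match, only a small confirmation travels back, so the value itself does not have to be marshalled.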

Multiple entries writes

  1. Increment version for primary-owned entries and commit them
  2. Backup these entries to next node
  3. Send all other entries to their primary owner
  4. Commit entries after successful response from primary

Possible improvement (not implemented yet):

Entries for which this node is the primary owner won't be backed up just to the next member, but to a node that is the primary owner of other entries in the multiwrite. That way some messages can be spared by merging the primary(keyA) -> backup and origin -> primary(keyB) requests.

Multiple entries reads

Same as single entry reads, just merge RPCs for the same primary owners.
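Merging RPCs amounts to grouping the requested keys by their primary owner and sending one request per group. A minimal sketch, with a hypothetical GroupByPrimarySketch class and a caller-supplied primaryOf function standing in for the consistent-hash lookup:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class GroupByPrimarySketch {
    /** Groups keys by owner so that each owner receives a single merged request. */
    static Map<String, List<String>> group(List<String> keys, Function<String, String> primaryOf) {
        Map<String, List<String>> byOwner = new LinkedHashMap<>();
        for (String key : keys) {
            byOwner.computeIfAbsent(primaryOf.apply(key), owner -> new ArrayList<>()).add(key);
        }
        return byOwner;
    }
}
```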


It would be inefficient to send invalidations (key + version) one-by-one, so these are merged and sent in batches, using InvalidateVersionsCommand.
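A batching buffer of this kind can be sketched as follows. InvalidationBatcher is a hypothetical class, not the Infinispan implementation; keeping only the highest scheduled version per key is an assumption, based on invalidations targeting entries with lower versions.

```java
import java.util.HashMap;
import java.util.Map;

final class InvalidationBatcher {
    private Map<String, Long> pending = new HashMap<>();

    /** Records an invalidation; repeated keys collapse to the highest version seen. */
    synchronized void schedule(String key, long version) {
        pending.merge(key, version, Long::max);
    }

    /** Hands over everything scheduled so far as one batch and starts a new one. */
    synchronized Map<String, Long> drain() {
        Map<String, Long> batch = pending;
        pending = new HashMap<>();
        return batch;
    }
}
```

A periodic task or a size threshold would call drain() and ship the returned map in a single command; that trigger is left out here.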

Possible improvement (not implemented yet):

The list of invalidations-to-be-sent could be updated when we get invalidation from another node, in order to reduce the overall noise.

State Transfer

During rebalance, scattered cache always uses pendingCH for both reads and writes. It does not implement four-phase rebalance: as the segment state and the ability to read/write on a node are tracked in ScatteredVersionManager, we use only two-phase rebalance.

When the command traverses the interceptor stack, PrefetchInterceptor checks the segment state, and either retrieves the remote value (ahead of regular state transfer) or blocks the command until the state transfer is finished (for commands which need all values - there's no need to start a second retrieval of all values).

The state transfer of a segment has several phases:

  1. NOT_OWNED: this node is not primary owner, it can backup the entry, though
  2. BLOCKED: node has just become an owner but the old owner did not revoke segment ownership yet
  3. KEYS_TRANSFER: node knows what is the highest version for given segment and is requesting keys + versions (no values) from all other nodes.
  4. VALUES_TRANSFER: we got all keys with metadata and now store the highest version of each key and the node storing the value in RemoteMetadata
  5. OWNED: all data is in

There are new commands:
  • ScatteredStateConfirmRevokedCommand that makes sure that all old owners have adopted the new topology and won't serve further requests according to the old one.
  • ScatteredStateGetKeysCommand that is very similar to StateTransferStartCommand but moves only keys.

During node crash, we experience 3 topologies:

  1. CH_UPDATE just removing the dead member (STABLE topology)
  2. REBALANCE starts shuffling entries around (TRANSITORY topology)
  3. CH_UPDATE with final (STABLE) topology

Operations are always driven by the new primary owner of given segment.

If the segment has not changed an owner:

ScatteredStateProvider does:
  1. Replicate all data from this segment to the next node using OutboundTransferTask with pushTransfer set to true
  2. Send InvalidateVersionsCommands with all keys in this segment to all nodes but the next member (receiving state via the push transfer)

A write to an entry can proceed in parallel with this process; an invalidation cannot overwrite a newer entry. However, the invalidation from an executed write can arrive at the new backup before the state transfer; the cluster would then have 3 copies of that entry until the next write, as the entry would not be invalidated on the backup.

If the segment just got a new primary owner:

ScatteredStateConsumerImpl does:
  1. Synchronously retrieve highest version for this segment from all nodes (using GET_MAX_VERSIONS)
  2. Request all nodes to send you all keys + versions from this segment (and do that locally as well)
  3. Retrieve values from nodes with highest versions
  4. Send invalidations to all other nodes, removing the entry
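The core of steps 2 and 3 is choosing, for each key, the node that reported the highest version. A minimal sketch with hypothetical names (HighestVersionSketch, Location), taking the per-node key/version maps as plain Java maps:

```java
import java.util.HashMap;
import java.util.Map;

final class HighestVersionSketch {
    static final class Location {
        final String node;
        final long version;
        Location(String node, long version) { this.node = node; this.version = version; }
    }

    /** Input: node name -> (key -> version). Output: key -> node holding the highest version. */
    static Map<String, Location> pickSources(Map<String, Map<String, Long>> keysByNode) {
        Map<String, Location> best = new HashMap<>();
        for (Map.Entry<String, Map<String, Long>> nodeEntry : keysByNode.entrySet()) {
            for (Map.Entry<String, Long> keyVersion : nodeEntry.getValue().entrySet()) {
                Location current = best.get(keyVersion.getKey());
                if (current == null || current.version < keyVersion.getValue()) {
                    best.put(keyVersion.getKey(), new Location(nodeEntry.getKey(), keyVersion.getValue()));
                }
            }
        }
        return best;
    }
}
```

The new primary would then fetch each value from the chosen node; every other node holding that key is a candidate for the invalidation in step 4.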

Clean rebalance (after join, no data is lost)

An optimization for rebalance when there is a single owner with all data (the previous primary) has not been implemented yet.

Partition handling

A partition becomes degraded any time it loses more than one node compared to the members in the last stable topology. In degraded mode, all operations are prohibited. Distributed caches allow operations when one partition contains all owners, but here a partition cannot be known to contain all owners because we don't know who the backup owner is. Having the primary owner is not sufficient; the other partition may still be available, and we would therefore provide inconsistent or possibly stale data.
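The degradation rule itself is a simple set comparison. A sketch with a hypothetical PartitionSketch helper that takes member names as strings:

```java
import java.util.HashSet;
import java.util.Set;

final class PartitionSketch {
    /** Degraded when more than one member of the last stable topology is missing. */
    static boolean isDegraded(Set<String> stableMembers, Set<String> currentMembers) {
        Set<String> lost = new HashSet<>(stableMembers);
        lost.removeAll(currentMembers);
        return lost.size() > 1;
    }
}
```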


As we don't use locking for everything after EntryWrappingInterceptor, we need another form of synchronization for storing an entry into the cache store. We don't want to block the data container for the potentially long cache store update; therefore ScatteredCacheWriterInterceptor goes into the data container (getting the lock) just to compare versions and create a CompletableFuture that serves as a lock which can be waited upon in a non-blocking way.
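The "future as a lock" idea can be illustrated with a small model. This is a sketch under assumptions, not the interceptor's code: StoreWriteSketch and Slot are hypothetical, a synchronized method stands in for the short data-container lock, and the store update is a Runnable executed on the common pool.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class StoreWriteSketch {
    private static final class Slot {
        final long version;
        final CompletableFuture<Void> done;
        Slot(long version, CompletableFuture<Void> done) { this.version = version; this.done = done; }
    }

    private final Map<String, Slot> slots = new HashMap<>();

    /**
     * Holds the lock only to compare versions and register a future; the store
     * update itself runs later, after the previous update for the same key.
     * Returns null when an equal or newer version has already been registered.
     */
    synchronized CompletableFuture<Void> write(String key, long version, Runnable storeUpdate) {
        Slot previous = slots.get(key);
        if (previous != null && previous.version >= version) {
            return null; // stale write, nothing to store
        }
        CompletableFuture<Void> after =
            previous == null ? CompletableFuture.<Void>completedFuture(null) : previous.done;
        CompletableFuture<Void> done = after.thenRunAsync(storeUpdate); // chained, not blocking
        slots.put(key, new Slot(version, done));
        return done;
    }
}
```

Callers that need the result can attach further stages to the returned future instead of parking a thread. In this simplified model a failed store update also fails every update chained after it; a fuller version would have to decide how to recover.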

Potential problems


  • The pre- listeners may be invoked multiple times, with stale values (the command then does not update DC, and retries).
  • However, if the command itself does not read the value, it can commit even if the value changed in between, and the listener will get an out-of-date value.
  • As ordering of updates to the DC is based on the versions, it is possible that some operations arrive at the DC and find that a newer (according to version) update has already been applied there. In that case, the operation finishes correctly, but an event for this update is not fired as we don't have the previous value, and the event that was fired for the newer update carries the value from before this update.