Package org.infinispan.scattered
SCATTERED CACHE DESIGN
Idea
Distributed caches have fixed owners for each key. Operations where originator is one of the owners require less messages (and possibly less roundtrips), so let's make the originator always the owner. In case of node crash, we can retrieve the data by inspecting all nodes. To find the last-written entry in case of crashed primary owner, entries will keep write versions in metadata. These versions also determine order of writes; we don't have to use locking anymore.Pros:
- faster writes
- no locking contention
Cons:
- reads always have to go to the primary (slower writes in small clusters)
- complex reconciliation during rebalance
Scope of implementation
- Scattered cache is implemented as resilient to single node failure (equivalent to 2 owners distributed cache).
- Transactional mode is not implemented yet.
- Functional commands are not fully implemented yet.
- All other features (state transfer, distributed streams, persistence...) should work.
Operations
We need to keep tombstones with versions after entry removal. These tombstones have limited lifespan - we keep them around only until the invalidations are applied on all nodes.
The versions have to grow monotonically; therefore the version counter won't be per-entry but per segment
(as tombstone will be eventually removed, per-entry version would be lost). The version is implemented by
SimpleClusteredVersion
and therefore it contains topology id.
Unlike other cache modes, entry commit does not happen in EntryWrappingInterceptor
but before replication to backup in ScatteredDistributionInterceptor
(see below for the detailed operation descriptions). As scattered cache synchronizes only on the data container
(instead of using locking interceptors), the value in data container can change between loading that in
EntryWrappingInterceptor
and committing it. Therefore, for command
that reads previous values according to VisitableCommand.loadType()
the version seen before modification is checked against actual data-container value and if it does not match,
ConcurrentChangeException
is thrown. This is caught in
RetryingEntryWrappingInterceptor
and the command is retried in that case.
Single entry write (put, getAndPut, putIfAbsent, replace...)
originator == primary owner
- Primary increments version for segment
- Primary commits entry
- Primary picks one node (next member in CH) and sends backup RPC
- Backup commits entry
- Backup sends RPC response
- Primary returns after it gets the response
- Primary schedules invalidation of entry with lower versions
Selection of backup could be random, but having it ~fixed probably reduces overall memory consumption
Updating value on primary before backup finishes does not change data consistency - if backup RPC fails in distributed cache we can't know whether backup has committed the entry and so it can be published anyway.
originator != primary owner
- Origin sends sync RPC to primary owner
- Primary increments version for segment
- Primary commits entry
- Primary returns response with version (+ return value if appropriate)
- Origin commits entry
- Origin schedules invalidation of entry with lower versions
Invalidation must be scheduled by origin, because primary does not know if backup committed
Single entry read
originator == primary owner
Just local readoriginator != primary owner
ATM just invoke sync RPC to the primary ownerPossible improvement (not implemented yet)
- Origin locally loads entry with SKIP_CACHE_LOAD
- Origin sends sync RPC including the version to primary
- Primary compares version with it's own
- If version matches, origin gets just successful response and returns locally-loaded value
- If version does not match, value + version is sent back
- Allow reading local values only (if present) - risk of stale reads
- Store read value locally with expiration (L1 enabled) - as invalidations are broadcast anyway, there's not much overhead with that. This will still require RPC on read (unless stale reads are allowed) but not marshalling the value.
Multiple entries writes
- Increment version for primary-owned entries and commit them
- Backup these entries to next node
- Send all other entries to their primary owner
- Commit entries after successful response from primary
Entries for which this node is the primary owner won't be backed up just to the next member, but to a node that is primary owner of another entries in the multiwrite. That way some messages can be spared by merging the primary(keyA) -> backup and origin -> primary(keyB) requests.
Multiple entries reads
Same as single entry reads, just merge RPCs for the same primary owners.Invalidations
It would be inefficient to send invalidations (key + version) one-by-one, so these are be merged and sent in batches, usingInvalidateVersionsCommand
.
Possible improvement (not implemented yet):
The list of invalidations-to-be-sent could be updated when we get invalidation from another node, in order to reduce the overall noise.
State Transfer
During rebalance, scattered cache always uses pendinCH for both reads and writes. It does not implement four-phase rebalance as the segment state and ability to read/write on a node is tracked inScatteredVersionManager
,
we use only two-phase rebalance.
When the command traverses through interceptor stack PrefetchInterceptor
checks the segment state, and either retrieves the remove value (ahead of regular state transfer) or blocks
the command until the state transfer is finished (for commands which need all values - there's no need to start
a second retrieval of all values).
The state transfer of a segment has several phases:
- NOT_OWNED: this node is not primary owner, it can backup the entry, though
- BLOCKED: node has just become an owner but the old owner did not revoke segment ownership yet
- KEYS_TRANSFER: node knows what is the highest version for given segment and is requesting keys + versions (no values) from all other nodes.
- VALUES_TRANSFER: we got all keys with metadata and now store the highest version of each key
and the node storing the value in
RemoteMetadata
- OWNED: all data is in
ScatteredStateConfirmRevokedCommand
that makes sure that all old owners have adopted the new topology and won't serve furher requests according to the old one.ScatteredStateGetKeysCommand
that is very similar toStateTransferStartCommand
but moves only keys.
During node crash, we experience 3 topologies:
- CH_UPDATE just removing the dead member (STABLE topology)
- REBALANCE starts shuffling entries around (TRANSITORY topology)
- CH_UPDATE with final (STABLE) topology
Operations are always driven by the new primary owner of given segment.
If the segment has not changed an owner:
ScatteredStateProvider
does:
- Replicate all data from this segment to the next node using
OutboundTransferTask.pushTransfer
true
- Send
InvalidateVersionsCommand
s with all keys in this segment to all nodes but the next member (receiving state via the push transfer)
If the segment just got a new primary owner:
ScatteredStateConsumerImpl
does:
- Synchronously retrieve highest version for this segment from all nodes (using
GET_MAX_VERSIONS
- Request all nodes to send you all keys + versions from this segment (and do that locally as well)
- Retrieve values from nodes with highest versions
- Send invalidations to all other nodes, removing the entry
Clean rebalance (after join, no data is lost)
Optimization for rebalance when there's single owner with all data (previous primary) has not been implemented yet.Partition handling
Partition becomes degraded any time it loses more than one node compared to members in last stable topology. In degraded mode, all operations are prohibited; one partition cannot have all owners in (in that case operations are allowed in distributed caches) because we don't know who is the backup owner. Having primary owner is not sufficient; the other partition may be still available and therefore we would get inconsistent/provide possibly stale data.Persistence
As we don't use locking for everything afterEntryWrappingInterceptor
we need another synchronization for storing an entry into cache store. We don't want to block data-container
for the potentially long cache store update, and therefore ScatteredCacheWriterInterceptor
goes into data-container (getting the lock) just to compare versions and create a CompletableFuture
that serves as a lock that can be waited upon in non-blocking way.
Potential problems
Listeners
- The pre- listeners may be invoked multiple times, with stale values (the command then does not update DC, and retries).
- However if command itself does not read the value, it can commit even if the value changed in between and listener will get out-of-date value.
- As ordering updates to DC is based on the versions, it is possible that some operations arrive to DC finding that a newer (according to version) update has been applied there. In that case, the operation correctly finishes, but an event for this update is not fired as we don't have the previous value, and the event that was fired for the newer update carries the value before this update.
-
ClassDescriptionThis component tracks if this node can read the data stored locally despite not being an owner and which nodes will read the local data that is primary-owned by this node.Manages versions of entries and states of segments.