Class NonBlockingSoftIndexFileStore<K,V>
- java.lang.Object
-
- org.infinispan.persistence.sifs.NonBlockingSoftIndexFileStore<K,V>
-
- All Implemented Interfaces:
NonBlockingStore<K,V>
public class NonBlockingSoftIndexFileStore<K,V> extends Object implements NonBlockingStore<K,V>
Local file-based cache store, optimized for write-through use with strong consistency guarantees (ability to flush disk operations before returning from the store call). * DESIGN: There are three threads operating in the cache-store: - LogAppender: Requests to store entries are passed to the LogAppender thread via queue, then the requestor threads wait until LogAppender notifies them about successful store. LogAppender serializes the writes into append-only file, writes the offset into TemporaryTable and enqueues request to update index into UpdateQueue. The append-only files have limited size, when the file is full, new file is started. - IndexUpdater: Reads the UpdateQueue, applies the operation into B-tree-like structure Index (exact description below) and then removes the entry from TemporaryTable. When the Index is overwriten, the current entry offset is retrieved and IndexUpdater increases the unused space statistics in FileStats. - Compactor: When a limit of unused space in some file is reached (according to FileStats), the Compactor starts reading this file sequentially, querying TemporaryTable or Index for the current entry position and copying the unchanged entries into another file. For the entries that are still valid in the original file, a compare-and-set (file-offset based) request is enqueued into UpdateQueue - therefore this operation cannot interfere with concurrent writes overwriting the entry. Multiple files can be merged into single file during compaction. Structures: - TemporaryTable: keeps the records about current entry location until this is applied to the Index. Each read request goes to the TemporaryTable, if the key is not found here, Index is queried. - UpdateQueue: bounded queue (to prevent grow the TemporaryTable too much) of either forced writes (used for regular stores) or compare-and-set writes (used by Compactor). - FileStats: simple (Concurrent)HashTable with actual file size and amount of unused space for each file. - Index: B+-tree of IndexNodes. The tree is dropped and built a new if the process crashes, it does not need to flush disk operations. On disk it is kept as single random-accessed file, with free blocks list stored in memory. As IndexUpdater may easily become a bottleneck under heavy load, the IndexUpdater thread, UpdateQueue and tree of IndexNodes may be multiplied several times - the Index is divided into Segments. Each segment owns keys according to the hashCode() of the key. Amount of entries in IndexNode is limited by the size it occupies on disk. This size is limited by configurable nodeSize (4096 bytes by default?), only in case that the node contains single pivot (too long) it can be longer. A key_prefix common for all keys in the IndexNode is stored in order to reduce space requirements. For implementation reasons the keys are limited to 32kB - this requirement may be circumvented later. The pivots are not whole keys - it is the shortest part of key that is greater than all left children (but lesser or equal to all right children) - let us call this key_part. The key_parts are sorted in the IndexNode, naturally. On disk it has this format: key_prefix_length(2 bytes), key_prefix, num_parts(2 bytes), ( key_part_length (2 bytes), key_part, left_child_index_node_offset (8 bytes))+, right_child_index_node_offset (8 bytes) In memory, for every child a SoftReferenceis held. When this reference is empty (but the offset in file is set), any reader may load the reference using double-locking pattern (synchronized over the reference itself). The entry is never loaded by multiple threads in parallel and even may block other threads trying to read this node. For each node in memory a RW-lock is held. When the IndexUpdater thread updates the Index (modifying some IndexNodes), it prepares a copy of these nodes (already stored into index file). Then, in locks only the uppermost node for writing, overwrites the references to new data and unlocks the this node. After that the changed nodes are traversed from top down, write locked and their record in index file is released. Reader threads crawl the tree from top down, locking the parent node (for reading), locking child node and unlocking parent node. - Author:
- Radim Vansa <rvansa@redhat.com>
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static interface
NonBlockingSoftIndexFileStore.EntryCreator<K,V>
-
Nested classes/interfaces inherited from interface org.infinispan.persistence.spi.NonBlockingStore
NonBlockingStore.Characteristic, NonBlockingStore.SegmentedPublisher<Type>
-
-
Field Summary
Fields Modifier and Type Field Description static String
PREFIX_10_1
static String
PREFIX_11_0
static String
PREFIX_12_0
static String
PREFIX_LATEST
-
Fields inherited from interface org.infinispan.persistence.spi.NonBlockingStore
SIZE_UNAVAILABLE_FUTURE
-
-
Constructor Summary
Constructors Constructor Description NonBlockingSoftIndexFileStore()
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description CompletionStage<Void>
addSegments(IntSet segments)
Invoked when a node becomes an owner of the given segments.CompletionStage<Long>
approximateSize(IntSet segments)
Returns an estimation of the amount of entries that map to the given segments in the store.Set<NonBlockingStore.Characteristic>
characteristics()
Returns a set of characteristics for this store and its elements.CompletionStage<Void>
clear()
Clears all entries from the store.CompletionStage<Boolean>
containsKey(int segment, Object key)
Returns a stage that will contain whether the value can be found in the store.CompletionStage<Boolean>
delete(int segment, Object key)
Removes the entry for given key and segment from the store and optionally report if the entry was actually removed or not.protected Path
getIndexLocation()
CompletionStage<Boolean>
isAvailable()
Returns a stage that, when complete, returns a boolean indicating whether the current store can be accessed for requests.protected boolean
isIndexLoaded()
protected boolean
isSeqIdOld(long seqId, int segment, Object key, byte[] serializedKey)
CompletionStage<MarshallableEntry<K,V>>
load(int segment, Object key)
Returns a stage that will contain the value loaded from the store.org.reactivestreams.Publisher<MarshallableEntry<K,V>>
publishEntries(IntSet segments, Predicate<? super K> filter, boolean includeValues)
Publishes entries from this store that are in one of the provided segments and also pass the provided filter.org.reactivestreams.Publisher<K>
publishKeys(IntSet segments, Predicate<? super K> filter)
Publishes keys from this store that are in one of the provided segments and also pass the provided filter.org.reactivestreams.Publisher<MarshallableEntry<K,V>>
purgeExpired()
Returns a Publisher that, after it is subscribed to, removes any expired entries from the store and publishes them to the returned Publisher.CompletionStage<Void>
removeSegments(IntSet segments)
Invoked when a node loses ownership of the given segments.CompletionStage<Long>
size(IntSet segments)
Returns the amount of entries that map to the given segments in the store.CompletionStage<Void>
start(InitializationContext ctx)
The first method to invoke so that the store can be configured and additional steps, such as connecting through a socket or opening file descriptors, are performed.protected void
startIndex()
CompletionStage<Void>
stop()
This method is invoked when the cache is being shutdown.CompletionStage<Void>
write(int segment, MarshallableEntry<? extends K,? extends V> entry)
Writes the entry to the store for the given segment returning a stage that completes normally when it is finished.-
Methods inherited from interface org.infinispan.persistence.spi.NonBlockingStore
batch, commit, destroy, ignoreCommandWithFlags, prepareWithModifications, rollback
-
-
-
-
Field Detail
-
PREFIX_10_1
public static final String PREFIX_10_1
- See Also:
- Constant Field Values
-
PREFIX_11_0
public static final String PREFIX_11_0
- See Also:
- Constant Field Values
-
PREFIX_12_0
public static final String PREFIX_12_0
- See Also:
- Constant Field Values
-
PREFIX_LATEST
public static final String PREFIX_LATEST
- See Also:
- Constant Field Values
-
-
Method Detail
-
characteristics
public Set<NonBlockingStore.Characteristic> characteristics()
Description copied from interface:NonBlockingStore
Returns a set of characteristics for this store and its elements. This method may be invoked multiple times to determine which methods of the store can be used and how the data in the store can be handled.Refer to
NonBlockingStore.Characteristic
and its values for descriptions of each characteristic for stores.- Specified by:
characteristics
in interfaceNonBlockingStore<K,V>
- Returns:
- the set of characteristics that this store supports.
-
addSegments
public CompletionStage<Void> addSegments(IntSet segments)
Description copied from interface:NonBlockingStore
Invoked when a node becomes an owner of the given segments. Some store implementations may require initializing additional resources when a new segment is required. For example a store could store entries in a different file per segment.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.SHAREABLE
If the store has this characteristic and is configured to be StoreConfiguration.shared()
, this method will never be invoked.NonBlockingStore.Characteristic.SEGMENTABLE
This method is invoked only if the store has this characteristic and is configured to be segmented
.If a problem is encountered, it is recommended to wrap any created/caught Throwable in a
PersistenceException
and the stage be completed exceptionally.- Specified by:
addSegments
in interfaceNonBlockingStore<K,V>
- Parameters:
segments
- the segments to add.- Returns:
- a stage that, when complete, indicates that the segments have been added.
-
removeSegments
public CompletionStage<Void> removeSegments(IntSet segments)
Description copied from interface:NonBlockingStore
Invoked when a node loses ownership of the given segments. A store must then remove any entries that map to the given segments and can remove any resources related to the given segments. For example, a database store can delete rows of the given segment or a file-based store can delete files related to the given segments.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.SHAREABLE
If the store has this characteristic and is configured to be shared
, this method will never be invoked.NonBlockingStore.Characteristic.SEGMENTABLE
This method is invoked only if the store has this characteristic and is configured to be segmented
.If a problem is encountered, it is recommended to wrap any created/caught Throwable in a
PersistenceException
and the stage be completed exceptionally.- Specified by:
removeSegments
in interfaceNonBlockingStore<K,V>
- Parameters:
segments
- the segments to remove.- Returns:
- a stage that, when complete, indicates that the segments have been removed.
-
start
public CompletionStage<Void> start(InitializationContext ctx)
Description copied from interface:NonBlockingStore
The first method to invoke so that the store can be configured and additional steps, such as connecting through a socket or opening file descriptors, are performed.The provided
InitializationContext
contains many helpful objects, including the configuration of the cache and store, concurrency utilities such asBlockingManager
or an executor reserved for non-blocking operations onlyInitializationContext.getNonBlockingExecutor()
.This method is guaranteed not to be invoked concurrently with other operations. This means other methods are not invoked on this store until after the returned Stage completes.
It is expected that an implementation should be able to "restart" by invoking
start
a second time ifNonBlockingStore.stop()
has been invoked and allowed for its stage to complete.- Specified by:
start
in interfaceNonBlockingStore<K,V>
- Parameters:
ctx
- initialization context used to initialize this store.- Returns:
- a stage that, when complete, indicates that this store has started successfully.
-
purgeExpired
public org.reactivestreams.Publisher<MarshallableEntry<K,V>> purgeExpired()
Description copied from interface:NonBlockingStore
Returns a Publisher that, after it is subscribed to, removes any expired entries from the store and publishes them to the returned Publisher.When the Publisher is subscribed to, it is expected to do point-in-time expiration and should not return a Publisher that has infinite entries or never completes.
Subscribing to the returned
Publisher
should not block the invoking thread. It is the responsibility of the store implementation to ensure this occurs. If however the store must block to perform an operation it is recommended to wrap your Publisher before returning with theBlockingManager.blockingPublisher(Publisher)
method and it will handle subscription and observation on the blocking and non blocking executors respectively.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.EXPIRATION
This method is only invoked if the store has this characteristic. If a problem is encountered, it is recommended to wrap any created/caught Throwable in a
PersistenceException
and the stage be completed exceptionally.- Specified by:
purgeExpired
in interfaceNonBlockingStore<K,V>
- Returns:
- a Publisher that publishes the entries that are expired at the time of subscription.
-
getIndexLocation
protected Path getIndexLocation()
-
isSeqIdOld
protected boolean isSeqIdOld(long seqId, int segment, Object key, byte[] serializedKey) throws IOException
- Throws:
IOException
-
startIndex
protected void startIndex()
-
isIndexLoaded
protected boolean isIndexLoaded()
-
stop
public CompletionStage<Void> stop()
Description copied from interface:NonBlockingStore
This method is invoked when the cache is being shutdown. It is expected that all resources related to the store are freed when the returned stage is complete.This method is guaranteed not to be invoked concurrently with other operations. This means other methods are not invoked on this store until after the returned Stage completes.
It is expected that an implementation should be able to "restart" by invoking
NonBlockingStore.start(InitializationContext)
a second time ifstop
has been invoked and allowed for its stage to complete.- Specified by:
stop
in interfaceNonBlockingStore<K,V>
- Returns:
- a stage that, when complete, indicates that this store has stopped.
-
isAvailable
public CompletionStage<Boolean> isAvailable()
Description copied from interface:NonBlockingStore
Returns a stage that, when complete, returns a boolean indicating whether the current store can be accessed for requests. This can be useful for store implementations that rely on an external source, such as a remote database, that may become unreachable. This can reduce sending requests to a store that is not available, as subsequent cache requests will result in aStoreUnavailableException
being thrown until the store becomes available again.Store availability is is polled periodically to update the status of stores if their availability changes. This method is not invoked concurrently with itself. In other words, this method is not invoked until after the previous stage has completed. However this method is invoked concurrently with other operations, except for
NonBlockingStore.start(InitializationContext)
andNonBlockingStore.stop()
.If a store is configured to be
StoreConfiguration.async()
and the store becomes unavailable, then it is possible for the cache operations to be accepted in the interim period between the loss of availability and the modification-queue becoming full. This allows for this store to be unavailable for short periods of time without aStoreUnavailableException
being thrown; however if the store does not become available before the queue fills, then aStoreUnavailableException
is thrown.- Specified by:
isAvailable
in interfaceNonBlockingStore<K,V>
- Returns:
- stage that, when complete, indicates if the store is available.
-
clear
public CompletionStage<Void> clear()
Description copied from interface:NonBlockingStore
Clears all entries from the store.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.READ_ONLY
This method will never be invoked. If a problem is encountered, it is recommended to wrap any created/caught Throwable in a
PersistenceException
and the stage be completed exceptionally.- Specified by:
clear
in interfaceNonBlockingStore<K,V>
- Returns:
- a stage that, when complete, indicates that the store has been cleared.
-
size
public CompletionStage<Long> size(IntSet segments)
Description copied from interface:NonBlockingStore
Returns the amount of entries that map to the given segments in the store.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.BULK_READ
This method is only invoked if the store has this characteristic. NonBlockingStore.Characteristic.SEGMENTABLE
When this is not set or segmentation is disabled in the configuration
, thesegments
parameter may be ignored.If a problem is encountered, it is recommended to wrap any created/caught Throwable in a
PersistenceException
and the stage be completed exceptionally.- Specified by:
size
in interfaceNonBlockingStore<K,V>
- Parameters:
segments
- the segments for which the entries are counted.- Returns:
- a stage that, when complete, contains the count of how many entries are present for the given segments.
-
approximateSize
public CompletionStage<Long> approximateSize(IntSet segments)
Description copied from interface:NonBlockingStore
Returns an estimation of the amount of entries that map to the given segments in the store. This is similar toNonBlockingStore.size(IntSet)
except that it is not strict about the returned size. For instance, this method might ignore if an entry is expired or if the store has some underlying optimizations to eventually have a consistent size.The implementations should be O(1). If a size approximation cannot be returned without iterating over all the entries in the store, the implementation should return
-1L
.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.BULK_READ
This method is only invoked if the store has this characteristic. NonBlockingStore.Characteristic.SEGMENTABLE
When the store does not have this characteristic or segmentation is disabled in the configuration
, thesegment
parameter is alwaysIntSets.immutableRangeSet(numSegments)
.If a problem is encountered, it is recommended to wrap any created/caught Throwable in a
PersistenceException
and the stage be completed exceptionally.- Specified by:
approximateSize
in interfaceNonBlockingStore<K,V>
- Parameters:
segments
- the segments for which the entries are counted.- Returns:
- a stage that, when complete, contains the approximate count of the entries in the given segments,
or
-1L
if an approximate count cannot be provided.
-
write
public CompletionStage<Void> write(int segment, MarshallableEntry<? extends K,? extends V> entry)
Description copied from interface:NonBlockingStore
Writes the entry to the store for the given segment returning a stage that completes normally when it is finished.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.READ_ONLY
This method will never be invoked. NonBlockingStore.Characteristic.EXPIRATION
When set, this method must store the expiration metadata. NonBlockingStore.Characteristic.SEGMENTABLE
When set and segmentation is not disabled in the configuration
, this method must ensure the segment is stored with the entry.If a problem is encountered, it is recommended to wrap any created/caught Throwable in a
PersistenceException
and the stage be completed exceptionally.- Specified by:
write
in interfaceNonBlockingStore<K,V>
- Parameters:
segment
- the segment for the given key if segmentation is enabled, otherwise 0.entry
- the entry to persist to the store.- Returns:
- a stage that when complete indicates that the store has written the value.
-
delete
public CompletionStage<Boolean> delete(int segment, Object key)
Description copied from interface:NonBlockingStore
Removes the entry for given key and segment from the store and optionally report if the entry was actually removed or not.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.READ_ONLY
This method will never be invoked. NonBlockingStore.Characteristic.SEGMENTABLE
When this is not set or segmentation is disabled in the configuration
, thesegment
parameter may be ignored.If a problem is encountered, it is recommended to wrap any created/caught Throwable in a
PersistenceException
and the stage be completed exceptionally.- Specified by:
delete
in interfaceNonBlockingStore<K,V>
- Parameters:
segment
- the segment for the given key if segmentation is enabled, otherwise 0.key
- key of the entry to delete from the store.- Returns:
- a stage that completes with
TRUE
if the key existed in the store,FALSE
if the key did not exist in the store, ornull
if the store does not report this information.
-
containsKey
public CompletionStage<Boolean> containsKey(int segment, Object key)
Description copied from interface:NonBlockingStore
Returns a stage that will contain whether the value can be found in the store.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.WRITE_ONLY
This method will never be invoked. NonBlockingStore.Characteristic.EXPIRATION
When set this method must not return true if the entry was expired. NonBlockingStore.Characteristic.SEGMENTABLE
When this is not set or segmentation is disabled in the configuration
, thesegment
parameter may be ignored.If a problem is encountered, it is recommended to wrap any created/caught Throwable in a
PersistenceException
and the stage be completed exceptionally.- Specified by:
containsKey
in interfaceNonBlockingStore<K,V>
- Parameters:
segment
- the segment for the given key if segmentation is enabled, otherwise 0.key
- key of the entry to check.- Returns:
- a stage that, when complete, contains a boolean stating if the value is contained in the store.
-
load
public CompletionStage<MarshallableEntry<K,V>> load(int segment, Object key)
Description copied from interface:NonBlockingStore
Returns a stage that will contain the value loaded from the store. If aMarshallableEntry
needs to be created here,InitializationContext.getMarshallableEntryFactory()
()} andInitializationContext.getByteBufferFactory()
should be used.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.WRITE_ONLY
This method will never be invoked. NonBlockingStore.Characteristic.EXPIRATION
When set this method must not return expired entries. NonBlockingStore.Characteristic.SEGMENTABLE
When this is not set or segmentation is disabled in the configuration
, thesegment
parameter may be ignored.If a problem is encountered, it is recommended to wrap any created/caught Throwable in a
PersistenceException
and the stage be completed exceptionally.- Specified by:
load
in interfaceNonBlockingStore<K,V>
- Parameters:
segment
- the segment for the given key if segmentation is enabled, otherwise 0.key
- key of the entry to load.- Returns:
- a stage that, when complete, contains the store value or null if not present.
-
publishKeys
public org.reactivestreams.Publisher<K> publishKeys(IntSet segments, Predicate<? super K> filter)
Description copied from interface:NonBlockingStore
Publishes keys from this store that are in one of the provided segments and also pass the provided filter. The returned publisher must support being subscribed to any number of times. That is subsequent invocations ofPublisher.subscribe(Subscriber)
should provide independent views of the underlying keys to the Subscribers. Keys should not retrieved until a given Subscriber requests them via theSubscription.request(long)
method.Subscribing to the returned
Publisher
should not block the invoking thread. It is the responsibility of the store implementation to ensure this occurs. If however the store must block to perform an operation it is recommended to wrap your Publisher before returning with theBlockingManager.blockingPublisher(Publisher)
method and it will handle subscription and observation on the blocking and non blocking executors respectively.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.BULK_READ
This method is only invoked if the store has this characteristic. NonBlockingStore.Characteristic.EXPIRATION
When set the returned publisher must not return expired keys. NonBlockingStore.Characteristic.SEGMENTABLE
When this is not set or segmentation is disabled in the configuration
, thesegment
parameter may be ignored.- Specified by:
publishKeys
in interfaceNonBlockingStore<K,V>
- Parameters:
segments
- a set of segments to filter keys by. This will always be non null.filter
- a filter to filter they keys by. If this is null then no additional filtering should be done after segments.- Returns:
- a publisher that provides the keys from the store.
-
publishEntries
public org.reactivestreams.Publisher<MarshallableEntry<K,V>> publishEntries(IntSet segments, Predicate<? super K> filter, boolean includeValues)
Description copied from interface:NonBlockingStore
Publishes entries from this store that are in one of the provided segments and also pass the provided filter. The returned publisher must support being subscribed to any number of times. That is subsequent invocations ofPublisher.subscribe(Subscriber)
should provide independent views of the underlying entries to the Subscribers. Entries should not retrieved until a given Subscriber requests them via theSubscription.request(long)
method.Subscribing to the returned
Publisher
should not block the invoking thread. It is the responsibility of the store implementation to ensure this occurs. If however the store must block to perform an operation it is recommended to wrap your Publisher before returning with theBlockingManager.blockingPublisher(Publisher)
method and it will handle subscription and observation on the blocking and non-blocking executors respectively.Summary of Characteristics Effects
Characteristic Effect NonBlockingStore.Characteristic.BULK_READ
This method is only invoked if the store has this characteristic. NonBlockingStore.Characteristic.EXPIRATION
When set the returned publisher must not return expired entries. NonBlockingStore.Characteristic.SEGMENTABLE
When this is not set or segmentation is disabled in the configuration
, thesegment
parameter may be ignored.- Specified by:
publishEntries
in interfaceNonBlockingStore<K,V>
- Parameters:
segments
- a set of segments to filter entries by. This will always be non null.filter
- a filter to filter they keys by. If this is null then no additional filtering should be done after segments.- Returns:
- a publisher that provides the keys from the store.
-
-