Package org.infinispan.persistence.sifs
Class SoftIndexFileStore
- java.lang.Object
-
- org.infinispan.persistence.sifs.SoftIndexFileStore
-
- All Implemented Interfaces:
Lifecycle, AdvancedCacheLoader<Object,Object>, AdvancedCacheWriter<Object,Object>, AdvancedLoadWriteStore<Object,Object>, CacheLoader<Object,Object>, CacheWriter<Object,Object>, ExternalStore<Object,Object>
public class SoftIndexFileStore extends Object implements AdvancedLoadWriteStore<Object,Object>
Local file-based cache store, optimized for write-through use with strong consistency guarantees (ability to flush disk operations before returning from the store call).

DESIGN:
There are three threads operating in the cache store:
- LogAppender: Requests to store entries are passed to the LogAppender thread via a queue; the requestor threads then wait until LogAppender notifies them of a successful store. LogAppender serializes the writes into an append-only file, writes the offset into TemporaryTable and enqueues a request to update the index into UpdateQueue. The append-only files have a limited size; when a file is full, a new file is started.
- IndexUpdater: Reads the UpdateQueue, applies the operation to the B-tree-like structure Index (described below) and then removes the entry from TemporaryTable. When an entry in the Index is overwritten, the offset currently stored for that entry is retrieved and IndexUpdater increases the unused space statistics in FileStats.
- Compactor: When a limit of unused space in some file is reached (according to FileStats), the Compactor starts reading this file sequentially, querying TemporaryTable or Index for the current entry position and copying the unchanged entries into another file. For the entries that are still valid in the original file, a compare-and-set (file-offset based) request is enqueued into UpdateQueue, so this operation cannot interfere with concurrent writes overwriting the entry. Multiple files can be merged into a single file during compaction.

Structures:
- TemporaryTable: keeps the records about the current entry location until it is applied to the Index. Each read request goes to the TemporaryTable first; if the key is not found there, the Index is queried.
- UpdateQueue: bounded queue (to prevent the TemporaryTable from growing too much) of either forced writes (used for regular stores) or compare-and-set writes (used by Compactor).
- FileStats: simple (Concurrent)HashTable with the actual file size and amount of unused space for each file.
- Index: B+-tree of IndexNodes. The tree is dropped and built anew if the process crashes, so it does not need to flush disk operations. On disk it is kept as a single random-access file, with the free blocks list stored in memory.

As IndexUpdater may easily become a bottleneck under heavy load, the IndexUpdater thread, UpdateQueue and tree of IndexNodes may be replicated several times: the Index is divided into Segments. Each segment owns keys according to the hashCode() of the key.

The number of entries in an IndexNode is limited by the size it occupies on disk. This size is limited by the configurable nodeSize (4096 bytes by default?); the node can be longer only when it contains a single pivot that is too long. A key_prefix common to all keys in the IndexNode is stored in order to reduce space requirements. For implementation reasons the keys are limited to 32kB; this requirement may be circumvented later.

The pivots are not whole keys. A pivot is the shortest part of a key that is greater than all left children (but less than or equal to all right children); let us call this key_part. The key_parts are sorted in the IndexNode, naturally. On disk the node has this format:
key_prefix_length (2 bytes), key_prefix, num_parts (2 bytes), (key_part_length (2 bytes), key_part, left_child_index_node_offset (8 bytes))+, right_child_index_node_offset (8 bytes)

In memory, a SoftReference is held for every child. When this reference is empty (but the offset in the file is set), any reader may load the reference using the double-checked locking pattern (synchronized over the reference itself). The entry is never loaded by multiple threads in parallel, and loading may even block other threads trying to read this node.

For each node in memory an RW-lock is held. When the IndexUpdater thread updates the Index (modifying some IndexNodes), it prepares a copy of these nodes (already stored into the index file). Then it locks only the uppermost node for writing, overwrites the references so that they point to the new data, and unlocks this node. After that the changed nodes are traversed from the top down, write-locked, and their records in the index file are released. Reader threads crawl the tree from the top down, locking the parent node (for reading), locking the child node and unlocking the parent node.
- Author:
- Radim Vansa <rvansa@redhat.com>
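The interplay of TemporaryTable, Index, forced writes and compare-and-set writes described above can be illustrated with a minimal sketch. The class names `EntryLocation` and `LocationTables` are hypothetical, plain concurrent maps stand in for the B+-tree, and the conditional removal from the temporary table is an assumption made for the sketch, not a description of the actual classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in: where an entry lives in the append-only files.
final class EntryLocation {
    final int file;
    final long offset;

    EntryLocation(int file, long offset) {
        this.file = file;
        this.offset = offset;
    }

    boolean sameAs(EntryLocation other) {
        return other != null && file == other.file && offset == other.offset;
    }
}

// Hypothetical model of TemporaryTable plus Index; maps replace the B+-tree.
final class LocationTables {
    private final Map<Object, EntryLocation> temporaryTable = new ConcurrentHashMap<>();
    private final Map<Object, EntryLocation> index = new ConcurrentHashMap<>();

    // LogAppender side: remember the fresh location until the index catches up.
    void recordAppended(Object key, EntryLocation location) {
        temporaryTable.put(key, location);
    }

    // Read path: TemporaryTable first, Index only if the key is not found there.
    EntryLocation locate(Object key) {
        EntryLocation location = temporaryTable.get(key);
        return location != null ? location : index.get(key);
    }

    // IndexUpdater side, forced write (regular store): always overwrites.
    void applyForced(Object key, EntryLocation location) {
        index.put(key, location);
        // Assumption: drop the temporary record only if it still refers to this write.
        temporaryTable.computeIfPresent(key, (k, cur) -> cur.sameAs(location) ? null : cur);
    }

    // IndexUpdater side, compare-and-set (Compactor): applies only if the
    // index still points at the location the Compactor copied from.
    boolean applyCompareAndSet(Object key, EntryLocation expected, EntryLocation moved) {
        boolean[] applied = {false};
        index.computeIfPresent(key, (k, cur) -> {
            if (cur.sameAs(expected)) {
                applied[0] = true;
                return moved;
            }
            return cur;
        });
        return applied[0];
    }
}
```

In this sketch a compaction request that carries a stale offset is simply rejected, which is how a concurrent overwrite of the same key stays visible.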
-
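To make the IndexNode description more concrete, here is a hedged sketch of computing a key_part pivot and laying out a node in the documented field order. `IndexNodeLayout` is a hypothetical helper; the unsigned lexicographic byte order, the big-endian encoding and the assumption that key_parts are passed in with the common prefix already stripped are choices made for the sketch and are not confirmed by the description.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper illustrating the documented IndexNode layout.
final class IndexNodeLayout {

    // Shortest prefix of minRight that is greater than maxLeft.
    // Assumes keys compare as unsigned bytes, lexicographically.
    static byte[] shortestPivot(byte[] maxLeft, byte[] minRight) {
        int i = 0;
        while (i < maxLeft.length && i < minRight.length && maxLeft[i] == minRight[i]) {
            i++;
        }
        boolean rightExhausted = i >= minRight.length;
        boolean rightSmaller = !rightExhausted && i < maxLeft.length
                && (minRight[i] & 0xFF) < (maxLeft[i] & 0xFF);
        if (rightExhausted || rightSmaller) {
            throw new IllegalArgumentException("minRight must be greater than maxLeft");
        }
        return Arrays.copyOf(minRight, i + 1);
    }

    // Field order: key_prefix_length (2), key_prefix, num_parts (2),
    // (key_part_length (2), key_part, left_child_offset (8))+, right_child_offset (8).
    // childOffsets[i] is the left child of keyParts[i]; the last one is the right child.
    static byte[] encode(byte[] keyPrefix, byte[][] keyParts, long[] childOffsets) {
        if (keyParts.length == 0 || childOffsets.length != keyParts.length + 1) {
            throw new IllegalArgumentException("need n >= 1 parts and n + 1 child offsets");
        }
        int size = 2 + keyPrefix.length + 2 + 8;
        for (byte[] part : keyParts) {
            size += 2 + part.length + 8;
        }
        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default (assumption)
        buf.putShort((short) keyPrefix.length);
        buf.put(keyPrefix);
        buf.putShort((short) keyParts.length);
        for (int i = 0; i < keyParts.length; i++) {
            buf.putShort((short) keyParts[i].length);
            buf.put(keyParts[i]);
            buf.putLong(childOffsets[i]);
        }
        buf.putLong(childOffsets[keyParts.length]);
        return buf.array();
    }
}
```

For example, with a largest left key of "apple" and a smallest right key of "apricot", `shortestPivot` yields "apr": it is greater than every left key and not greater than any right key, while being much shorter than the full key.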
-
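The reader traversal described in the design notes (lock the parent for reading, lock the child, unlock the parent) is commonly called lock coupling. The following is a minimal sketch of that pattern on a hypothetical `CoupledNode` with a single pivot per inner node; it leaves out SoftReference loading and the index file entirely and is not the store's actual node class.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical in-memory node: an inner node has one pivot and two children,
// a leaf holds one key and the file offset of its entry.
final class CoupledNode {
    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    final String pivot;      // null for leaves
    CoupledNode left;        // guarded by lock
    CoupledNode right;       // guarded by lock
    final String leafKey;    // null for inner nodes
    final long offset;

    private CoupledNode(String pivot, CoupledNode left, CoupledNode right, String leafKey, long offset) {
        this.pivot = pivot;
        this.left = left;
        this.right = right;
        this.leafKey = leafKey;
        this.offset = offset;
    }

    static CoupledNode inner(String pivot, CoupledNode left, CoupledNode right) {
        return new CoupledNode(pivot, left, right, null, -1);
    }

    static CoupledNode leaf(String key, long offset) {
        return new CoupledNode(null, null, null, key, offset);
    }

    // Reader: hold the parent's read lock until the child's read lock is taken.
    static Long find(CoupledNode root, String key) {
        CoupledNode node = root;
        node.lock.readLock().lock();
        try {
            while (node.pivot != null) {
                // Keys below the pivot go left; keys equal or above go right.
                CoupledNode child = key.compareTo(node.pivot) < 0 ? node.left : node.right;
                child.lock.readLock().lock();
                node.lock.readLock().unlock();
                node = child;
            }
            return key.equals(node.leafKey) ? Long.valueOf(node.offset) : null;
        } finally {
            node.lock.readLock().unlock();
        }
    }

    // Updater: write-lock only the uppermost changed node and swap in the prepared copies.
    static void swapChildren(CoupledNode top, CoupledNode newLeft, CoupledNode newRight) {
        top.lock.writeLock().lock();
        try {
            top.left = newLeft;
            top.right = newRight;
        } finally {
            top.lock.writeLock().unlock();
        }
    }
}
```

Because a reader always holds at least one lock on its path, the updater's brief write lock on the uppermost node is enough to switch readers atomically from the old subtree to the new one.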
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.infinispan.persistence.spi.AdvancedCacheWriter
AdvancedCacheWriter.PurgeListener<K>
-
-
Field Summary
Fields
Modifier and Type    Field          Description
static String        PREFIX_10_1
static String        PREFIX_11_0
-
Constructor Summary
Constructors
Constructor             Description
SoftIndexFileStore()
-
Method Summary
All Methods  Instance Methods  Concrete Methods
Modifier and Type  Method  Description
void clear()
    Removes all the data from the storage.
boolean contains(Object key)
    Returns true if the storage contains an entry associated with the given key.
boolean delete(Object key)
void destroy()
    Method to be used to destroy and clean up any resources associated with this store.
org.reactivestreams.Publisher<MarshallableEntry<Object,Object>> entryPublisher(Predicate<? super Object> filter, boolean fetchValue, boolean fetchMetadata)
    Publishes all entries from this store.
protected Path getIndexLocation()
void init(InitializationContext ctx)
    Used to initialize a cache loader.
boolean isAvailable()
protected boolean isIndexLoaded()
protected boolean isSeqIdOld(long seqId, Object key, byte[] serializedKey)
MarshallableEntry<Object,Object> loadEntry(Object key)
    Fetches an entry from the storage.
org.reactivestreams.Publisher<Object> publishKeys(Predicate<? super Object> filter)
    Publishes all the keys from this store.
void purge(Executor threadPool, AdvancedCacheWriter.PurgeListener<? super Object> listener)
    Using the threads in the pool, removes all the expired data from the persistence storage.
int size()
    Returns the number of elements in the store.
void start()
    Invoked on component start
protected void startIndex()
void stop()
    Invoked on component stop
void write(MarshallableEntry<?,?> entry)
    Persists the entry to the storage.
-
Methods inherited from interface org.infinispan.persistence.spi.CacheWriter
bulkUpdate, deleteBatch
-
-
-
-
Field Detail
-
PREFIX_10_1
public static final String PREFIX_10_1
- See Also:
- Constant Field Values
-
PREFIX_11_0
public static final String PREFIX_11_0
- See Also:
- Constant Field Values
-
-
Method Detail
-
init
public void init(InitializationContext ctx)
Description copied from interface: CacheLoader
Used to initialize a cache loader. Typically invoked by the PersistenceManager when setting up cache loaders.
- Specified by:
init in interface CacheLoader<Object,Object>
- Specified by:
init in interface CacheWriter<Object,Object>
-
start
public void start()
Description copied from interface: Lifecycle
Invoked on component start
-
getIndexLocation
protected Path getIndexLocation()
-
isSeqIdOld
protected boolean isSeqIdOld(long seqId, Object key, byte[] serializedKey) throws IOException
- Throws:
IOException
-
startIndex
protected void startIndex()
-
isIndexLoaded
protected boolean isIndexLoaded()
-
stop
public void stop()
Description copied from interface: Lifecycle
Invoked on component stop
-
destroy
public void destroy()
Description copied from interface: ExternalStore
Method to be used to destroy and clean up any resources associated with this store. This is normally only useful for non-shared stores.
This method will ensure the store is stopped and will properly clean up all resources for it.
- Specified by:
destroy in interface ExternalStore<Object,Object>
-
isAvailable
public boolean isAvailable()
- Specified by:
isAvailable in interface CacheLoader<Object,Object>
- Specified by:
isAvailable in interface CacheWriter<Object,Object>
- Specified by:
isAvailable in interface ExternalStore<Object,Object>
- Returns:
- true if the writer can be connected to, otherwise false
-
clear
public void clear() throws PersistenceException
Description copied from interface: AdvancedCacheWriter
Removes all the data from the storage.
- Specified by:
clear in interface AdvancedCacheWriter<Object,Object>
- Throws:
PersistenceException
- in case of an error, e.g. communicating with the external storage
-
size
public int size()
Description copied from interface: AdvancedCacheLoader
Returns the number of elements in the store.
- Specified by:
size in interface AdvancedCacheLoader<Object,Object>
-
purge
public void purge(Executor threadPool, AdvancedCacheWriter.PurgeListener<? super Object> listener)
Description copied from interface: AdvancedCacheWriter
Using the threads in the pool, removes all the expired data from the persistence storage. For each removed entry, the supplied listener is invoked.
When this method returns, all entries will be purged and no tasks will be running due to this loader in the provided executor. If, however, an exception is thrown, there could be tasks still pending or running in the executor.
- Specified by:
purge in interface AdvancedCacheWriter<Object,Object>
-
write
public void write(MarshallableEntry<?,?> entry)
Description copied from interface: CacheWriter
Persists the entry to the storage.
- Specified by:
write in interface CacheWriter<Object,Object>
- See Also:
MarshallableEntry
-
delete
public boolean delete(Object key)
- Specified by:
delete in interface CacheWriter<Object,Object>
- Returns:
- true if the entry existed in the persistent store and it was deleted.
-
contains
public boolean contains(Object key)
Description copied from interface: CacheLoader
Returns true if the storage contains an entry associated with the given key.
- Specified by:
contains in interface CacheLoader<Object,Object>
-
loadEntry
public MarshallableEntry<Object,Object> loadEntry(Object key)
Description copied from interface: CacheLoader
Fetches an entry from the storage. If a MarshallableEntry needs to be created here, InitializationContext.getMarshallableEntryFactory() and InitializationContext.getByteBufferFactory() should be used.
- Specified by:
loadEntry in interface CacheLoader<Object,Object>
- Returns:
- the entry, or null if the entry does not exist
-
publishKeys
public org.reactivestreams.Publisher<Object> publishKeys(Predicate<? super Object> filter)
Description copied from interface: AdvancedCacheLoader
Publishes all the keys from this store. The given publisher can be used by as many Subscribers as desired. Keys are not retrieved until a given Subscriber requests them from the Subscription.
Stores will return only non-expired keys.
- Specified by:
publishKeys in interface AdvancedCacheLoader<Object,Object>
- Parameters:
filter - a filter; null is treated as allowing all entries
- Returns:
- a publisher that will provide the keys from the store
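The contract above (reusable by many Subscribers, nothing retrieved before a request, null filter allows everything) can be illustrated with a small sketch. It uses the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for `org.reactivestreams`, and `LazyKeyPublisher` is a hypothetical class that iterates an in-memory collection rather than the store. It is single-threaded and ignores expiration, so treat it as an illustration of demand-driven publishing only.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Hypothetical demand-driven key publisher; not thread-safe.
final class LazyKeyPublisher<K> implements Flow.Publisher<K> {
    private final Iterable<K> keys;              // stand-in for the keys held by a store
    private final Predicate<? super K> filter;   // null is treated as allowing all keys

    LazyKeyPublisher(Iterable<K> keys, Predicate<? super K> filter) {
        this.keys = keys;
        this.filter = filter;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super K> subscriber) {
        // Each subscriber gets its own iteration, so the publisher is reusable.
        Iterator<K> it = keys.iterator();
        subscriber.onSubscribe(new Flow.Subscription() {
            private long demand;
            private boolean draining;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                demand += n;
                if (demand < 0) {
                    demand = Long.MAX_VALUE; // overflow means unbounded demand
                }
                if (draining) {
                    return; // re-entrant request from onNext; the outer loop continues
                }
                draining = true;
                try {
                    // Keys are pulled from the source only while there is demand.
                    while (!done && demand > 0 && it.hasNext()) {
                        K key = it.next();
                        if (filter == null || filter.test(key)) {
                            demand--;
                            subscriber.onNext(key);
                        }
                    }
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                } finally {
                    draining = false;
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}
```

A caller that only needs the first few keys can request exactly that many and then cancel; in this sketch the remaining keys are never read from the source.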
-
entryPublisher
public org.reactivestreams.Publisher<MarshallableEntry<Object,Object>> entryPublisher(Predicate<? super Object> filter, boolean fetchValue, boolean fetchMetadata)
Description copied from interface: AdvancedCacheLoader
Publishes all entries from this store. The given publisher can be used by as many Subscribers as desired. Entries are not retrieved until a given Subscriber requests them from the Subscription.
If fetchMetadata is true, this store must guarantee not to return any expired entries.
- Specified by:
entryPublisher in interface AdvancedCacheLoader<Object,Object>
- Parameters:
filter - a filter; null is treated as allowing all entries
fetchValue - whether or not to fetch the value from the persistent store. E.g. if the iteration is intended only over the key set, there is no point fetching the values from the persistent store as well
fetchMetadata - whether or not to fetch the metadata from the persistent store. E.g. if the iteration is intended only over the key set, there is no point fetching the metadata from the persistent store as well
- Returns:
- a publisher that will provide the entries from the store
-
-