K - key value type
V - value value type
@Experimental public interface NonBlockingStore<K,V>
The first method invoked on this store is start(InitializationContext), which starts the store.
Once the returned stage has completed, the store is assumed to be in working state and ready to handle operations.
Infinispan guarantees the visibility of variables written during the start method, so you do not need to
synchronize them manually unless they are mutated in the normal operations of the store itself.
After the store starts, Infinispan uses the characteristics() method to query the characteristics of
the store. It is highly recommended that this method never change the values it returns after the
store starts because characteristics might not be cached. For more information, see NonBlockingStore.Characteristic
and its various values.
By default, this interface has only a few required methods. If you implement any of the optional methods,
ensure that you advertise the appropriate characteristic for that method so that Infinispan invokes it.
If Infinispan is instructed that a characteristic is available and the method is not overridden, an
UnsupportedOperationException is thrown when trying to invoke the appropriate method. Each
NonBlockingStore.Characteristic defines which methods map to which characteristic.
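The rule above (advertise a characteristic only for optional methods you override) can be sketched with plain JDK types. MiniStore, Feature, InMemoryStore, and StoreCaller are hypothetical stand-ins for illustration and are not Infinispan types; the sketch only mirrors the behavior described here, where an optional method that is not overridden fails with UnsupportedOperationException.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for NonBlockingStore.Characteristic.
enum Feature { BULK_READ, EXPIRATION }

// Hypothetical store interface with one required and one optional method.
interface MiniStore {
    // Required method: every implementation must provide it.
    CompletionStage<Void> write(String key, String value);

    // Optional method: the default fails, so it must be overridden when BULK_READ is advertised.
    default CompletionStage<Long> size() {
        throw new UnsupportedOperationException("size() must be overridden when BULK_READ is advertised");
    }

    // By default no optional feature is advertised.
    default Set<Feature> characteristics() {
        return EnumSet.noneOf(Feature.class);
    }
}

// Overrides size() and advertises the matching characteristic.
class InMemoryStore implements MiniStore {
    private final Map<String, String> data = new ConcurrentHashMap<>();

    @Override
    public CompletionStage<Void> write(String key, String value) {
        data.put(key, value);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletionStage<Long> size() {
        return CompletableFuture.completedFuture((long) data.size());
    }

    @Override
    public Set<Feature> characteristics() {
        return EnumSet.of(Feature.BULK_READ);
    }
}

class StoreCaller {
    // Invokes size() only when the store advertises BULK_READ; otherwise returns -1.
    static long sizeOrMinusOne(MiniStore store) {
        if (!store.characteristics().contains(Feature.BULK_READ)) {
            return -1L;
        }
        return store.size().toCompletableFuture().join();
    }
}
```

A store that advertised BULK_READ without overriding size() would hit the default method and throw, which is the failure mode the paragraph above warns about.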
Although recommended, segmentation support in store implementations is optional. Segment parameters are provided
for all methods where segment information is required, for example load(int, Object) and
publishEntries(IntSet, Predicate, boolean). If your store implementation does not support segmentation,
you can ignore these parameters. However, you should note that segmented stores allow Infinispan caches to more
efficiently perform bulk operations such as Cache.size() or Cache.entrySet().stream(). Segmentation
also decreases the duration of state transfers when PersistenceConfiguration.fetchPersistentState() is enabled,
as well as the time required to remove data by segments. To indicate that a store implementation supports segmentation,
the Characteristic.SEGMENTABLE characteristic must be returned by the characteristics() method. Store
implementations can determine whether a store is configured to be segmented by checking
StoreConfiguration.segmented(), which is available from the InitializationContext.
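To illustrate why segmentation helps bulk operations and segment removal, here is a minimal sketch that keeps one map per segment. SegmentedMapStore is a hypothetical class, Set<Integer> stands in for Infinispan's IntSet, and the methods are synchronous for brevity, whereas the real interface returns stages.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one map per segment, using Set<Integer> in place of IntSet.
class SegmentedMapStore {
    private final Map<Integer, Map<String, String>> bySegment = new ConcurrentHashMap<>();

    // The caller supplies the segment, as it does for the segment-aware store methods.
    void write(int segment, String key, String value) {
        bySegment.computeIfAbsent(segment, s -> new ConcurrentHashMap<>()).put(key, value);
    }

    String load(int segment, String key) {
        Map<String, String> entries = bySegment.get(segment);
        return entries == null ? null : entries.get(key);
    }

    // Counts only the requested segments; entries in other segments are never visited.
    long size(Set<Integer> segments) {
        long total = 0;
        for (int segment : segments) {
            Map<String, String> entries = bySegment.get(segment);
            if (entries != null) {
                total += entries.size();
            }
        }
        return total;
    }

    // Dropping a segment is a single map removal rather than a scan over all entries.
    void removeSegments(Set<Integer> segments) {
        for (int segment : segments) {
            bySegment.remove(segment);
        }
    }
}
```

A non-segmented layout would have to iterate every entry and test its segment to answer the same size or removal request.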
Store implementations might need to interact with blocking APIs to perform their required operations. However, the invoking thread must never be blocked, so Infinispan provides an org.infinispan.util.concurrent.BlockingManager utility class that handles blocking operations to ensure that they do not leak into the internal system. BlockingManager does this by running any blocking operations on blocking threads, while any stages continue on non-blocking threads.
This utility class provides different methods that range from equivalents for commonly used methods, such as java.util.concurrent.CompletableFuture.supplyAsync(Supplier, Executor), to a wrapper around a Publisher that ensures it is subscribed and observed on the correct threads. To obtain a BlockingManager, invoke the InitializationContext.getBlockingManager() method on the provided context in the start method.
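The thread hand-off described above can be approximated with the JDK alone. The sketch below does not use BlockingManager; it uses CompletableFuture.supplyAsync(Supplier, Executor), which the text names as one of the methods BlockingManager has an equivalent for. The BlockingOffload class, its two executors, and blockingRead are hypothetical stand-ins for Infinispan's blocking and non-blocking thread pools and for a real blocking call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingOffload {
    // Hypothetical pools; a real store would use the utilities from the InitializationContext.
    static final ExecutorService BLOCKING =
            Executors.newCachedThreadPool(r -> daemon(r, "blocking"));
    static final ExecutorService NON_BLOCKING =
            Executors.newFixedThreadPool(2, r -> daemon(r, "non-blocking"));

    private static Thread daemon(Runnable r, String prefix) {
        Thread t = new Thread(r, prefix + "-thread");
        t.setDaemon(true);
        return t;
    }

    // Stand-in for a blocking call such as file or JDBC access.
    static String blockingRead(String key) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-of-" + key + "@" + Thread.currentThread().getName();
    }

    // Runs the blocking read on the blocking pool, then continues on the non-blocking pool.
    static CompletionStage<String> load(String key) {
        return CompletableFuture
                .supplyAsync(() -> blockingRead(key), BLOCKING)
                .thenApplyAsync(v -> v + " -> " + Thread.currentThread().getName(), NON_BLOCKING);
    }
}
```

The caller of load receives a stage immediately and is never parked on the sleep; only the thread from the blocking pool waits.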
Implementations of this store must be thread safe if concurrent operations are performed on it. The one exception is that start(InitializationContext) and stop() are not invoked concurrently with other operations.
Note that this interface is Experimental and its methods may change slightly over time until it has matured.
Modifier and Type | Interface and Description |
---|---|
static class | NonBlockingStore.Characteristic: Enumeration defining the various characteristics of the underlying store to communicate what features it may or may not support. |
static interface | NonBlockingStore.SegmentedPublisher<Type>: A Publisher that provides a stream of values and the segments to which those values map. |
Modifier and Type | Method and Description |
---|---|
default CompletionStage<Void> | addSegments(IntSet segments): Invoked when a node becomes an owner of the given segments. |
default CompletionStage<Long> | approximateSize(IntSet segments): Returns an estimation of the amount of entries that map to the given segments in the store. |
default CompletionStage<Void> | batch(int publisherCount, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K,V>>> writePublisher): Writes and removes the entries provided by the Publishers into the store. |
default Set<NonBlockingStore.Characteristic> | characteristics(): Returns a set of characteristics for this store and its elements. |
CompletionStage<Void> | clear(): Clears all entries from the store. |
default CompletionStage<Void> | commit(Transaction transaction): Commits changes in the provided transaction to the underlying store. |
default CompletionStage<Boolean> | containsKey(int segment, Object key): Returns a stage that will contain whether the value can be found in the store. |
CompletionStage<Boolean> | delete(int segment, Object key): Removes the entry for the given key and segment from the store, returning a stage that, when it completes normally, contains whether the entry was actually removed. |
default boolean | ignoreCommandWithFlags(long commandFlags): Some stores may not want to perform operations based on whether a command has certain flags. |
default CompletionStage<Boolean> | isAvailable(): Returns a stage that, when complete, returns a boolean indicating whether the current store can be accessed for requests. |
CompletionStage<MarshallableEntry<K,V>> | load(int segment, Object key): Returns a stage that will contain the value loaded from the store. |
default CompletionStage<Void> | prepareWithModifications(Transaction transaction, int publisherCount, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K,V>>> writePublisher): Writes remove and put modifications to the store in the prepare phase. These modifications should not be persisted until the same transaction is committed via commit(Transaction), and they are discarded if the transaction is rolled back via rollback(Transaction). |
default org.reactivestreams.Publisher<MarshallableEntry<K,V>> | publishEntries(IntSet segments, Predicate<? super K> filter, boolean includeValues): Publishes entries from this store that are in one of the provided segments and also pass the provided filter. |
default org.reactivestreams.Publisher<K> | publishKeys(IntSet segments, Predicate<? super K> filter): Publishes keys from this store that are in one of the provided segments and also pass the provided filter. |
default org.reactivestreams.Publisher<MarshallableEntry<K,V>> | purgeExpired(): Returns a Publisher that, after it is subscribed to, removes any expired entries from the store and publishes them to the returned Publisher. |
default CompletionStage<Void> | removeSegments(IntSet segments): Invoked when a node loses ownership of the given segments. |
default CompletionStage<Void> | rollback(Transaction transaction): Rolls back the changes from the provided transaction to the underlying store. |
default CompletionStage<Long> | size(IntSet segments): Returns the amount of entries that map to the given segments in the store. |
CompletionStage<Void> | start(InitializationContext ctx): The first method to invoke so that the store can be configured and additional steps, such as connecting through a socket or opening file descriptors, are performed. |
CompletionStage<Void> | stop(): This method is invoked when the cache is being shut down. |
CompletionStage<Void> | write(int segment, MarshallableEntry<? extends K,? extends V> entry): Writes the entry to the store for the given segment, returning a stage that completes normally when it is finished. |
CompletionStage<Void> start(InitializationContext ctx)
The provided InitializationContext contains many helpful objects, including the configuration of the
cache and store, concurrency utilities such as BlockingManager, or an executor reserved for
non-blocking operations only, available via InitializationContext.getNonBlockingExecutor().
This method is guaranteed not to be invoked concurrently with other operations. This means other methods are not invoked on this store until after the returned Stage completes.
It is expected that an implementation should be able to "restart" by invoking start a second time if
stop() has been invoked and allowed for its stage to complete.
ctx - initialization context used to initialize this store.
CompletionStage<Void> stop()
This method is guaranteed not to be invoked concurrently with other operations. This means other methods are not invoked on this store until after the returned Stage completes.
It is expected that an implementation should be able to "restart" by invoking start(InitializationContext)
a second time if stop has been invoked and allowed for its stage to complete.
default Set<NonBlockingStore.Characteristic> characteristics()
Refer to NonBlockingStore.Characteristic and its values for descriptions of each characteristic for stores.
default CompletionStage<Boolean> isAvailable()
StoreUnavailableException being thrown until the store becomes available again.
Store availability is polled periodically to update the status of stores if their availability changes. This method
is not invoked concurrently with itself. In other words, this method is not invoked until after the previous stage
has completed. However, this method is invoked concurrently with other operations, except for
start(InitializationContext) and stop().
If a store is configured to be StoreConfiguration.async() and the store becomes unavailable, then it is
possible for the cache operations to be accepted in the interim period between the loss of availability and the
modification-queue becoming full. This allows for this store to be unavailable for short periods of time without a
StoreUnavailableException being thrown; however, if the store does not become available before the queue
fills, then a StoreUnavailableException is thrown.
CompletionStage<MarshallableEntry<K,V>> load(int segment, Object key)
If a MarshallableEntry needs to be created here, InitializationContext.getMarshallableEntryFactory() and
InitializationContext.getByteBufferFactory() should be used.
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.WRITE_ONLY | This method will never be invoked. |
NonBlockingStore.Characteristic.EXPIRATION | When set, this method must not return expired entries. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segment parameter may be ignored. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
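The error-handling recommendation (wrap the Throwable and complete the stage exceptionally rather than throwing to the caller) can be sketched as follows. StoreException is a hypothetical stand-in for Infinispan's PersistenceException so that the example compiles with the JDK alone, and readFromBackend is an invented backend call that always fails.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-in for PersistenceException.
class StoreException extends RuntimeException {
    StoreException(String message, Throwable cause) {
        super(message, cause);
    }
}

class FailingLoad {
    // Stand-in for a backend read; in this sketch it always fails.
    static String readFromBackend(String key) throws IOException {
        throw new IOException("backend unreachable for key " + key);
    }

    // Reports the failure through the returned stage instead of throwing to the caller.
    static CompletionStage<String> load(String key) {
        CompletableFuture<String> stage = new CompletableFuture<>();
        try {
            stage.complete(readFromBackend(key));
        } catch (Throwable t) {
            // Wrap the original cause so callers see one consistent exception type.
            stage.completeExceptionally(new StoreException("load failed for " + key, t));
        }
        return stage;
    }
}
```

Callers that compose on the stage observe the wrapped exception, with the original failure preserved as its cause.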
segment - the segment for the given key if segmentation is enabled, otherwise 0.
key - key of the entry to load.
default CompletionStage<Boolean> containsKey(int segment, Object key)
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.WRITE_ONLY | This method will never be invoked. |
NonBlockingStore.Characteristic.EXPIRATION | When set, this method must not return true if the entry was expired. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segment parameter may be ignored. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
segment - the segment for the given key if segmentation is enabled, otherwise 0.
key - key of the entry to check.
CompletionStage<Void> write(int segment, MarshallableEntry<? extends K,? extends V> entry)
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked. |
NonBlockingStore.Characteristic.EXPIRATION | When set, this method must store the expiration metadata. |
NonBlockingStore.Characteristic.SEGMENTABLE | When set, this method must ensure the segment is stored with the entry. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
segment - the segment for the given key if segmentation is enabled, otherwise 0.
entry - the entry to persist to the store.
CompletionStage<Boolean> delete(int segment, Object key)
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segment parameter may be ignored. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
segment - the segment for the given key if segmentation is enabled, otherwise 0.
key - key of the entry to delete from the store.
default CompletionStage<Void> addSegments(IntSet segments)
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.SHAREABLE | If the store has this characteristic and is configured to be StoreConfiguration.shared(), this method will never be invoked. |
NonBlockingStore.Characteristic.SEGMENTABLE | This method is invoked only if the store has this characteristic and is configured to be StoreConfiguration.segmented(). |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
segments - the segments to add.
default CompletionStage<Void> removeSegments(IntSet segments)
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.SHAREABLE | If the store has this characteristic and is configured to be StoreConfiguration.shared(), this method will never be invoked. |
NonBlockingStore.Characteristic.SEGMENTABLE | This method is invoked only if the store has this characteristic and is configured to be StoreConfiguration.segmented(). |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
segments - the segments to remove.
CompletionStage<Void> clear()
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
default CompletionStage<Void> batch(int publisherCount, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K,V>>> writePublisher)
StoreConfiguration.maxBatchSize() setting.
Each of the Publishers may publish up to publisherCount publishers, where each
publisher is separated by the segment each entry maps to. Failure to request at least publisherCount
publishers from the Publisher may cause a deadlock. Many reactive tools have methods such as flatMap
that take an argument for how many concurrent subscriptions they manage, which matches this argument.
WARNING: For performance reasons, neither Publisher will emit any NonBlockingStore.SegmentedPublisher
until both the write and remove Publishers are subscribed to. These Publishers should also be subscribed to only once.
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked. |
NonBlockingStore.Characteristic.SEGMENTABLE | If not set, the provided publisherCount parameter has a value of 1, which means there is only one SegmentedPublisher to subscribe to. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
publisherCount - the maximum number of SegmentedPublishers either publisher will publish
removePublisher - publishes what keys should be removed from the store
writePublisher - publishes the entries to write to the store
default CompletionStage<Long> size(IntSet segments)
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segments parameter may be ignored. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
segments - the segments for which the entries are counted.
default CompletionStage<Long> approximateSize(IntSet segments)
This is similar to size(IntSet) except that it is not strict about the returned size. For instance, this method might
ignore whether an entry is expired, or the store might have underlying optimizations that make its size only eventually consistent.
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segments parameter may be ignored. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
segments - the segments for which the entries are counted.
default org.reactivestreams.Publisher<MarshallableEntry<K,V>> publishEntries(IntSet segments, Predicate<? super K> filter, boolean includeValues)
Invocations of Publisher.subscribe(Subscriber) should provide independent views of the underlying entries to the Subscribers.
Entries should not be retrieved until a given Subscriber requests them via the Subscription.request(long) method.
Subscribing to the returned Publisher should not block the invoking thread. It is the responsibility of
the store implementation to ensure this occurs. If, however, the store must block to perform an operation, it
is recommended to wrap your Publisher before returning it with the BlockingManager.blockingPublisher(Publisher)
method, which will handle subscription and observation on the blocking and non-blocking executors respectively.
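The two publisher requirements described here, independent views per subscription and retrieval only on demand, can be sketched with the JDK's java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams ones. SnapshotPublisher and CollectingSubscriber are hypothetical classes for illustration. The sketch is single-threaded and emits on the requesting thread, so a store that blocks while reading would still need the offloading described above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical publisher over a fixed list; not an Infinispan or Reactive Streams library class.
class SnapshotPublisher<T> implements Flow.Publisher<T> {
    private final List<T> source;

    SnapshotPublisher(List<T> source) {
        this.source = List.copyOf(source);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // Each subscriber gets its own iterator, so subscriptions are independent views.
        Iterator<T> it = source.iterator();

        class DemandDrivenSubscription implements Flow.Subscription {
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) return; // re-entrant call from onNext: the loop below picks it up
                emitting = true;
                // Items are read from the source only while there is outstanding demand.
                while (demand > 0 && !done && it.hasNext()) {
                    demand--;
                    subscriber.onNext(it.next());
                }
                emitting = false;
                completeIfDrained();
            }

            @Override
            public void cancel() {
                done = true;
            }

            void completeIfDrained() {
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }
        }

        DemandDrivenSubscription subscription = new DemandDrivenSubscription();
        subscriber.onSubscribe(subscription);
        subscription.completeIfDrained(); // an empty source completes without any demand
    }
}

// Requests a fixed number of items up front and records what it receives.
class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> items = new ArrayList<>();
    boolean completed;
    private final long initialRequest;

    CollectingSubscriber(long initialRequest) {
        this.initialRequest = initialRequest;
    }

    @Override public void onSubscribe(Flow.Subscription s) { s.request(initialRequest); }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onComplete() { completed = true; }
}
```

A subscriber that requests two items from a three-item source receives exactly two and no completion signal, while a second subscriber to the same publisher starts again from the first item.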
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic. |
NonBlockingStore.Characteristic.EXPIRATION | When set, the returned publisher must not return expired entries. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segments parameter may be ignored. |
segments - a set of segments to filter entries by. This will always be non-null.
filter - a filter to filter the keys by. If this is null, then no additional filtering should be done after segments.
default org.reactivestreams.Publisher<K> publishKeys(IntSet segments, Predicate<? super K> filter)
Invocations of Publisher.subscribe(Subscriber) should provide independent views of the underlying keys to the Subscribers.
Keys should not be retrieved until a given Subscriber requests them via the Subscription.request(long) method.
Subscribing to the returned Publisher should not block the invoking thread. It is the responsibility of
the store implementation to ensure this occurs. If, however, the store must block to perform an operation, it
is recommended to wrap your Publisher before returning it with the BlockingManager.blockingPublisher(Publisher)
method, which will handle subscription and observation on the blocking and non-blocking executors respectively.
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic. |
NonBlockingStore.Characteristic.EXPIRATION | When set, the returned publisher must not return expired keys. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segments parameter may be ignored. |
segments - a set of segments to filter keys by. This will always be non-null.
filter - a filter to filter the keys by. If this is null, then no additional filtering should be done after segments.
default org.reactivestreams.Publisher<MarshallableEntry<K,V>> purgeExpired()
When the Publisher is subscribed to, it is expected to do point-in-time expiration and should not return a Publisher that has infinite entries or never completes.
Subscribing to the returned Publisher should not block the invoking thread. It is the responsibility of
the store implementation to ensure this occurs. If, however, the store must block to perform an operation, it
is recommended to wrap your Publisher before returning it with the BlockingManager.blockingPublisher(Publisher)
method, which will handle subscription and observation on the blocking and non-blocking executors respectively.
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.EXPIRATION | This method is only invoked if the store has this characteristic. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
default CompletionStage<Void> prepareWithModifications(Transaction transaction, int publisherCount, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K,V>>> writePublisher)
Writes remove and put modifications to the store in the prepare phase. These modifications should not be persisted until
the same transaction is committed via commit(Transaction), and they are discarded if the transaction is rolled back via
rollback(Transaction).
Each of the Publishers may publish up to publisherCount publishers, where each
publisher is separated by the segment each entry maps to. Failure to request at least publisherCount
publishers from the Publisher may cause a deadlock. Many reactive tools have methods such as flatMap
that take an argument for how many concurrent subscriptions they manage, which matches this argument.
WARNING: For performance reasons, neither Publisher will emit any NonBlockingStore.SegmentedPublisher
until both the write and remove Publishers are subscribed to. These Publishers should also be subscribed to only once.
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.TRANSACTIONAL | This method is invoked only if the store has this characteristic. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
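The prepare, commit, and rollback contract can be sketched as staged changes keyed by transaction. TxStagingStore is a hypothetical class; it uses a plain Object as the transaction handle instead of Transaction, takes collections instead of Publishers, and is synchronous, whereas the real methods return stages.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: modifications are staged per transaction and applied only on commit.
class TxStagingStore {
    // Staged changes for one transaction: keys to remove and entries to write.
    private static final class Pending {
        final Set<String> removes;
        final Map<String, String> writes;

        Pending(Set<String> removes, Map<String, String> writes) {
            this.removes = Set.copyOf(removes);
            this.writes = Map.copyOf(writes);
        }
    }

    private final Map<String, String> data = new ConcurrentHashMap<>();
    private final Map<Object, Pending> prepared = new ConcurrentHashMap<>();

    // Prepare phase: remember the changes without touching the stored data.
    void prepare(Object tx, Set<String> removes, Map<String, String> writes) {
        prepared.put(tx, new Pending(removes, writes));
    }

    // Commit: apply the staged changes for this transaction, if any were prepared.
    void commit(Object tx) {
        Pending pending = prepared.remove(tx);
        if (pending == null) {
            return;
        }
        pending.removes.forEach(data::remove);
        data.putAll(pending.writes);
    }

    // Rollback: discard the staged changes.
    void rollback(Object tx) {
        prepared.remove(tx);
    }

    String get(String key) {
        return data.get(key);
    }
}
```

Nothing prepared is visible through get until commit runs, and a rolled-back transaction leaves the stored data unchanged.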
transaction - the current transactional context.
publisherCount - the maximum number of SegmentedPublishers either publisher will publish
removePublisher - publishes what keys should be removed from the store
writePublisher - publishes the entries to write to the store
default CompletionStage<Void> commit(Transaction transaction)
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.TRANSACTIONAL | This method is invoked only if the store has this characteristic. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
transaction - the current transactional context.
default CompletionStage<Void> rollback(Transaction transaction)
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.TRANSACTIONAL | This method is invoked only if the store has this characteristic. |
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException
and complete the stage exceptionally.
transaction - the current transactional context.
@Experimental default boolean ignoreCommandWithFlags(long commandFlags)
commandFlags - the flags attributed to the command when performing the operation.
Copyright © 2021 JBoss by Red Hat. All rights reserved.