K - key type for the store
V - value type for the store
public class AsyncNonBlockingStore<K,V> extends DelegatingNonBlockingStore<K,V> implements io.reactivex.rxjava3.functions.Consumer<io.reactivex.rxjava3.core.Flowable<org.infinispan.persistence.async.AsyncNonBlockingStore.Modification>>
Whenever a write operation is performed, the store also attempts to start a batch write to the delegate store immediately. Any concurrent writes during this time may be included in the batch. Any additional writes are enqueued until the batch completes, at which point the pending batch, if there is one, is submitted automatically. Write operations to the same key in the same batch are coalesced, with only the last write being written to the underlying store. If the number of enqueued pending write operations becomes equal to or larger than the modification queue size, any subsequent write is still added to the queue, but the returned Stage does not complete until the current batch completes, which provides some backpressure to slow writes.
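The backpressure rule above can be illustrated with a minimal sketch. It is a simplified, hypothetical model rather than the actual Infinispan implementation: the class `BackpressureSketch`, the field `modificationQueueSize`, and the `batchCompleted()` hook are assumptions introduced only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical model of the backpressure rule; not the Infinispan implementation.
class BackpressureSketch {
    private final int modificationQueueSize; // assumed name for the configured queue size
    private int enqueued = 0;
    private CompletableFuture<Void> currentBatch = new CompletableFuture<>();

    BackpressureSketch(int modificationQueueSize) {
        this.modificationQueueSize = modificationQueueSize;
    }

    // Always enqueues the write; the returned stage is delayed only if the queue was already full.
    CompletionStage<Void> write(String key) {
        synchronized (this) {
            boolean queueFull = enqueued >= modificationQueueSize;
            enqueued++;
            return queueFull ? currentBatch : CompletableFuture.<Void>completedFuture(null);
        }
    }

    // Invoked when the in-flight batch finishes: releases every delayed writer.
    void batchCompleted() {
        CompletableFuture<Void> finished;
        synchronized (this) {
            finished = currentBatch;
            currentBatch = new CompletableFuture<>();
            enqueued = 0;
        }
        // Completed outside the lock so that callbacks never run while holding it.
        finished.complete(null);
    }
}
```

With a queue size of 2, the first two writes return completed stages, while a third returns a stage that completes only after `batchCompleted()` is called.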
Read operations may be resolved by this store immediately if the given key is still being updated in the delegate store or if it is enqueued for the next batch. If the key is in neither, the underlying store is queried for it.
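Write coalescing and the read path described above can be sketched as follows. This is a hypothetical, single-threaded model: `CoalescingBatchSketch` and its methods are illustrative names, String keys and values stand in for real entries, and the in-flight batch and the pending queue are folded into one map for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model; not the Infinispan implementation.
class CoalescingBatchSketch {
    // Modifications waiting for the next batch; a null value marks a removal.
    private final Map<String, String> pending = new LinkedHashMap<>();
    // Stand-in for the delegate store.
    private final Map<String, String> delegate = new LinkedHashMap<>();
    private int delegateWrites = 0;

    // A later write to the same key replaces the earlier one (coalescing).
    void write(String key, String value) {
        pending.put(key, value);
    }

    void remove(String key) {
        pending.put(key, null);
    }

    // Reads are answered from the pending batch first, then from the delegate.
    Optional<String> load(String key) {
        if (pending.containsKey(key)) {
            return Optional.ofNullable(pending.get(key));
        }
        return Optional.ofNullable(delegate.get(key));
    }

    // Applies the batch: one delegate operation per distinct key.
    void flush() {
        for (Map.Entry<String, String> e : pending.entrySet()) {
            delegateWrites++;
            if (e.getValue() == null) {
                delegate.remove(e.getKey());
            } else {
                delegate.put(e.getKey(), e.getValue());
            }
        }
        pending.clear();
    }

    int delegateWrites() {
        return delegateWrites;
    }
}
```

In this model, two writes to the same key before `flush()` reach the delegate as a single operation, and a `load` issued in between already sees the latest value.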
NonBlockingStore.Characteristic, NonBlockingStore.SegmentedPublisher<Type>
Constructor and Description |
---|
AsyncNonBlockingStore(NonBlockingStore<K,V> actual) |
Modifier and Type | Method and Description |
---|---|
void | accept(io.reactivex.rxjava3.core.Flowable<org.infinispan.persistence.async.AsyncNonBlockingStore.Modification> modificationFlowable) - This method is invoked every time a new batch of entries is generated. |
CompletionStage<Void> | addSegments(IntSet segments) - Invoked when a node becomes an owner of the given segments. |
CompletionStage<Long> | approximateSize(IntSet segments) - Returns an estimation of the number of entries that map to the given segments in the store. |
CompletionStage<Void> | batch(int publisherCount, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K,V>>> writePublisher) - Writes and removes the entries provided by the Publishers into the store. |
CompletionStage<Void> | clear() - Clears all entries from the store. |
NonBlockingStore<K,V> | delegate() |
CompletionStage<Boolean> | delete(int segment, Object key) - Removes the entry for the given key and segment from the store, returning a stage that, when it completes normally, contains whether the entry was actually removed. |
CompletionStage<Boolean> | isAvailable() - Returns a stage that, when complete, returns a boolean indicating whether the current store can be accessed for requests. |
CompletionStage<MarshallableEntry<K,V>> | load(int segment, Object key) - Returns a stage that will contain the value loaded from the store. |
org.reactivestreams.Publisher<MarshallableEntry<K,V>> | publishEntries(IntSet segments, Predicate<? super K> filter, boolean includeValues) - Publishes entries from this store that are in one of the provided segments and also pass the provided filter. |
org.reactivestreams.Publisher<K> | publishKeys(IntSet segments, Predicate<? super K> filter) - Publishes keys from this store that are in one of the provided segments and also pass the provided filter. |
org.reactivestreams.Publisher<MarshallableEntry<K,V>> | purgeExpired() - Returns a Publisher that, after it is subscribed to, removes any expired entries from the store and publishes them to the returned Publisher. |
CompletionStage<Void> | removeSegments(IntSet segments) - Invoked when a node loses ownership of the given segments. |
CompletionStage<Long> | size(IntSet segments) - Returns the number of entries that map to the given segments in the store. |
CompletionStage<Void> | start(InitializationContext ctx) - The first method to invoke so that the store can be configured and additional steps, such as connecting through a socket or opening file descriptors, are performed. |
CompletionStage<Void> | stop() - This method is invoked when the cache is being shut down. |
CompletionStage<Void> | write(int segment, MarshallableEntry<? extends K,? extends V> entry) - Writes the entry to the store for the given segment, returning a stage that completes normally when it is finished. |
characteristics, commit, containsKey, ignoreCommandWithFlags, prepareWithModifications, rollback
public AsyncNonBlockingStore(NonBlockingStore<K,V> actual)
public CompletionStage<Void> start(InitializationContext ctx)
NonBlockingStore
The provided InitializationContext contains many helpful objects, including the configuration of the cache and store, concurrency utilities such as BlockingManager, or an executor reserved for non-blocking operations only, InitializationContext.getNonBlockingExecutor().
This method is guaranteed not to be invoked concurrently with other operations. This means other methods are not invoked on this store until after the returned Stage completes.
It is expected that an implementation should be able to "restart" by invoking start a second time if NonBlockingStore.stop() has been invoked and allowed for its stage to complete.
start
in interface NonBlockingStore<K,V>
start
in class DelegatingNonBlockingStore<K,V>
ctx - initialization context used to initialize this store.
public CompletionStage<Void> stop()
NonBlockingStore
This method is guaranteed not to be invoked concurrently with other operations. This means other methods are not invoked on this store until after the returned Stage completes.
It is expected that an implementation should be able to "restart" by invoking NonBlockingStore.start(InitializationContext) a second time if stop has been invoked and allowed for its stage to complete.
stop
in interface NonBlockingStore<K,V>
stop
in class DelegatingNonBlockingStore<K,V>
public void accept(io.reactivex.rxjava3.core.Flowable<org.infinispan.persistence.async.AsyncNonBlockingStore.Modification> modificationFlowable)
accept
in interface io.reactivex.rxjava3.functions.Consumer<io.reactivex.rxjava3.core.Flowable<org.infinispan.persistence.async.AsyncNonBlockingStore.Modification>>
modificationFlowable - the next stream of values to enqueue and eventually send
public org.reactivestreams.Publisher<MarshallableEntry<K,V>> publishEntries(IntSet segments, Predicate<? super K> filter, boolean includeValues)
NonBlockingStore
Publisher.subscribe(Subscriber) should provide independent views of the underlying entries to the Subscribers. Entries should not be retrieved until a given Subscriber requests them via the Subscription.request(long) method.
Subscribing to the returned Publisher should not block the invoking thread. It is the responsibility of the store implementation to ensure this occurs. If, however, the store must block to perform an operation, it is recommended to wrap your Publisher before returning it with the BlockingManager.blockingPublisher(Publisher) method, which handles subscription and observation on the blocking and non-blocking executors respectively.
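The two requirements above (an independent view per Subscriber, and no retrieval before demand is signalled) can be sketched with the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams interfaces used by this API. The class `LazyPublisherSketch` is hypothetical, is not thread-safe, and is only meant to show the pull-on-demand shape, not how Infinispan stores implement it.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

// Hypothetical single-threaded sketch; a real store publisher must also be thread-safe.
class LazyPublisherSketch<T> implements Flow.Publisher<T> {
    private final Supplier<Iterator<T>> source;

    LazyPublisherSketch(Supplier<Iterator<T>> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // A fresh iterator per subscription gives each Subscriber an independent view.
        Iterator<T> entries = source.get();
        subscriber.onSubscribe(new Flow.Subscription() {
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) {
                    return; // re-entrant request from onNext; the outer loop drains it
                }
                emitting = true;
                while (!done) {
                    if (!entries.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    } else if (demand == 0) {
                        break;
                    } else {
                        demand--;
                        // An entry is pulled only while there is outstanding demand.
                        subscriber.onNext(entries.next());
                    }
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}
```

Because the iterator is created inside `subscribe`, two Subscribers never share a cursor, and `next()` is never called before `request(long)`.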
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic. |
NonBlockingStore.Characteristic.EXPIRATION | When set, the returned publisher must not return expired entries. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segments parameter may be ignored. |
publishEntries
in interface NonBlockingStore<K,V>
publishEntries
in class DelegatingNonBlockingStore<K,V>
segments - a set of segments to filter entries by. This will always be non null.
filter - a filter to filter the keys by. If this is null then no additional filtering should be done after segments.
public org.reactivestreams.Publisher<K> publishKeys(IntSet segments, Predicate<? super K> filter)
NonBlockingStore
Publisher.subscribe(Subscriber) should provide independent views of the underlying keys to the Subscribers. Keys should not be retrieved until a given Subscriber requests them via the Subscription.request(long) method.
Subscribing to the returned Publisher should not block the invoking thread. It is the responsibility of the store implementation to ensure this occurs. If, however, the store must block to perform an operation, it is recommended to wrap your Publisher before returning it with the BlockingManager.blockingPublisher(Publisher) method, which handles subscription and observation on the blocking and non-blocking executors respectively.
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic. |
NonBlockingStore.Characteristic.EXPIRATION | When set, the returned publisher must not return expired keys. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segments parameter may be ignored. |
publishKeys
in interface NonBlockingStore<K,V>
publishKeys
in class DelegatingNonBlockingStore<K,V>
segments - a set of segments to filter keys by. This will always be non null.
filter - a filter to filter the keys by. If this is null then no additional filtering should be done after segments.
public CompletionStage<MarshallableEntry<K,V>> load(int segment, Object key)
NonBlockingStore
If a MarshallableEntry needs to be created here, InitializationContext.getMarshallableEntryFactory() and InitializationContext.getByteBufferFactory() should be used.
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.WRITE_ONLY | This method will never be invoked. |
NonBlockingStore.Characteristic.EXPIRATION | When set, this method must not return expired entries. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segment parameter may be ignored. |
If a problem is encountered, it is recommended to wrap any created or caught Throwable in a PersistenceException and to complete the stage exceptionally.
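The error-handling recommendation above, which recurs for most methods on this page, might look like the following sketch. To keep the example self-contained, `SketchPersistenceException` is a local stand-in for Infinispan's PersistenceException, and the `guarded` helper is a hypothetical name, not part of the store API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical helper; not part of the Infinispan API.
class FailureWrappingSketch {
    // Local stand-in for PersistenceException, used so the sketch compiles on its own.
    static class SketchPersistenceException extends RuntimeException {
        SketchPersistenceException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Runs the operation and reports any failure through the stage instead of throwing.
    static <T> CompletionStage<T> guarded(Supplier<T> operation) {
        try {
            return CompletableFuture.completedFuture(operation.get());
        } catch (Throwable t) {
            // Wrap the original Throwable and complete the stage exceptionally.
            return CompletableFuture.failedFuture(
                    new SketchPersistenceException("Store operation failed", t));
        }
    }
}
```

A caller then observes the failure through the returned stage, for example via `whenComplete` or `exceptionally`, rather than through an exception thrown on the invoking thread.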
load
in interface NonBlockingStore<K,V>
load
in class DelegatingNonBlockingStore<K,V>
segment - the segment for the given key if segmentation is enabled, otherwise 0.
key - key of the entry to load.
public CompletionStage<Void> batch(int publisherCount, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K,V>>> writePublisher)
NonBlockingStore
StoreConfiguration.maxBatchSize()
setting.
Each of the Publishers may publish up to publisherCount publishers, where each publisher is separated by the segment each entry maps to. Failure to request at least publisherCount publishers from the Publisher may cause a deadlock. Many reactive tools have methods such as flatMap that take an argument for how many concurrent subscriptions they manage, which matches this argument.
WARNING: For performance reasons neither Publisher will emit any NonBlockingStore.SegmentedPublishers until both the write and remove Publishers are subscribed to. These Publishers should also be subscribed to only once.
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked. |
NonBlockingStore.Characteristic.SEGMENTABLE | If not set, the provided publisherCount parameter has a value of 1, which means there will be only one SegmentedPublisher to subscribe to. |
If a problem is encountered, it is recommended to wrap any created or caught Throwable in a PersistenceException and to complete the stage exceptionally.
batch
in interface NonBlockingStore<K,V>
batch
in class DelegatingNonBlockingStore<K,V>
publisherCount - the maximum number of SegmentedPublishers either publisher will publish
removePublisher - publishes what keys should be removed from the store
writePublisher - publishes the entries to write to the store
public CompletionStage<Void> write(int segment, MarshallableEntry<? extends K,? extends V> entry)
NonBlockingStore
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked. |
NonBlockingStore.Characteristic.EXPIRATION | When set, this method must store the expiration metadata. |
NonBlockingStore.Characteristic.SEGMENTABLE | When set, this method must ensure the segment is stored with the entry. |
If a problem is encountered, it is recommended to wrap any created or caught Throwable in a PersistenceException and to complete the stage exceptionally.
write
in interface NonBlockingStore<K,V>
write
in class DelegatingNonBlockingStore<K,V>
segment - the segment for the given key if segmentation is enabled, otherwise 0.
entry - the entry to persist to the store.
public CompletionStage<Boolean> delete(int segment, Object key)
NonBlockingStore
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segment parameter may be ignored. |
If a problem is encountered, it is recommended to wrap any created or caught Throwable in a PersistenceException and to complete the stage exceptionally.
delete
in interface NonBlockingStore<K,V>
delete
in class DelegatingNonBlockingStore<K,V>
segment - the segment for the given key if segmentation is enabled, otherwise 0.
key - key of the entry to delete from the store.
public CompletionStage<Void> clear()
NonBlockingStore
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked. |
If a problem is encountered, it is recommended to wrap any created or caught Throwable in a PersistenceException and to complete the stage exceptionally.
clear
in interface NonBlockingStore<K,V>
clear
in class DelegatingNonBlockingStore<K,V>
public org.reactivestreams.Publisher<MarshallableEntry<K,V>> purgeExpired()
NonBlockingStore
When the Publisher is subscribed to, it is expected to do point-in-time expiration and should not return a Publisher that has infinite entries or never completes.
Subscribing to the returned Publisher should not block the invoking thread. It is the responsibility of the store implementation to ensure this occurs. If, however, the store must block to perform an operation, it is recommended to wrap your Publisher before returning it with the BlockingManager.blockingPublisher(Publisher) method, which handles subscription and observation on the blocking and non-blocking executors respectively.
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.EXPIRATION | This method is only invoked if the store has this characteristic. |
If a problem is encountered, it is recommended to wrap any created or caught Throwable in a PersistenceException and to complete the stage exceptionally.
purgeExpired
in interface NonBlockingStore<K,V>
purgeExpired
in class DelegatingNonBlockingStore<K,V>
public CompletionStage<Void> addSegments(IntSet segments)
NonBlockingStore
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.SHAREABLE | If the store has this characteristic and is configured to be StoreConfiguration.shared(), this method will never be invoked. |
NonBlockingStore.Characteristic.SEGMENTABLE | This method is invoked only if the store has this characteristic and is configured to be StoreConfiguration.segmented(). |
If a problem is encountered, it is recommended to wrap any created or caught Throwable in a PersistenceException and to complete the stage exceptionally.
addSegments
in interface NonBlockingStore<K,V>
addSegments
in class DelegatingNonBlockingStore<K,V>
segments - the segments to add.
public CompletionStage<Void> removeSegments(IntSet segments)
NonBlockingStore
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.SHAREABLE | If the store has this characteristic and is configured to be StoreConfiguration.shared(), this method will never be invoked. |
NonBlockingStore.Characteristic.SEGMENTABLE | This method is invoked only if the store has this characteristic and is configured to be StoreConfiguration.segmented(). |
If a problem is encountered, it is recommended to wrap any created or caught Throwable in a PersistenceException and to complete the stage exceptionally.
removeSegments
in interface NonBlockingStore<K,V>
removeSegments
in class DelegatingNonBlockingStore<K,V>
segments - the segments to remove.
public CompletionStage<Long> size(IntSet segments)
NonBlockingStore
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segments parameter may be ignored. |
If a problem is encountered, it is recommended to wrap any created or caught Throwable in a PersistenceException and to complete the stage exceptionally.
size
in interface NonBlockingStore<K,V>
size
in class DelegatingNonBlockingStore<K,V>
segments - the segments for which the entries are counted.
public CompletionStage<Long> approximateSize(IntSet segments)
NonBlockingStore
Similar to NonBlockingStore.size(IntSet), except that it is not strict about the returned size. For instance, this method might ignore whether an entry is expired, or the store may have underlying optimizations that make the size only eventually consistent.
Characteristic | Effect |
---|---|
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic. |
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set, the provided segments parameter may be ignored. |
If a problem is encountered, it is recommended to wrap any created or caught Throwable in a PersistenceException and to complete the stage exceptionally.
approximateSize
in interface NonBlockingStore<K,V>
approximateSize
in class DelegatingNonBlockingStore<K,V>
segments - the segments for which the entries are counted.
public CompletionStage<Boolean> isAvailable()
NonBlockingStore
StoreUnavailableException
being thrown until the store becomes available again.
Store availability is polled periodically to update the status of stores if their availability changes. This method is not invoked concurrently with itself. In other words, this method is not invoked until after the previous stage has completed. However, this method is invoked concurrently with other operations, except for NonBlockingStore.start(InitializationContext) and NonBlockingStore.stop().
If a store is configured to be StoreConfiguration.async() and the store becomes unavailable, then it is possible for the cache operations to be accepted in the interim period between the loss of availability and the modification-queue becoming full. This allows for this store to be unavailable for short periods of time without a StoreUnavailableException being thrown; however, if the store does not become available before the queue fills, then a StoreUnavailableException is thrown.
isAvailable
in interface NonBlockingStore<K,V>
isAvailable
in class DelegatingNonBlockingStore<K,V>
public NonBlockingStore<K,V> delegate()
delegate
in class DelegatingNonBlockingStore<K,V>
Copyright © 2021 JBoss by Red Hat. All rights reserved.