public abstract class AbstractAsyncPublisherHandler<Target,Output,InitialResponse,NextResponse> extends Object implements io.reactivex.rxjava3.functions.LongConsumer, io.reactivex.rxjava3.functions.Action
Subscription.request(long)
the handler will keep
track of the outstanding count and invoke either sendInitialCommand(Object, int)
or
sendNextCommand(Object, int)
depending upon if it is the first request or subsequent.
This handler guarantees that there is only one outstanding request and that if the request amount is larger than the
batch, it will continue to send new commands one at a time until the request amount has been satisfied.
The handler processes each target via the provided Supplier
one by one, exhausting all values from the
target, until there are no more targets left or the publishing has been cancelled by the subscriber.
When a command returns successfully either the handleInitialResponse(Object, Object)
or
handleNextResponse(Object, Object)
will be invoked with the response object. The returned entries can then
be emitted by invoking the onNext(Object)
method for each value. Note that the return of onNext
should be checked in case if the Subscriber
has cancelled the publishing of values in the middle.
A command may also encounter a Throwable and it is possible to customize what happens by implementing the
handleThrowableInResponse(Throwable, Object)
method. For example you may want to skip the given
Each request is provided a batchSize
argument and the underlying resource should adhere to this,
failure to do so may cause an OutOfMemoryError
, since entries are only emitted to the Subscriber based on the
requested amount, and any additional are enqueued.
This handler also supports Subscription.cancel()
by extending the sendCancel(Object)
command and
the underlying service must be cancelled in an asynchronous and non blocking fashion. Once cancelled this Publisher
will not publish additional values or requests.
Note this handler only supports a single Subscriber for the returned Publisher from startPublisher()
. Failure
to do so can cause multiple requests and unexpected problems.
Modifier and Type | Field and Description |
---|---|
protected int |
batchSize |
protected static Log |
log |
protected Supplier<Target> |
supplier |
protected static boolean |
trace |
Modifier | Constructor and Description |
---|---|
protected |
AbstractAsyncPublisherHandler(int batchSize,
Supplier<Target> supplier,
Target firstTarget) |
Modifier and Type | Method and Description |
---|---|
void |
accept(long count)
This method is invoked every time a new request is sent to the underlying publisher.
|
protected boolean |
checkCancelled()
This method returns whether this subscription has been cancelled
|
protected abstract long |
handleInitialResponse(InitialResponse response,
Target target) |
protected abstract long |
handleNextResponse(NextResponse response,
Target target) |
protected void |
handleThrowableInResponse(Throwable t,
Target target)
Allows any implementor to handle what happens when a Throwable is encountered.
|
protected boolean |
onNext(Output value)
Method that should be called for each emitted output value.
|
void |
run()
This is invoked when the Subscription is cancelled
|
protected abstract void |
sendCancel(Target target) |
protected abstract CompletionStage<InitialResponse> |
sendInitialCommand(Target target,
int batchSize) |
protected abstract CompletionStage<NextResponse> |
sendNextCommand(Target target,
int batchSize) |
org.reactivestreams.Publisher<Output> |
startPublisher() |
protected void |
targetComplete()
Method to invoke when a given target is found to have been completed and the next target should be used
|
protected static final Log log
protected static final boolean trace
protected final int batchSize
public org.reactivestreams.Publisher<Output> startPublisher()
public void run()
run
in interface io.reactivex.rxjava3.functions.Action
protected abstract void sendCancel(Target target)
protected abstract CompletionStage<InitialResponse> sendInitialCommand(Target target, int batchSize)
protected abstract CompletionStage<NextResponse> sendNextCommand(Target target, int batchSize)
protected abstract long handleInitialResponse(InitialResponse response, Target target)
protected abstract long handleNextResponse(NextResponse response, Target target)
protected void handleThrowableInResponse(Throwable t, Target target)
Subscriber.onError(Throwable)
and stops processing. It is possible to
ignore the throwable and continue processing by invoking accept(long)
with a value of 0. It may also
be required to reset the currentTarget
so it is initialized to the next supplied value.t
- throwable that was encounteredtarget
- the target which was invoked that caused the throwablepublic void accept(long count)
accept
in interface io.reactivex.rxjava3.functions.LongConsumer
count
- request countprotected boolean onNext(Output value)
value
- value emit to the publisherprotected void targetComplete()
protected boolean checkCancelled()
Copyright © 2021 JBoss by Red Hat. All rights reserved.