Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add onDropped callback to throttleLatest operator #7457

Merged
merged 2 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 55 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -17327,7 +17327,61 @@ public final Flowable<T> throttleLatest(long timeout, @NonNull TimeUnit unit, @N
public final Flowable<T> throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast));
return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, null));
}

/**
* Throttles items from the upstream {@code Flowable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them, invoking the consumer for any dropped item.
* <p>
* <img width="640" height="326" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.se.png" alt="">
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.
* If the downstream is not ready to receive items, a
* {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}
* will be signaled.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>
* If the upstream signals an {@code onError} or {@code onDropped} callback crashes,
* the error is delivered immediately to the downstream. If both happen, a {@link CompositeException}
* is created, containing both the upstream and the callback error.
* If the {@code onDropped} callback crashes during cancellation, the exception is forwarded
* to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* </dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @param scheduler the {@code Scheduler} where the timed wait and latest item
* emission will be performed
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
* immediately when the upstream completes, regardless if there is
* a timeout window active or not. If {@code false}, the very last
* upstream item is ignored and the flow terminates.
* @param onDropped called when an item is replaced by a newer item that doesn't get delivered
* to the downstream, including the very last item if {@code emitLast} is {@code false}
* and the current undelivered item when the sequence gets canceled.
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code unit}, {@code scheduler} or {@code onDropped} is {@code null}
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@Experimental
public final Flowable<T> throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, onDropped));
}

/**
Expand Down
50 changes: 49 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -14362,7 +14362,55 @@ public final Observable<T> throttleLatest(long timeout, @NonNull TimeUnit unit,
public final Observable<T> throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableThrottleLatest<>(this, timeout, unit, scheduler, emitLast));
return RxJavaPlugins.onAssembly(new ObservableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, null));
}

/**
* Throttles items from the current {@code Observable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them, invoking the consumer for any dropped item.
* <p>
* <img width="640" height="326" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.se.png" alt="">
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>
* If the upstream signals an {@code onError} or {@code onDropped} callback crashes,
* the error is delivered immediately to the downstream. If both happen, a {@link CompositeException}
* is created, containing both the upstream and the callback error.
* If the {@code onDropped} callback crashes when the sequence gets disposed, the exception is forwarded
* to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* </dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @param scheduler the {@code Scheduler} where the timed wait and latest item
* emission will be performed
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
* immediately when the upstream completes, regardless if there is
* a timeout window active or not. If {@code false}, the very last
* upstream item is ignored and the flow terminates.
* @param onDropped called when an item is replaced by a newer item that doesn't get delivered
* to the downstream, including the very last item if {@code emitLast} is {@code false}
* and the current undelivered item when the sequence gets disposed.
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit}, {@code scheduler} or {@code onDropped} is {@code null}
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
@Experimental
public final Observable<T> throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new ObservableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, onDropped));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Emits the next or latest item when the given time elapses.
Expand All @@ -44,19 +46,24 @@ public final class FlowableThrottleLatest<T> extends AbstractFlowableWithUpstrea

final boolean emitLast;

final Consumer<? super T> onDropped;

public FlowableThrottleLatest(Flowable<T> source,
long timeout, TimeUnit unit, Scheduler scheduler,
boolean emitLast) {
long timeout, TimeUnit unit,
Scheduler scheduler,
boolean emitLast,
Consumer<? super T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
this.emitLast = emitLast;
this.onDropped = onDropped;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new ThrottleLatestSubscriber<>(s, timeout, unit, scheduler.createWorker(), emitLast));
source.subscribe(new ThrottleLatestSubscriber<>(s, timeout, unit, scheduler.createWorker(), emitLast, onDropped));
}

static final class ThrottleLatestSubscriber<T>
Expand All @@ -79,6 +86,8 @@ static final class ThrottleLatestSubscriber<T>

final AtomicLong requested;

final Consumer<? super T> onDropped;

Subscription upstream;

volatile boolean done;
Expand All @@ -93,15 +102,18 @@ static final class ThrottleLatestSubscriber<T>
boolean timerRunning;

ThrottleLatestSubscriber(Subscriber<? super T> downstream,
long timeout, TimeUnit unit, Scheduler.Worker worker,
boolean emitLast) {
long timeout, TimeUnit unit,
Scheduler.Worker worker,
boolean emitLast,
Consumer<? super T> onDropped) {
this.downstream = downstream;
this.timeout = timeout;
this.unit = unit;
this.worker = worker;
this.emitLast = emitLast;
this.latest = new AtomicReference<>();
this.requested = new AtomicLong();
this.onDropped = onDropped;
}

@Override
Expand All @@ -115,7 +127,17 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
latest.set(t);
T old = latest.getAndSet(t);
if (onDropped != null && old != null) {
try {
onDropped.accept(old);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
error = ex;
done = true;
}
}
drain();
}

Expand Down Expand Up @@ -145,6 +167,22 @@ public void cancel() {
upstream.cancel();
worker.dispose();
if (getAndIncrement() == 0) {
clear();
}
}

void clear() {
if (onDropped != null) {
T v = latest.getAndSet(null);
if (v != null) {
try {
onDropped.accept(v);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
} else {
latest.lazySet(null);
}
}
Expand All @@ -170,14 +208,27 @@ void drain() {

for (;;) {
if (cancelled) {
latest.lazySet(null);
clear();
return;
}

boolean d = done;
Throwable error = this.error;

if (d && error != null) {
latest.lazySet(null);
if (onDropped != null) {
T v = latest.getAndSet(null);
if (v != null) {
try {
onDropped.accept(v);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
error = new CompositeException(error, ex);
}
}
} else {
latest.lazySet(null);
}
downstream.onError(error);
worker.dispose();
return;
Expand All @@ -187,19 +238,31 @@ void drain() {
boolean empty = v == null;

if (d) {
if (!empty && emitLast) {
if (!empty) {
v = latest.getAndSet(null);
long e = emitted;
if (e != requested.get()) {
emitted = e + 1;
downstream.onNext(v);
downstream.onComplete();
if (emitLast) {
long e = emitted;
if (e != requested.get()) {
emitted = e + 1;
downstream.onNext(v);
downstream.onComplete();
} else {
tryDropAndSignalMBE(v);
}
} else {
downstream.onError(new MissingBackpressureException(
"Could not emit final value due to lack of requests"));
if (onDropped != null) {
try {
onDropped.accept(v);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
worker.dispose();
return;
}
}
downstream.onComplete();
}
} else {
latest.lazySet(null);
downstream.onComplete();
}
worker.dispose();
Expand All @@ -222,8 +285,7 @@ void drain() {
emitted = e + 1;
} else {
upstream.cancel();
downstream.onError(new MissingBackpressureException(
"Could not emit value due to lack of requests"));
tryDropAndSignalMBE(v);
worker.dispose();
return;
}
Expand All @@ -242,5 +304,19 @@ void drain() {
}
}
}

void tryDropAndSignalMBE(T valueToDrop) {
Throwable errorToSignal = new MissingBackpressureException(
"Could not emit value due to lack of requests");
if (onDropped != null) {
try {
onDropped.accept(valueToDrop);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
errorToSignal = new CompositeException(errorToSignal, ex);
}
}
downstream.onError(errorToSignal);
}
}
}