Skip to content

Commit

Permalink
3.x: ThrottleWithTimeout+Consumer cleanup (#7511)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jan 9, 2023
1 parent bf8da15 commit e1b6cb4
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 34 deletions.
14 changes: 9 additions & 5 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8970,13 +8970,15 @@ public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
* @see #throttleWithTimeout(long, TimeUnit, Scheduler, Consumer)
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
@Experimental
public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
Expand Down Expand Up @@ -17640,7 +17642,7 @@ public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit uni
/**
* Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
* current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
* {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler)}).
* {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler, Consumer)}).
* <p>
* <em>Note:</em> If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
* will be emitted by the resulting {@code Flowable}.
Expand Down Expand Up @@ -17668,13 +17670,15 @@ public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit uni
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #debounce(long, TimeUnit, Scheduler)
* @see #debounce(long, TimeUnit, Scheduler, Consumer)
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
@Experimental
public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
return debounce(timeout, unit, scheduler, onDropped);
}

Expand Down
12 changes: 8 additions & 4 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7933,12 +7933,14 @@ public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNu
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} } or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
* @see #throttleWithTimeout(long, TimeUnit, Scheduler, Consumer)
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
@Experimental
public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
Expand Down Expand Up @@ -14671,12 +14673,14 @@ public final Observable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit u
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see #debounce(long, TimeUnit, Scheduler)
* @see #debounce(long, TimeUnit, Scheduler, Consumer)
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
@Experimental
public final Observable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<? super T> onDropped) {
return debounce(timeout, unit, scheduler, onDropped);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public final class FlowableDebounceTimed<T> extends AbstractFlowableWithUpstream
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final Consumer<T> onDropped;
final Consumer<? super T> onDropped;

public FlowableDebounceTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
public FlowableDebounceTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<? super T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
Expand All @@ -58,7 +58,7 @@ static final class DebounceTimedSubscriber<T> extends AtomicLong
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
final Consumer<T> onDropped;
final Consumer<? super T> onDropped;

Subscription upstream;

Expand All @@ -68,7 +68,7 @@ static final class DebounceTimedSubscriber<T> extends AtomicLong

boolean done;

DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<T> onDropped) {
DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<? super T> onDropped) {
this.downstream = actual;
this.timeout = timeout;
this.unit = unit;
Expand All @@ -93,14 +93,14 @@ public void onNext(T t) {
long idx = index + 1;
index = idx;

Disposable d = timer;
if (d != null) {
d.dispose();
DebounceEmitter<T> currentEmitter = timer;
if (currentEmitter != null) {
currentEmitter.dispose();
}

if (onDropped != null && timer != null) {
if (onDropped != null && currentEmitter != null) {
try {
onDropped.accept(timer.value);
onDropped.accept(currentEmitter.value);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
Expand All @@ -110,10 +110,9 @@ public void onNext(T t) {
}
}

DebounceEmitter<T> de = new DebounceEmitter<>(t, idx, this);
timer = de;
d = worker.schedule(de, timeout, unit);
de.setResource(d);
DebounceEmitter<T> newEmitter = new DebounceEmitter<>(t, idx, this);
timer = newEmitter;
newEmitter.setResource(worker.schedule(newEmitter, timeout, unit));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public final class ObservableDebounceTimed<T> extends AbstractObservableWithUpst
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final Consumer<T> onDropped;
final Consumer<? super T> onDropped;

public ObservableDebounceTimed(ObservableSource<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
public ObservableDebounceTimed(ObservableSource<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<? super T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
Expand All @@ -51,7 +51,7 @@ static final class DebounceTimedObserver<T>
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
final Consumer<T> onDropped;
final Consumer<? super T> onDropped;

Disposable upstream;

Expand All @@ -61,7 +61,7 @@ static final class DebounceTimedObserver<T>

boolean done;

DebounceTimedObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<T> onDropped) {
DebounceTimedObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<? super T> onDropped) {
this.downstream = actual;
this.timeout = timeout;
this.unit = unit;
Expand All @@ -85,12 +85,12 @@ public void onNext(T t) {
long idx = index + 1;
index = idx;

Disposable d = timer;
if (d != null) {
d.dispose();
DebounceEmitter<T> currentEmitter = timer;
if (currentEmitter != null) {
currentEmitter.dispose();
}

if (onDropped != null && timer != null) {
if (onDropped != null && currentEmitter != null) {
try {
onDropped.accept(timer.value);
} catch (Throwable ex) {
Expand All @@ -101,10 +101,9 @@ public void onNext(T t) {
}
}

DebounceEmitter<T> de = new DebounceEmitter<>(t, idx, this);
timer = de;
d = worker.schedule(de, timeout, unit);
de.setResource(d);
DebounceEmitter<T> newEmitter = new DebounceEmitter<>(t, idx, this);
timer = newEmitter;
newEmitter.setResource(worker.schedule(newEmitter, timeout, unit));
}

@Override
Expand Down

0 comments on commit e1b6cb4

Please sign in to comment.