Skip to content

Commit

Permalink
Fix cancellation order in ThrottleFirst (ReactiveX#7484)
Browse files Browse the repository at this point in the history
* Fix cancellation order in ThrottleFirst

* Bump vanniktech maven publish

* Undo bump, seems to be gradle issue
  • Loading branch information
akarnokd authored and Desislav-Petrov committed Nov 26, 2022
1 parent 7a2792a commit 42b366d
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,10 @@ public void onNext(T t) {
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
upstream.cancel();
done = true;
cancel();
downstream.onError(MissingBackpressureException.createDefault());
worker.dispose();
return;
}

Expand All @@ -122,10 +123,10 @@ public void onNext(T t) {
onDropped.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
worker.dispose();
upstream.cancel();
done = true;
downstream.onError(ex);
worker.dispose();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public final class ObservableThrottleFirstTimed<T> extends AbstractObservableWit

public ObservableThrottleFirstTimed(
ObservableSource<T> source,
long timeout,
long timeout,
TimeUnit unit,
Scheduler scheduler,
Consumer<? super T> onDropped) {
Expand Down Expand Up @@ -102,9 +102,9 @@ public void onNext(T t) {
onDropped.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.dispose();
downstream.onError(ex);
worker.dispose();
upstream.dispose();
}
}
}
Expand Down

0 comments on commit 42b366d

Please sign in to comment.