Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Standardize MissingBackpressureException message, introduce QueueOverflowException #7459

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,25 @@ public final class MissingBackpressureException extends RuntimeException {

private static final long serialVersionUID = 8517344746016032542L;

/**
* The default error message.
* <p>
* This can happen if the downstream doesn't call {@link org.reactivestreams.Subscription#request(long)}
* in time or at all.
* @since 3.1.6
*/
public static final String DEFAULT_MESSAGE = "Could not emit value due to lack of requests";

/**
* The message for queue overflows.
* <p>
* This can happen if the upstream disregards backpressure completely or calls
* {@link org.reactivestreams.Subscriber#onNext(Object)} concurrently from multiple threads
* without synchronization. Rarely, it is an indication of bugs inside RxJava
* @since 3.1.6
*/
public static final String QUEUE_OVERFLOW_MESSAGE = "Queue overflow due to illegal concurrent onNext calls or a bug in RxJava";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These could be private though, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3rd party implementations should be able to verify they got this exact message:

assertEquals(ex.getMessage(), MissingBackpressureException.QUEUE_OVERFLOW_MESSAGE);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm should not we then just have two different exceptions both having only one public constructor which contains the exact message and then you simply check for its type which is superior than the String message?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, let's introduce a separate exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


/**
* Constructs a MissingBackpressureException without message or cause.
*/
Expand All @@ -35,4 +54,23 @@ public MissingBackpressureException(String message) {
super(message);
}

/**
* Constructs a new {@code MissingBackpressureException} with the
* default message {@value #DEFAULT_MESSAGE}.
* @return the new {@code MissingBackpressureException} instance.
* @since 3.1.6
*/
public static MissingBackpressureException createDefault() {
return new MissingBackpressureException(DEFAULT_MESSAGE);
}

/**
* Constructs a new {@code MissingBackpressureException} with the
* default message {@value #QUEUE_OVERFLOW_MESSAGE}.
* @return the new {@code MissingBackpressureException} instance.
* @since 3.1.6
*/
public static MissingBackpressureException createQueueOverflow() {
return new MissingBackpressureException(QUEUE_OVERFLOW_MESSAGE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void onNext(T t) {
if (sourceMode != QueueFuseable.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(new MissingBackpressureException("Queue full?!"));
onError(MissingBackpressureException.createQueueOverflow());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void onSubscribe(Subscription s) {
public void onNext(CompletableSource t) {
if (sourceFused == QueueSubscription.NONE) {
if (!queue.offer(t)) {
onError(new MissingBackpressureException());
onError(MissingBackpressureException.createQueueOverflow());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void onNext(T t) {
if (!queue.offer(t)) {
SubscriptionHelper.cancel(this);

onError(new MissingBackpressureException("Queue full?!"));
onError(MissingBackpressureException.createQueueOverflow());
} else {
signalConsumer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.internal.util.*;
Expand Down Expand Up @@ -152,7 +152,7 @@ public final void onNext(T t) {
if (sourceMode != QueueSubscription.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(new IllegalStateException("Queue full?!"));
onError(MissingBackpressureException.createQueueOverflow());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void innerNext(InnerQueuedSubscriber<R> inner, R value) {
drain();
} else {
inner.cancel();
innerError(inner, new MissingBackpressureException());
innerError(inner, MissingBackpressureException.createDefault());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
Expand Down Expand Up @@ -151,7 +151,7 @@ public final void onNext(T t) {
if (sourceMode != QueueSubscription.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(new IllegalStateException("Queue full?!"));
onError(MissingBackpressureException.createQueueOverflow());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
onError(new MissingBackpressureException("create: " + MissingBackpressureException.DEFAULT_MESSAGE));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void emit(long idx, T value) {
BackpressureHelper.produced(this, 1);
} else {
cancel();
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ void emit(long idx, T t, DebounceEmitter<T> emitter) {
emitter.dispose();
} else {
cancel();
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void tryEmitScalar(U value) {
q = getMainQueue();
}
if (!q.offer(value)) {
onError(new MissingBackpressureException("Scalar queue full?!"));
onError(MissingBackpressureException.createQueueOverflow());
}
}
if (decrementAndGet() == 0) {
Expand All @@ -252,7 +252,7 @@ void tryEmitScalar(U value) {
} else {
SimpleQueue<U> q = getMainQueue();
if (!q.offer(value)) {
onError(new MissingBackpressureException("Scalar queue full?!"));
onError(MissingBackpressureException.createQueueOverflow());
return;
}
if (getAndIncrement() != 0) {
Expand All @@ -278,7 +278,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
inner.queue = q;
}
if (!q.offer(value)) {
onError(new MissingBackpressureException("Inner queue full?!"));
onError(MissingBackpressureException.createQueueOverflow());
}
}
if (decrementAndGet() == 0) {
Expand All @@ -291,7 +291,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
inner.queue = q;
}
if (!q.offer(value)) {
onError(new MissingBackpressureException("Inner queue full?!"));
onError(MissingBackpressureException.createQueueOverflow());
return;
}
if (getAndIncrement() != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void onNext(T t) {
return;
}
if (fusionMode == NONE && !queue.offer(t)) {
onError(new MissingBackpressureException("Queue is full?!"));
onError(MissingBackpressureException.createQueueOverflow());
return;
}
drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void onNext(T t) {
if (emittedGroups != get()) {
downstream.onNext(group);
} else {
MissingBackpressureException mbe = new MissingBackpressureException(groupHangWarning(emittedGroups));
MissingBackpressureException mbe = groupHangWarning(emittedGroups);
mbe.initCause(ex);
onError(mbe);
return;
Expand All @@ -194,13 +194,13 @@ public void onNext(T t) {
}
} else {
upstream.cancel();
onError(new MissingBackpressureException(groupHangWarning(emittedGroups)));
onError(groupHangWarning(emittedGroups));
}
}
}

static String groupHangWarning(long n) {
return "Unable to emit a new group (#" + n + ") due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.";
static MissingBackpressureException groupHangWarning(long n) {
return new MissingBackpressureException("Unable to emit a new group (#" + n + ") due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ void drain() {
a.onNext(w);
BackpressureHelper.produced(requested, 1);
} else {
fail(new MissingBackpressureException("Could not emit value due to lack of requests"), a, q);
fail(MissingBackpressureException.createDefault(), a, q);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void run() {
downstream.onNext(count++);
BackpressureHelper.produced(this, 1);
} else {
downstream.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
downstream.onError(new MissingBackpressureException("Could not emit value " + count + " due to lack of requests"));
DisposableHelper.dispose(resource);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void run() {
decrementAndGet();
}
} else {
downstream.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
downstream.onError(new MissingBackpressureException("Could not emit value " + count + " due to lack of requests"));
DisposableHelper.dispose(resource);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ void drain() {

e++;
} else {
ExceptionHelper.addThrowable(error, new MissingBackpressureException("Could not emit value due to lack of requests"));
ExceptionHelper.addThrowable(error, MissingBackpressureException.createDefault());
q.clear();
cancelAll();
errorAll(a);
Expand Down Expand Up @@ -321,7 +321,7 @@ else if (mode == RIGHT_VALUE) {

e++;
} else {
ExceptionHelper.addThrowable(error, new MissingBackpressureException("Could not emit value due to lack of requests"));
ExceptionHelper.addThrowable(error, MissingBackpressureException.createDefault());
q.clear();
cancelAll();
errorAll(a);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public final void onNext(T t) {
if (!queue.offer(t)) {
upstream.cancel();

error = new MissingBackpressureException("Queue is full?!");
error = MissingBackpressureException.createQueueOverflow();
done = true;
}
trySchedule();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void onNext(T t) {
}
} else if (callError) {
upstream.cancel();
onError(new MissingBackpressureException());
onError(MissingBackpressureException.createDefault());
} else {
drain();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onNext(T t) {
BackpressureHelper.produced(this, 1);
} else {
upstream.cancel();
onError(new MissingBackpressureException("could not emit value due to lack of requests"));
onError(MissingBackpressureException.createDefault());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
// we expect upstream to honor backpressure requests
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
onError(new MissingBackpressureException("Prefetch queue is full?!"));
onError(MissingBackpressureException.createQueueOverflow());
return;
}
// since many things can happen concurrently, we have a common dispatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void onNext(T t) {
}
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
upstream.get().cancel();
onError(new MissingBackpressureException());
onError(MissingBackpressureException.createDefault());
return;
}
drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void emit() {
BackpressureHelper.produced(requested, 1);
} else {
cancel();
downstream.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ void emit() {
BackpressureHelper.produced(requested, 1);
} else {
cancel();
downstream.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (sourceMode == QueueSubscription.NONE) {
if (!queue.offer(t)) {
onError(new MissingBackpressureException());
onError(MissingBackpressureException.createDefault());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public void onNext(R t) {
SwitchMapSubscriber<T, R> p = parent;
if (index == p.unique) {
if (fusionMode == QueueSubscription.NONE && !queue.offer(t)) {
onError(new MissingBackpressureException("Queue full?!"));
onError(MissingBackpressureException.createQueueOverflow());
return;
}
p.drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void onNext(T t) {
} else {
done = true;
cancel();
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void run() {
downstream.onComplete();
} else {
lazySet(EmptyDisposable.INSTANCE);
downstream.onError(new MissingBackpressureException("Can't deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ void drain() {
} else {
SubscriptionHelper.cancel(upstream);
boundarySubscriber.dispose();
errors.tryAddThrowableOrReport(new MissingBackpressureException("Could not deliver a window due to lack of requests"));
errors.tryAddThrowableOrReport(MissingBackpressureException.createDefault());
done = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ void drain() {
upstream.cancel();
startSubscriber.cancel();
resources.dispose();
error.tryAddThrowableOrReport(new MissingBackpressureException(FlowableWindowTimed.missingBackpressureMessage(emitted)));
error.tryAddThrowableOrReport(FlowableWindowTimed.missingBackpressureMessage(emitted));
upstreamDone = true;
}
}
Expand Down