Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Standardize MissingBackpressureException message, introduce QueueOverflowException #7459

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ public final class MissingBackpressureException extends RuntimeException {

private static final long serialVersionUID = 8517344746016032542L;

/**
* The default error message.
* <p>
* This can happen if the downstream doesn't call {@link org.reactivestreams.Subscription#request(long)}
* in time or at all.
* @since 3.1.6
*/
public static final String DEFAULT_MESSAGE = "Could not emit value due to lack of requests";

/**
* Constructs a MissingBackpressureException without message or cause.
*/
Expand All @@ -35,4 +44,13 @@ public MissingBackpressureException(String message) {
super(message);
}

/**
* Constructs a new {@code MissingBackpressureException} with the
* default message {@value #DEFAULT_MESSAGE}.
* @return the new {@code MissingBackpressureException} instance.
* @since 3.1.6
*/
public static MissingBackpressureException createDefault() {
return new MissingBackpressureException(DEFAULT_MESSAGE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.exceptions;

/**
* Indicates an overflow happened because the upstream disregarded backpressure completely or
* {@link org.reactivestreams.Subscriber#onNext(Object)} was called concurrently from multiple threads
* without synchronization. Rarely, it is an indication of bugs inside an operator.
* @since 3.1.6
*/
public final class QueueOverflowException extends RuntimeException {

private static final long serialVersionUID = 8517344746016032542L;

/**
* The message for queue overflows.
* <p>
* This can happen if the upstream disregards backpressure completely or calls
* {@link org.reactivestreams.Subscriber#onNext(Object)} concurrently from multiple threads
* without synchronization. Rarely, it is an indication of bugs inside an operator.
*/
private static final String DEFAULT_MESSAGE = "Queue overflow due to illegal concurrent onNext calls or a bug in an operator";

/**
* Constructs a QueueOverflowException with the default message.
*/
public QueueOverflowException() {
this(DEFAULT_MESSAGE);
}

/**
* Constructs a QueueOverflowException with the given message but no cause.
* @param message the error message
*/
public QueueOverflowException(String message) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we need this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case 3rd party users would want to have a custom message.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright!

super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void onNext(T t) {
if (sourceMode != QueueFuseable.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(new MissingBackpressureException("Queue full?!"));
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void onSubscribe(Subscription s) {
public void onNext(CompletableSource t) {
if (sourceFused == QueueSubscription.NONE) {
if (!queue.offer(t)) {
onError(new MissingBackpressureException());
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void onNext(T t) {
if (!queue.offer(t)) {
SubscriptionHelper.cancel(this);

onError(new MissingBackpressureException("Queue full?!"));
onError(new QueueOverflowException());
} else {
signalConsumer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.internal.util.*;
Expand Down Expand Up @@ -152,7 +152,7 @@ public final void onNext(T t) {
if (sourceMode != QueueSubscription.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(new IllegalStateException("Queue full?!"));
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void innerNext(InnerQueuedSubscriber<R> inner, R value) {
drain();
} else {
inner.cancel();
innerError(inner, new MissingBackpressureException());
innerError(inner, MissingBackpressureException.createDefault());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
Expand Down Expand Up @@ -151,7 +151,7 @@ public final void onNext(T t) {
if (sourceMode != QueueSubscription.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(new IllegalStateException("Queue full?!"));
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
onError(new MissingBackpressureException("create: " + MissingBackpressureException.DEFAULT_MESSAGE));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void emit(long idx, T value) {
BackpressureHelper.produced(this, 1);
} else {
cancel();
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ void emit(long idx, T t, DebounceEmitter<T> emitter) {
emitter.dispose();
} else {
cancel();
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void tryEmitScalar(U value) {
q = getMainQueue();
}
if (!q.offer(value)) {
onError(new MissingBackpressureException("Scalar queue full?!"));
onError(new QueueOverflowException());
}
}
if (decrementAndGet() == 0) {
Expand All @@ -252,7 +252,7 @@ void tryEmitScalar(U value) {
} else {
SimpleQueue<U> q = getMainQueue();
if (!q.offer(value)) {
onError(new MissingBackpressureException("Scalar queue full?!"));
onError(new QueueOverflowException());
return;
}
if (getAndIncrement() != 0) {
Expand All @@ -278,7 +278,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
inner.queue = q;
}
if (!q.offer(value)) {
onError(new MissingBackpressureException("Inner queue full?!"));
onError(new QueueOverflowException());
}
}
if (decrementAndGet() == 0) {
Expand All @@ -291,7 +291,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
inner.queue = q;
}
if (!q.offer(value)) {
onError(new MissingBackpressureException("Inner queue full?!"));
onError(new QueueOverflowException());
return;
}
if (getAndIncrement() != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void onNext(T t) {
return;
}
if (fusionMode == NONE && !queue.offer(t)) {
onError(new MissingBackpressureException("Queue is full?!"));
onError(new QueueOverflowException());
return;
}
drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void onNext(T t) {
if (emittedGroups != get()) {
downstream.onNext(group);
} else {
MissingBackpressureException mbe = new MissingBackpressureException(groupHangWarning(emittedGroups));
MissingBackpressureException mbe = groupHangWarning(emittedGroups);
mbe.initCause(ex);
onError(mbe);
return;
Expand All @@ -194,13 +194,13 @@ public void onNext(T t) {
}
} else {
upstream.cancel();
onError(new MissingBackpressureException(groupHangWarning(emittedGroups)));
onError(groupHangWarning(emittedGroups));
}
}
}

static String groupHangWarning(long n) {
return "Unable to emit a new group (#" + n + ") due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.";
static MissingBackpressureException groupHangWarning(long n) {
return new MissingBackpressureException("Unable to emit a new group (#" + n + ") due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ void drain() {
a.onNext(w);
BackpressureHelper.produced(requested, 1);
} else {
fail(new MissingBackpressureException("Could not emit value due to lack of requests"), a, q);
fail(MissingBackpressureException.createDefault(), a, q);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void run() {
downstream.onNext(count++);
BackpressureHelper.produced(this, 1);
} else {
downstream.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
downstream.onError(new MissingBackpressureException("Could not emit value " + count + " due to lack of requests"));
DisposableHelper.dispose(resource);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void run() {
decrementAndGet();
}
} else {
downstream.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
downstream.onError(new MissingBackpressureException("Could not emit value " + count + " due to lack of requests"));
DisposableHelper.dispose(resource);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ void drain() {

e++;
} else {
ExceptionHelper.addThrowable(error, new MissingBackpressureException("Could not emit value due to lack of requests"));
ExceptionHelper.addThrowable(error, MissingBackpressureException.createDefault());
q.clear();
cancelAll();
errorAll(a);
Expand Down Expand Up @@ -321,7 +321,7 @@ else if (mode == RIGHT_VALUE) {

e++;
} else {
ExceptionHelper.addThrowable(error, new MissingBackpressureException("Could not emit value due to lack of requests"));
ExceptionHelper.addThrowable(error, MissingBackpressureException.createDefault());
q.clear();
cancelAll();
errorAll(a);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public final void onNext(T t) {
if (!queue.offer(t)) {
upstream.cancel();

error = new MissingBackpressureException("Queue is full?!");
error = new QueueOverflowException();
done = true;
}
trySchedule();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void onNext(T t) {
}
} else if (callError) {
upstream.cancel();
onError(new MissingBackpressureException());
onError(MissingBackpressureException.createDefault());
} else {
drain();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onNext(T t) {
BackpressureHelper.produced(this, 1);
} else {
upstream.cancel();
onError(new MissingBackpressureException("could not emit value due to lack of requests"));
onError(MissingBackpressureException.createDefault());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
// we expect upstream to honor backpressure requests
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
onError(new MissingBackpressureException("Prefetch queue is full?!"));
onError(new QueueOverflowException());
return;
}
// since many things can happen concurrently, we have a common dispatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void onNext(T t) {
}
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
upstream.get().cancel();
onError(new MissingBackpressureException());
onError(MissingBackpressureException.createDefault());
return;
}
drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void emit() {
BackpressureHelper.produced(requested, 1);
} else {
cancel();
downstream.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ void emit() {
BackpressureHelper.produced(requested, 1);
} else {
cancel();
downstream.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (sourceMode == QueueSubscription.NONE) {
if (!queue.offer(t)) {
onError(new MissingBackpressureException());
onError(MissingBackpressureException.createDefault());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public void onNext(R t) {
SwitchMapSubscriber<T, R> p = parent;
if (index == p.unique) {
if (fusionMode == QueueSubscription.NONE && !queue.offer(t)) {
onError(new MissingBackpressureException("Queue full?!"));
onError(new QueueOverflowException());
return;
}
p.drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void onNext(T t) {
} else {
done = true;
cancel();
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void run() {
downstream.onComplete();
} else {
lazySet(EmptyDisposable.INSTANCE);
downstream.onError(new MissingBackpressureException("Can't deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down