Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Standardize MissingBackpressureException message, introduce QueueOverflowException #7459

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,6 @@ public final class MissingBackpressureException extends RuntimeException {
*/
public static final String DEFAULT_MESSAGE = "Could not emit value due to lack of requests";

/**
* The message for queue overflows.
* <p>
* This can happen if the upstream disregards backpressure completely or calls
* {@link org.reactivestreams.Subscriber#onNext(Object)} concurrently from multiple threads
* without synchronization. Rarely, it is an indication of bugs inside RxJava
* @since 3.1.6
*/
public static final String QUEUE_OVERFLOW_MESSAGE = "Queue overflow due to illegal concurrent onNext calls or a bug in RxJava";

/**
* Constructs a MissingBackpressureException without message or cause.
*/
Expand All @@ -63,14 +53,4 @@ public MissingBackpressureException(String message) {
public static MissingBackpressureException createDefault() {
return new MissingBackpressureException(DEFAULT_MESSAGE);
}

/**
* Constructs a new {@code MissingBackpressureException} with the
* default message {@value #QUEUE_OVERFLOW_MESSAGE}.
* @return the new {@code MissingBackpressureException} instance.
* @since 3.1.6
*/
public static MissingBackpressureException createQueueOverflow() {
return new MissingBackpressureException(QUEUE_OVERFLOW_MESSAGE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.exceptions;

/**
* Indicates an overflow happened because the upstream disregarded backpressure completely or
* {@link org.reactivestreams.Subscriber#onNext(Object)} was called concurrently from multiple threads
* without synchronization. Rarely, it is an indication of bugs inside an operator.
* @since 3.1.6
*/
public final class QueueOverflowException extends RuntimeException {

private static final long serialVersionUID = 8517344746016032542L;

/**
* The message for queue overflows.
* <p>
* This can happen if the upstream disregards backpressure completely or calls
* {@link org.reactivestreams.Subscriber#onNext(Object)} concurrently from multiple threads
* without synchronization. Rarely, it is an indication of bugs inside an operator.
*/
private static final String DEFAULT_MESSAGE = "Queue overflow due to illegal concurrent onNext calls or a bug in an operator";

/**
* Constructs a QueueOverflowException with the default message.
*/
public QueueOverflowException() {
this(DEFAULT_MESSAGE);
}

/**
* Constructs a QueueOverflowException with the given message but no cause.
* @param message the error message
*/
public QueueOverflowException(String message) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we need this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case 3rd party users would want to have a custom message.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright!

super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void onNext(T t) {
if (sourceMode != QueueFuseable.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void onSubscribe(Subscription s) {
public void onNext(CompletableSource t) {
if (sourceFused == QueueSubscription.NONE) {
if (!queue.offer(t)) {
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void onNext(T t) {
if (!queue.offer(t)) {
SubscriptionHelper.cancel(this);

onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
} else {
signalConsumer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public final void onNext(T t) {
if (sourceMode != QueueSubscription.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public final void onNext(T t) {
if (sourceMode != QueueSubscription.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void tryEmitScalar(U value) {
q = getMainQueue();
}
if (!q.offer(value)) {
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
}
}
if (decrementAndGet() == 0) {
Expand All @@ -252,7 +252,7 @@ void tryEmitScalar(U value) {
} else {
SimpleQueue<U> q = getMainQueue();
if (!q.offer(value)) {
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
if (getAndIncrement() != 0) {
Expand All @@ -278,7 +278,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
inner.queue = q;
}
if (!q.offer(value)) {
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
}
}
if (decrementAndGet() == 0) {
Expand All @@ -291,7 +291,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
inner.queue = q;
}
if (!q.offer(value)) {
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
if (getAndIncrement() != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void onNext(T t) {
return;
}
if (fusionMode == NONE && !queue.offer(t)) {
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public final void onNext(T t) {
if (!queue.offer(t)) {
upstream.cancel();

error = MissingBackpressureException.createQueueOverflow();
error = new QueueOverflowException();
done = true;
}
trySchedule();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
// we expect upstream to honor backpressure requests
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
// since many things can happen concurrently, we have a common dispatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public void onNext(R t) {
SwitchMapSubscriber<T, R> p = parent;
if (index == p.unique) {
if (fusionMode == QueueSubscription.NONE && !queue.offer(t)) {
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
p.drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.reactivestreams.Subscription;

import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.operators.QueueFuseable;
Expand Down Expand Up @@ -99,7 +99,7 @@ public final void onNext(T t) {
if (t != null) {
if (!queue.offer(t)) {
upstream.cancel();
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void onNext(T t) {
if (sourceMode == QueueSubscription.NONE) {
if (!queue.offer(t)) {
upstream.cancel();
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.operators.SimplePlainQueue;
Expand Down Expand Up @@ -153,7 +153,7 @@ public void onNext(JoinInnerSubscriber<T> inner, T value) {

if (!q.offer(value)) {
cancelAll();
Throwable mbe = MissingBackpressureException.createQueueOverflow();
Throwable mbe = new QueueOverflowException();
if (errors.compareAndSet(null, mbe)) {
downstream.onError(mbe);
} else {
Expand All @@ -170,7 +170,7 @@ public void onNext(JoinInnerSubscriber<T> inner, T value) {

if (!q.offer(value)) {
cancelAll();
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}

Expand Down Expand Up @@ -333,7 +333,7 @@ void onNext(JoinInnerSubscriber<T> inner, T value) {

if (!q.offer(value)) {
inner.cancel();
errors.tryAddThrowableOrReport(MissingBackpressureException.createQueueOverflow());
errors.tryAddThrowableOrReport(new QueueOverflowException());
done.decrementAndGet();
drainLoop();
return;
Expand All @@ -347,7 +347,7 @@ void onNext(JoinInnerSubscriber<T> inner, T value) {

if (!q.offer(value)) {
inner.cancel();
errors.tryAddThrowableOrReport(MissingBackpressureException.createQueueOverflow());
errors.tryAddThrowableOrReport(new QueueOverflowException());
done.decrementAndGet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.internal.schedulers.SchedulerMultiWorkerSupport;
import io.reactivex.rxjava3.internal.schedulers.SchedulerMultiWorkerSupport.WorkerCallback;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
Expand Down Expand Up @@ -148,7 +148,7 @@ public final void onNext(T t) {
}
if (!queue.offer(t)) {
upstream.cancel();
onError(MissingBackpressureException.createQueueOverflow());
onError(new QueueOverflowException());
return;
}
schedule();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,13 @@ static boolean isBug(Throwable error) {
return true;
}
// the sender didn't honor the request amount
// it's either due to an operator bug or concurrent onNext
if (error instanceof MissingBackpressureException) {
return true;
}
// it's either due to an operator bug or concurrent onNext
if (error instanceof QueueOverflowException) {
return true;
}
// general protocol violations
// it's either due to an operator bug or concurrent onNext
if (error instanceof IllegalStateException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
import io.reactivex.rxjava3.exceptions.QueueOverflowException;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
Expand Down Expand Up @@ -475,7 +475,7 @@ public Integer apply(Integer v) {
int vc = ts.values().size();
assertTrue("10 < " + vc, vc <= 10);

ts.assertError(MissingBackpressureException.class);
ts.assertError(QueueOverflowException.class);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ protected void subscribeActual(Subscriber<? super Integer> s) {
}
.flatMapStream(v -> Stream.of(1, 2), 1)
.test(0)
.assertFailure(MissingBackpressureException.class);
.assertFailure(QueueOverflowException.class);

TestHelper.assertUndeliverable(errors, 0, TestException.class);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void subscribe(Subscriber<? super Completable> s) {
}), 1
)
.test()
.assertFailure(MissingBackpressureException.class);
.assertFailure(QueueOverflowException.class);

TestHelper.assertError(errors, 0, MissingBackpressureException.class);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void emptyThrowsNoSuch() {
it.next();
}

@Test(expected = MissingBackpressureException.class)
@Test(expected = QueueOverflowException.class)
public void overflowQueue() {
Iterator<Integer> it = new Flowable<Integer>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ protected void subscribeActual(Subscriber<? super Integer> s) {
}
.concatMap(Functions.justFunction(Flowable.just(2)), 8, ImmediateThinScheduler.INSTANCE)
.test(0L)
.assertFailure(MissingBackpressureException.class);
.assertFailure(QueueOverflowException.class);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ protected void subscribeActual(Subscriber<? super Integer> s) {
}
.concatMap(Functions.justFunction(Flowable.just(2)), 8)
.test(0L)
.assertFailure(MissingBackpressureException.class);
.assertFailure(QueueOverflowException.class);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1391,7 +1391,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int
}
.flatMap(v -> Flowable.just(v), 1)
.test(0L)
.assertFailure(MissingBackpressureException.class);
.assertFailure(QueueOverflowException.class);
}

@Test
Expand All @@ -1413,7 +1413,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int
}
})
.test()
.assertFailure(MissingBackpressureException.class, 1);
.assertFailure(QueueOverflowException.class, 1);
}

@Test
Expand All @@ -1430,7 +1430,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int
}
}, false, 1, 1)
.test(0L)
.assertFailure(MissingBackpressureException.class);
.assertFailure(QueueOverflowException.class);
}

@Test
Expand Down