Skip to content

Commit

Permalink
3.x: Standardize MissingBackpressureException message, introduce Queu…
Browse files Browse the repository at this point in the history
…eOverflowException (#7459)

* 3.x: Standardize MissingBackpressureException messages

* Use the correct message.

* Fix tests

* Add QueueOverflowException, fix tests

* Update CompletableConcatTest.java
  • Loading branch information
akarnokd committed Aug 8, 2022
1 parent ef51a90 commit 2edba23
Show file tree
Hide file tree
Showing 61 changed files with 164 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ public final class MissingBackpressureException extends RuntimeException {

private static final long serialVersionUID = 8517344746016032542L;

/**
* The default error message.
* <p>
* This can happen if the downstream doesn't call {@link org.reactivestreams.Subscription#request(long)}
* in time or at all.
* @since 3.1.6
*/
public static final String DEFAULT_MESSAGE = "Could not emit value due to lack of requests";

/**
* Constructs a MissingBackpressureException without message or cause.
*/
Expand All @@ -35,4 +44,13 @@ public MissingBackpressureException(String message) {
super(message);
}

/**
* Constructs a new {@code MissingBackpressureException} with the
* default message {@value #DEFAULT_MESSAGE}.
* @return the new {@code MissingBackpressureException} instance.
* @since 3.1.6
*/
public static MissingBackpressureException createDefault() {
return new MissingBackpressureException(DEFAULT_MESSAGE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.exceptions;

/**
* Indicates an overflow happened because the upstream disregarded backpressure completely or
* {@link org.reactivestreams.Subscriber#onNext(Object)} was called concurrently from multiple threads
* without synchronization. Rarely, it is an indication of bugs inside an operator.
* @since 3.1.6
*/
public final class QueueOverflowException extends RuntimeException {

private static final long serialVersionUID = 8517344746016032542L;

/**
* The message for queue overflows.
* <p>
* This can happen if the upstream disregards backpressure completely or calls
* {@link org.reactivestreams.Subscriber#onNext(Object)} concurrently from multiple threads
* without synchronization. Rarely, it is an indication of bugs inside an operator.
*/
private static final String DEFAULT_MESSAGE = "Queue overflow due to illegal concurrent onNext calls or a bug in an operator";

/**
* Constructs a QueueOverflowException with the default message.
*/
public QueueOverflowException() {
this(DEFAULT_MESSAGE);
}

/**
* Constructs a QueueOverflowException with the given message but no cause.
* @param message the error message
*/
public QueueOverflowException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void onNext(T t) {
if (sourceMode != QueueFuseable.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(new MissingBackpressureException("Queue full?!"));
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void onSubscribe(Subscription s) {
public void onNext(CompletableSource t) {
if (sourceFused == QueueSubscription.NONE) {
if (!queue.offer(t)) {
onError(new MissingBackpressureException());
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void onNext(T t) {
if (!queue.offer(t)) {
SubscriptionHelper.cancel(this);

onError(new MissingBackpressureException("Queue full?!"));
onError(new QueueOverflowException());
} else {
signalConsumer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.internal.util.*;
Expand Down Expand Up @@ -152,7 +152,7 @@ public final void onNext(T t) {
if (sourceMode != QueueSubscription.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(new IllegalStateException("Queue full?!"));
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void innerNext(InnerQueuedSubscriber<R> inner, R value) {
drain();
} else {
inner.cancel();
innerError(inner, new MissingBackpressureException());
innerError(inner, MissingBackpressureException.createDefault());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
Expand Down Expand Up @@ -151,7 +151,7 @@ public final void onNext(T t) {
if (sourceMode != QueueSubscription.ASYNC) {
if (!queue.offer(t)) {
upstream.cancel();
onError(new IllegalStateException("Queue full?!"));
onError(new QueueOverflowException());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
onError(new MissingBackpressureException("create: " + MissingBackpressureException.DEFAULT_MESSAGE));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void emit(long idx, T value) {
BackpressureHelper.produced(this, 1);
} else {
cancel();
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ void emit(long idx, T t, DebounceEmitter<T> emitter) {
emitter.dispose();
} else {
cancel();
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void tryEmitScalar(U value) {
q = getMainQueue();
}
if (!q.offer(value)) {
onError(new MissingBackpressureException("Scalar queue full?!"));
onError(new QueueOverflowException());
}
}
if (decrementAndGet() == 0) {
Expand All @@ -252,7 +252,7 @@ void tryEmitScalar(U value) {
} else {
SimpleQueue<U> q = getMainQueue();
if (!q.offer(value)) {
onError(new MissingBackpressureException("Scalar queue full?!"));
onError(new QueueOverflowException());
return;
}
if (getAndIncrement() != 0) {
Expand All @@ -278,7 +278,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
inner.queue = q;
}
if (!q.offer(value)) {
onError(new MissingBackpressureException("Inner queue full?!"));
onError(new QueueOverflowException());
}
}
if (decrementAndGet() == 0) {
Expand All @@ -291,7 +291,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
inner.queue = q;
}
if (!q.offer(value)) {
onError(new MissingBackpressureException("Inner queue full?!"));
onError(new QueueOverflowException());
return;
}
if (getAndIncrement() != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void onNext(T t) {
return;
}
if (fusionMode == NONE && !queue.offer(t)) {
onError(new MissingBackpressureException("Queue is full?!"));
onError(new QueueOverflowException());
return;
}
drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void onNext(T t) {
if (emittedGroups != get()) {
downstream.onNext(group);
} else {
MissingBackpressureException mbe = new MissingBackpressureException(groupHangWarning(emittedGroups));
MissingBackpressureException mbe = groupHangWarning(emittedGroups);
mbe.initCause(ex);
onError(mbe);
return;
Expand All @@ -194,13 +194,13 @@ public void onNext(T t) {
}
} else {
upstream.cancel();
onError(new MissingBackpressureException(groupHangWarning(emittedGroups)));
onError(groupHangWarning(emittedGroups));
}
}
}

static String groupHangWarning(long n) {
return "Unable to emit a new group (#" + n + ") due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.";
static MissingBackpressureException groupHangWarning(long n) {
return new MissingBackpressureException("Unable to emit a new group (#" + n + ") due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ void drain() {
a.onNext(w);
BackpressureHelper.produced(requested, 1);
} else {
fail(new MissingBackpressureException("Could not emit value due to lack of requests"), a, q);
fail(MissingBackpressureException.createDefault(), a, q);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void run() {
downstream.onNext(count++);
BackpressureHelper.produced(this, 1);
} else {
downstream.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
downstream.onError(new MissingBackpressureException("Could not emit value " + count + " due to lack of requests"));
DisposableHelper.dispose(resource);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void run() {
decrementAndGet();
}
} else {
downstream.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
downstream.onError(new MissingBackpressureException("Could not emit value " + count + " due to lack of requests"));
DisposableHelper.dispose(resource);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ void drain() {

e++;
} else {
ExceptionHelper.addThrowable(error, new MissingBackpressureException("Could not emit value due to lack of requests"));
ExceptionHelper.addThrowable(error, MissingBackpressureException.createDefault());
q.clear();
cancelAll();
errorAll(a);
Expand Down Expand Up @@ -321,7 +321,7 @@ else if (mode == RIGHT_VALUE) {

e++;
} else {
ExceptionHelper.addThrowable(error, new MissingBackpressureException("Could not emit value due to lack of requests"));
ExceptionHelper.addThrowable(error, MissingBackpressureException.createDefault());
q.clear();
cancelAll();
errorAll(a);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public final void onNext(T t) {
if (!queue.offer(t)) {
upstream.cancel();

error = new MissingBackpressureException("Queue is full?!");
error = new QueueOverflowException();
done = true;
}
trySchedule();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void onNext(T t) {
}
} else if (callError) {
upstream.cancel();
onError(new MissingBackpressureException());
onError(MissingBackpressureException.createDefault());
} else {
drain();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onNext(T t) {
BackpressureHelper.produced(this, 1);
} else {
upstream.cancel();
onError(new MissingBackpressureException("could not emit value due to lack of requests"));
onError(MissingBackpressureException.createDefault());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
// we expect upstream to honor backpressure requests
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
onError(new MissingBackpressureException("Prefetch queue is full?!"));
onError(new QueueOverflowException());
return;
}
// since many things can happen concurrently, we have a common dispatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void onNext(T t) {
}
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
upstream.get().cancel();
onError(new MissingBackpressureException());
onError(MissingBackpressureException.createDefault());
return;
}
drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void emit() {
BackpressureHelper.produced(requested, 1);
} else {
cancel();
downstream.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ void emit() {
BackpressureHelper.produced(requested, 1);
} else {
cancel();
downstream.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (sourceMode == QueueSubscription.NONE) {
if (!queue.offer(t)) {
onError(new MissingBackpressureException());
onError(MissingBackpressureException.createDefault());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public void onNext(R t) {
SwitchMapSubscriber<T, R> p = parent;
if (index == p.unique) {
if (fusionMode == QueueSubscription.NONE && !queue.offer(t)) {
onError(new MissingBackpressureException("Queue full?!"));
onError(new QueueOverflowException());
return;
}
p.drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void onNext(T t) {
} else {
done = true;
cancel();
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void run() {
downstream.onComplete();
} else {
lazySet(EmptyDisposable.INSTANCE);
downstream.onError(new MissingBackpressureException("Can't deliver value due to lack of requests"));
downstream.onError(MissingBackpressureException.createDefault());
}
}
}
Expand Down

0 comments on commit 2edba23

Please sign in to comment.