Skip to content

Commit

Permalink
Align hasCustomOnError behavior of CallbackCompletableObserver with L…
Browse files Browse the repository at this point in the history
…ambdaObserver, ConsumerSingleObserver and so on (#7326)

Co-authored-by: zhuwenbo <zhuwenbo@bilibili.com>
  • Loading branch information
hqzxzwb and zhuwenbo committed Aug 26, 2021
1 parent f5ff589 commit 939b5ce
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 18 deletions.
6 changes: 1 addition & 5 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Expand Up @@ -2996,11 +2996,7 @@ public final Disposable subscribe(
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(@NonNull Action onComplete) {
Objects.requireNonNull(onComplete, "onComplete is null");

CallbackCompletableObserver observer = new CallbackCompletableObserver(onComplete);
subscribe(observer);
return observer;
return subscribe(onComplete, Functions.ON_ERROR_MISSING);
}

/**
Expand Down
Expand Up @@ -20,33 +20,24 @@
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class CallbackCompletableObserver
extends AtomicReference<Disposable>
implements CompletableObserver, Disposable, Consumer<Throwable>, LambdaConsumerIntrospection {
implements CompletableObserver, Disposable, LambdaConsumerIntrospection {

private static final long serialVersionUID = -4361286194466301354L;

final Consumer<? super Throwable> onError;
final Action onComplete;

public CallbackCompletableObserver(Action onComplete) {
this.onError = this;
this.onComplete = onComplete;
}

public CallbackCompletableObserver(Consumer<? super Throwable> onError, Action onComplete) {
this.onError = onError;
this.onComplete = onComplete;
}

@Override
public void accept(Throwable e) {
RxJavaPlugins.onError(new OnErrorNotImplementedException(e));
}

@Override
public void onComplete() {
try {
Expand Down Expand Up @@ -86,6 +77,6 @@ public boolean isDisposed() {

@Override
public boolean hasCustomOnError() {
return onError != this;
return onError != Functions.ON_ERROR_MISSING;
}
}
Expand Up @@ -24,7 +24,7 @@ public final class CallbackCompletableObserverTest extends RxJavaTest {

@Test
public void emptyActionShouldReportNoCustomOnError() {
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.EMPTY_ACTION);
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);

assertFalse(o.hasCustomOnError());
}
Expand Down

0 comments on commit 939b5ce

Please sign in to comment.