Skip to content

Commit

Permalink
fixes breaking change for fromFuture source cancellation (#3252)
Browse files Browse the repository at this point in the history
this commits fixes misleading documentation introduced after #3146. Also, this commit adds extra methods overloads which allow suppression of the cancellation propagation to the observed `Future`
  • Loading branch information
OlegDokuka committed Oct 28, 2022
1 parent b70f94e commit 7068604
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 41 deletions.
70 changes: 52 additions & 18 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -537,18 +537,17 @@ public static <T> Mono<T> fromCallable(Callable<? extends T> supplier) {
* <p>
* <img class="marble" src="doc-files/marbles/fromFuture.svg" alt="">
* <p>
* Note that the completion stage is not cancelled when that Mono is cancelled, but
* that behavior can be obtained by using {@link #doFinally(Consumer)} that checks
* for a {@link SignalType#CANCEL} and calls eg.
* {@link CompletionStage#toCompletableFuture() .toCompletableFuture().cancel(false)}.
* Note, use {@link #fromFuture(CompletableFuture, boolean)} with {@code
* suppressCancellation} set to {@code true} if you need to suppress cancellation
* propagation
*
* @param completionStage {@link CompletionStage} that will produce a value (or a null to
* complete immediately)
* @param <T> type of the expected value
* @return A {@link Mono}.
*/
public static <T> Mono<T> fromCompletionStage(CompletionStage<? extends T> completionStage) {
return onAssembly(new MonoCompletionStage<>(completionStage));
return onAssembly(new MonoCompletionStage<>(completionStage, false));
}

/**
Expand All @@ -558,18 +557,17 @@ public static <T> Mono<T> fromCompletionStage(CompletionStage<? extends T> compl
* <p>
* <img class="marble" src="doc-files/marbles/fromFutureSupplier.svg" alt="">
* <p>
* Note that the completion stage is not cancelled when that Mono is cancelled, but
* that behavior can be obtained by using {@link #doFinally(Consumer)} that checks
* for a {@link SignalType#CANCEL} and calls eg.
* {@link CompletionStage#toCompletableFuture() .toCompletableFuture().cancel(false)}.
* Note, use {@link #fromFuture(CompletableFuture, boolean)} with {@code
* suppressCancellation} set to {@code true} if you need to suppress cancellation
* propagation
*
* @param stageSupplier The {@link Supplier} of a {@link CompletionStage} that will produce a value (or a null to
* complete immediately). This allows lazy triggering of CompletionStage-based APIs.
* @param <T> type of the expected value
* @return A {@link Mono}.
*/
public static <T> Mono<T> fromCompletionStage(Supplier<? extends CompletionStage<? extends T>> stageSupplier) {
return defer(() -> onAssembly(new MonoCompletionStage<>(stageSupplier.get())));
return defer(() -> onAssembly(new MonoCompletionStage<>(stageSupplier.get(), false)));
}

/**
Expand Down Expand Up @@ -616,9 +614,9 @@ public static <I> Mono<I> fromDirect(Publisher<? extends I> source){
* <p>
* <img class="marble" src="doc-files/marbles/fromFuture.svg" alt="">
* <p>
* Note that the future is not cancelled when that Mono is cancelled, but that behavior
* can be obtained by using a {@link #doFinally(Consumer)} that checks
* for a {@link SignalType#CANCEL} and calls {@link CompletableFuture#cancel(boolean)}.
* Note, use {@link #fromFuture(CompletableFuture, boolean)} with {@code
* suppressCancellation} set to {@code true} if you need to suppress cancellation
* propagation
*
* @param future {@link CompletableFuture} that will produce a value (or a null to
* complete immediately)
Expand All @@ -627,7 +625,26 @@ public static <I> Mono<I> fromDirect(Publisher<? extends I> source){
* @see #fromCompletionStage(CompletionStage) fromCompletionStage for a generalization
*/
public static <T> Mono<T> fromFuture(CompletableFuture<? extends T> future) {
return onAssembly(new MonoCompletionStage<>(future));
return fromFuture(future, false);
}

/**
* Create a {@link Mono}, producing its value using the provided {@link CompletableFuture}.
*
* <p>
* <img class="marble" src="doc-files/marbles/fromFuture.svg" alt="">
* <p>
*
* @param future {@link CompletableFuture} that will produce a value (or a null to
* complete immediately)
* @param suppressCancel specifies whether future should have cancellation signal
* to be suppressed
* @param <T> type of the expected value
* @return A {@link Mono}.
* @see #fromCompletionStage(CompletionStage) fromCompletionStage for a generalization
*/
public static <T> Mono<T> fromFuture(CompletableFuture<? extends T> future, boolean suppressCancel) {
return onAssembly(new MonoCompletionStage<>(future, suppressCancel));
}

/**
Expand All @@ -637,9 +654,6 @@ public static <T> Mono<T> fromFuture(CompletableFuture<? extends T> future) {
* <p>
* <img class="marble" src="doc-files/marbles/fromFutureSupplier.svg" alt="">
* <p>
* Note that the future is not cancelled when that Mono is cancelled, but that behavior
* can be obtained by using a {@link #doFinally(Consumer)} that checks
* for a {@link SignalType#CANCEL} and calls {@link CompletableFuture#cancel(boolean)}.
*
* @param futureSupplier The {@link Supplier} of a {@link CompletableFuture} that will produce a value (or a null to
* complete immediately). This allows lazy triggering of future-based APIs.
Expand All @@ -648,7 +662,27 @@ public static <T> Mono<T> fromFuture(CompletableFuture<? extends T> future) {
* @see #fromCompletionStage(Supplier) fromCompletionStage for a generalization
*/
public static <T> Mono<T> fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier) {
return defer(() -> onAssembly(new MonoCompletionStage<>(futureSupplier.get())));
return fromFuture(futureSupplier, false);
}

/**
* Create a {@link Mono} that wraps a {@link CompletableFuture} on subscription,
* emitting the value produced by the Future.
*
* <p>
* <img class="marble" src="doc-files/marbles/fromFutureSupplier.svg" alt="">
* <p>
*
* @param futureSupplier The {@link Supplier} of a {@link CompletableFuture} that will produce a value (or a null to
* complete immediately). This allows lazy triggering of future-based APIs.
* @param suppressCancel specifies whether future should have cancellation signal
* to be suppressed
* @param <T> type of the expected value
* @return A {@link Mono}.
* @see #fromCompletionStage(Supplier) fromCompletionStage for a generalization
*/
public static <T> Mono<T> fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier, boolean suppressCancel) {
return defer(() -> onAssembly(new MonoCompletionStage<>(futureSupplier.get(), suppressCancel)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,27 @@ final class MonoCompletionStage<T> extends Mono<T>
implements Fuseable, Scannable {

final CompletionStage<? extends T> future;
final boolean suppressCancellation;

MonoCompletionStage(CompletionStage<? extends T> future) {
MonoCompletionStage(CompletionStage<? extends T> future, boolean suppressCancellation) {
this.future = Objects.requireNonNull(future, "future");
this.suppressCancellation = suppressCancellation;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Operators.MonoSubscriber<T, T>
sds = new Operators.MonoSubscriber<T, T>(actual) {
@Override
public void cancel() {
super.cancel();
if (future instanceof Future) {
((Future<?>) future).cancel(true);
}
}
};
sds = suppressCancellation
? new Operators.MonoSubscriber<>(actual)
: new Operators.MonoSubscriber<T, T>(actual) {
@Override
public void cancel() {
super.cancel();
if (future instanceof Future) {
((Future<?>) future).cancel(true);
}
}
};

actual.onSubscribe(sds);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,27 @@ public void propagateCancellationToCompletionFuture() {
assertThat(future).isCancelled();
}

@Test
public void cancelThenFutureFailsWithDroppedError() {
CompletableFuture<Integer> future = new CompletableFuture<>();
AtomicReference<Subscription> subRef = new AtomicReference<>();

Mono<Integer> mono = Mono
.fromFuture(future, true)
.doOnSubscribe(subRef::set);

StepVerifier.create(mono)
.expectSubscription()
.then(() -> {
subRef.get().cancel();
future.completeExceptionally(new IllegalStateException("boom"));
future.complete(1);
})
.thenCancel()//already cancelled but need to get to verification
.verifyThenAssertThat()
.hasDroppedErrorWithMessage("boom");
}

@Test
public void cancelThenFutureFails() {
CompletableFuture<Integer> future = new CompletableFuture<>();
Expand All @@ -75,10 +96,7 @@ public void cancelFutureImmediatelyCancelledLoop() {
for (int i = 0; i < 10000; i++) {
CompletableFuture<Integer> future = new CompletableFuture<>();
Mono<Integer> mono = Mono
.fromFuture(future)
.doFinally(sig -> {
if (sig == SignalType.CANCEL) future.cancel(false);
});
.fromFuture(future);

StepVerifier.create(mono)
.expectSubscription()
Expand All @@ -96,10 +114,7 @@ public void cancelFutureDelayedCancelledLoop() {
for (int i = 0; i < 500; i++) {
CompletableFuture<Integer> future = new CompletableFuture<>();
Mono<Integer> mono = Mono
.fromFuture(future)
.doFinally(sig -> {
if (sig == SignalType.CANCEL) future.cancel(false);
});
.fromFuture(future);

StepVerifier.create(mono)
.expectSubscription()
Expand All @@ -118,10 +133,7 @@ public void cancelFutureTimeoutCancelledLoop() {
for (int i = 0; i < 500; i++) {
CompletableFuture<Integer> future = new CompletableFuture<>();
Mono<Integer> mono = Mono
.fromFuture(future)
.doFinally(sig -> {
if (sig == SignalType.CANCEL) future.cancel(false);
});
.fromFuture(future);

StepVerifier.create(mono.timeout(Duration.ofMillis(10)))
.expectSubscription()
Expand Down Expand Up @@ -171,7 +183,7 @@ public void stackOverflowGoesToOnErrorDropped() {
@Test
public void scanOperator(){
CompletionStage<String> completionStage = CompletableFuture.supplyAsync(() -> "helloFuture");
MonoCompletionStage<String> test = new MonoCompletionStage<>(completionStage);
MonoCompletionStage<String> test = new MonoCompletionStage<>(completionStage, false);

assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.ASYNC);
assertThat(test.scan(Scannable.Attr.ACTUAL)).isNull();
Expand Down

0 comments on commit 7068604

Please sign in to comment.