Skip to content

Commit

Permalink
Use FutureMono instead of Mono#fromCompletionStage (#2565)
Browse files Browse the repository at this point in the history
Reactor Netty does not need the type of cancellation
provided by `Mono#fromCompletionStage`

Related to #2560
  • Loading branch information
violetagg committed Nov 3, 2022
1 parent 9ff810e commit 1aca50f
Show file tree
Hide file tree
Showing 16 changed files with 513 additions and 80 deletions.
Expand Up @@ -123,7 +123,7 @@ default boolean isDisposed() {
* @return a {@link Mono} terminating with success if shutdown successfully or error
*/
default Mono<Void> onDispose() {
return Mono.fromCompletionStage(channel().closeFuture().asStage());
return FutureMono.from(channel().closeFuture());
}

/**
Expand Down
204 changes: 204 additions & 0 deletions reactor-netty5-core/src/main/java/reactor/netty5/FutureMono.java
@@ -0,0 +1,204 @@
/*
* Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty5;

import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.function.Supplier;

import io.netty5.util.concurrent.Future;
import io.netty5.util.concurrent.FutureListener;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.netty5.channel.AbortedException;
import reactor.util.context.Context;

/**
* Convert Netty Future into void {@link Mono}.
*
* @author Stephane Maldini
*/
public abstract class FutureMono extends Mono<Void> {

/**
* Convert a {@link Future} into {@link Mono}. {@link Mono#subscribe(Subscriber)}
* will bridge to {@link Future#addListener(FutureListener)}.
*
* @param future the future to convert from
* @param <F> the future type
*
* @return A {@link Mono} forwarding {@link Future} success or failure
*/
public static <F extends Future<Void>> Mono<Void> from(F future) {
Objects.requireNonNull(future, "future");
if (future.isDone()) {
if (!future.isSuccess()) {
return Mono.error(FutureSubscription.wrapError(future.cause()));
}
return Mono.empty();
}
return new ImmediateFutureMono<>(future);
}

/**
* Convert a supplied {@link Future} for each subscriber into {@link Mono}.
* {@link Mono#subscribe(Subscriber)}
* will bridge to {@link Future#addListener(FutureListener)}.
*
* @param deferredFuture the future to evaluate and convert from
* @param <F> the future type
*
* @return A {@link Mono} forwarding {@link Future} success or failure
*/
public static <F extends Future<Void>> Mono<Void> deferFuture(Supplier<F> deferredFuture) {
return new DeferredFutureMono<>(deferredFuture);
}

final static class ImmediateFutureMono<F extends Future<Void>> extends FutureMono {

final F future;

ImmediateFutureMono(F future) {
this.future = Objects.requireNonNull(future, "future");
}

@Override
public void subscribe(final CoreSubscriber<? super Void> s) {
doSubscribe(s, future);
}
}

final static class DeferredFutureMono<F extends Future<Void>> extends FutureMono {

final Supplier<F> deferredFuture;

DeferredFutureMono(Supplier<F> deferredFuture) {
this.deferredFuture =
Objects.requireNonNull(deferredFuture, "deferredFuture");
}

@Override
public void subscribe(CoreSubscriber<? super Void> s) {
F f;
try {
f = deferredFuture.get();
}
catch (Throwable t) {
Operators.error(s, t);
return;
}

if (f == null) {
Operators.error(s,
Operators.onOperatorError(new NullPointerException(
"Deferred supplied null"), s.currentContext()));
return;
}

doSubscribe(s, f);
}
}

static <F extends Future<Void>> void doSubscribe(CoreSubscriber<? super Void> s, F future) {
if (future.isDone()) {
if (future.isSuccess()) {
Operators.complete(s);
}
else {
Operators.error(s, FutureSubscription.wrapError(future.cause()));
}
return;
}

FutureSubscription<F> fs = new FutureSubscription<>(future, s);
// propagate subscription before adding listener to avoid any race between finishing future and onSubscribe
// is called
s.onSubscribe(fs);

// check if subscription was not cancelled immediately.
if (fs.cancelled) {
// if so do nothing anymore
return;
}

// add listener to the future to propagate on complete when future is done
// addListener likely to be thread safe method
future.addListener(fs);

// check once again if is cancelled to see if we need to removeListener in case addListener racing with
// subscription.cancel (which should remove listener)
if (fs.cancelled) {
// future.removeListener(fs); TODO
}
}

final static class FutureSubscription<F extends Future<Void>>
implements FutureListener<Void>, Subscription, Supplier<Context> {

final CoreSubscriber<? super Void> s;

final F future;

boolean cancelled;

FutureSubscription(F future, CoreSubscriber<? super Void> s) {
this.s = s;
this.future = future;
}

@Override
public void request(long n) {
//noop
}

@Override
public Context get() {
return s.currentContext();
}

@Override
public void cancel() {
// cancel is not thread safe since we assume that removeListener is thread-safe. That said if we have
// concurrent addListener and removeListener and if addListener is after removeListener, the other Thread
// after execution addListener should see changes happened before removeListener. Thus, it should see
// cancelled flag set to true and should cleanup added handler
this.cancelled = true;
// future.removeListener(this); TODO
}

@Override
public void operationComplete(Future<? extends Void> future) {
if (!future.isSuccess()) {
s.onError(wrapError(future.cause()));
}
else {
s.onComplete();
}
}

private static Throwable wrapError(Throwable error) {
if (error instanceof ClosedChannelException) {
return new AbortedException(error);
}
else {
return error;
}
}
}
}
Expand Up @@ -41,6 +41,7 @@
import reactor.netty5.ChannelOperationsId;
import reactor.netty5.Connection;
import reactor.netty5.ConnectionObserver;
import reactor.netty5.FutureMono;
import reactor.netty5.NettyInbound;
import reactor.netty5.NettyOutbound;
import reactor.netty5.NettyPipeline;
Expand Down Expand Up @@ -287,8 +288,8 @@ public NettyOutbound send(Publisher<? extends Buffer> dataStream, Predicate<Buff
return then(Mono.error(AbortedException.beforeSend()));
}
if (dataStream instanceof Mono<?> mono) {
return then(mono.flatMap(m -> Mono.fromCompletionStage(channel().writeAndFlush(m).asStage()))
.doOnDiscard(Buffer.class, Buffer::close));
return then(mono.flatMap(m -> FutureMono.from(channel().writeAndFlush(m)))
.doOnDiscard(Buffer.class, Buffer::close));
}
return then(MonoSendMany.bufferSource(dataStream, channel(), predicate));
}
Expand All @@ -300,8 +301,8 @@ public NettyOutbound sendObject(Publisher<?> dataStream, Predicate<Object> predi
return then(Mono.error(AbortedException.beforeSend()));
}
if (dataStream instanceof Mono<?> mono) {
return then(mono.flatMap(m -> Mono.fromCompletionStage(channel().writeAndFlush(m).asStage()))
.doOnDiscard(Resource.class, Resource::dispose));
return then(mono.flatMap(m -> FutureMono.from(channel().writeAndFlush(m)))
.doOnDiscard(Resource.class, Resource::dispose));
}
return then(MonoSendMany.objectSource(dataStream, channel(), predicate));
}
Expand All @@ -312,8 +313,8 @@ public NettyOutbound sendObject(Object message) {
ReactorNetty.safeRelease(message);
return then(Mono.error(AbortedException.beforeSend()));
}
return then(Mono.fromCompletionStage(() -> connection.channel()
.writeAndFlush(message).asStage()),
return then(FutureMono.deferFuture(() -> connection.channel()
.writeAndFlush(message)),
() -> ReactorNetty.safeRelease(message));
}

Expand All @@ -327,8 +328,8 @@ public <S> NettyOutbound sendUsing(Callable<? extends S> sourceInput,

return then(Mono.using(
sourceInput,
s -> Mono.fromCompletionStage(connection.channel()
.writeAndFlush(mappedInput.apply(this, s)).asStage()),
s -> FutureMono.from(connection.channel()
.writeAndFlush(mappedInput.apply(this, s))),
sourceCleanup)
);
}
Expand Down
Expand Up @@ -28,6 +28,7 @@
import io.netty5.util.concurrent.FastThreadLocalThread;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.NonBlocking;
import reactor.netty5.FutureMono;

/**
* An adapted global eventLoop handler.
Expand Down Expand Up @@ -97,28 +98,28 @@ public Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
Mono<?> cnsrvlMono = Mono.empty();
if (running.compareAndSet(true, false)) {
if (clientLoopsGroup != null) {
clMono = Mono.fromCompletionStage(clientLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
clMono = FutureMono.from(clientLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (serverSelectLoopsGroup != null) {
sslMono = Mono.fromCompletionStage(serverSelectLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
sslMono = FutureMono.from(serverSelectLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (serverLoopsGroup != null) {
slMono = Mono.fromCompletionStage(serverLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
slMono = FutureMono.from(serverLoopsGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (cacheNativeClientGroup != null) {
cnclMono = Mono.fromCompletionStage(cacheNativeClientGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
cnclMono = FutureMono.from(cacheNativeClientGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (cacheNativeSelectGroup != null) {
cnslMono = Mono.fromCompletionStage(cacheNativeSelectGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
cnslMono = FutureMono.from(cacheNativeSelectGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
if (cacheNativeServerGroup != null) {
cnsrvlMono = Mono.fromCompletionStage(cacheNativeServerGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS).asStage());
cnsrvlMono = FutureMono.from(cacheNativeServerGroup.shutdownGracefully(
quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
}
}

Expand Down
Expand Up @@ -40,6 +40,7 @@
import reactor.core.publisher.Sinks;
import reactor.netty5.Connection;
import reactor.netty5.ConnectionObserver;
import reactor.netty5.FutureMono;
import reactor.netty5.NettyPipeline;
import reactor.netty5.channel.ChannelOperations;
import reactor.netty5.transport.TransportConfig;
Expand Down Expand Up @@ -565,7 +566,7 @@ public void onSubscribe(Subscription s) {
if (!pooledConnection.channel.isActive()) {
return Mono.empty();
}
return Mono.fromCompletionStage(pooledConnection.channel.close().asStage());
return FutureMono.from(pooledConnection.channel.close());
};
}
}
Expand Up @@ -25,6 +25,7 @@
import reactor.core.publisher.Mono;
import reactor.netty5.Connection;
import reactor.netty5.ConnectionObserver;
import reactor.netty5.FutureMono;
import reactor.netty5.channel.ChannelOperations;
import reactor.util.Logger;
import reactor.util.Loggers;
Expand Down Expand Up @@ -66,12 +67,12 @@ public Mono<Void> join(final InetAddress multicastAddress, @Nullable NetworkInte
future = datagramChannel.joinGroup(multicastAddress);
}

return Mono.fromCompletionStage(future.asStage())
.doOnSuccess(v -> {
if (log.isInfoEnabled()) {
log.info(format(datagramChannel, "JOIN {}"), multicastAddress);
}
});
return FutureMono.from(future)
.doOnSuccess(v -> {
if (log.isInfoEnabled()) {
log.info(format(datagramChannel, "JOIN {}"), multicastAddress);
}
});
}

/**
Expand All @@ -98,12 +99,12 @@ public Mono<Void> leave(final InetAddress multicastAddress, @Nullable NetworkInt
future = datagramChannel.leaveGroup(multicastAddress);
}

return Mono.fromCompletionStage(future.asStage())
.doOnSuccess(v -> {
if (log.isInfoEnabled()) {
log.info(format(datagramChannel, "JOIN {}"), multicastAddress);
}
});
return FutureMono.from(future)
.doOnSuccess(v -> {
if (log.isInfoEnabled()) {
log.info(format(datagramChannel, "JOIN {}"), multicastAddress);
}
});
}

static final Logger log = Loggers.getLogger(UdpOperations.class);
Expand Down

0 comments on commit 1aca50f

Please sign in to comment.