Skip to content

Commit

Permalink
Merge #2414 into 2.0.0-M2
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Aug 3, 2022
2 parents 11f4dcf + f87789b commit 76a2cea
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 59 deletions.
Expand Up @@ -512,6 +512,7 @@ static void configureH2Pipeline(ChannelPipeline p,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
@Nullable Duration idleTimeout,
ConnectionObserver listener,
@Nullable BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle,
@Nullable ChannelMetricsRecorder metricsRecorder,
Expand All @@ -537,6 +538,8 @@ static void configureH2Pipeline(ChannelPipeline p,
cookieEncoder, formDecoderProvider, forwardedHeaderHandler, listener, mapHandle,
metricsRecorder, minCompressionSize, opsFactory, uriTagValue)));

IdleTimeoutHandler.addIdleTimeoutHandler(p, idleTimeout);

if (metricsRecorder != null) {
if (metricsRecorder instanceof MicrometerHttpServerMetricsRecorder) {
// For sake of performance, we can replace the ChannelMetricsHandler because the MicrometerHttpServerMetricsRecorder
Expand Down Expand Up @@ -1015,7 +1018,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {

if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder,
formDecoderProvider, forwardedHeaderHandler, http2Settings, listener, mapHandle,
formDecoderProvider, forwardedHeaderHandler, http2Settings, idleTimeout, listener, mapHandle,
metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());
return;
}
Expand Down Expand Up @@ -1128,6 +1131,7 @@ else if ((protocols & h2) == h2) {
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
idleTimeout,
observer,
mapHandle,
metricsRecorder,
Expand Down Expand Up @@ -1189,6 +1193,7 @@ else if ((protocols & h2c) == h2c) {
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
idleTimeout,
observer,
mapHandle,
metricsRecorder,
Expand Down
Expand Up @@ -19,40 +19,32 @@
import java.time.Duration;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

import io.netty5.channel.Channel;
import io.netty5.channel.ChannelFutureListeners;
import io.netty5.channel.ChannelHandler;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.ChannelPipeline;
import io.netty5.handler.codec.DecoderResult;
import io.netty5.handler.codec.DecoderResultProvider;
import io.netty5.handler.codec.http.HttpHeaderNames;
import io.netty5.handler.codec.http.HttpRequest;
import io.netty5.handler.codec.http.HttpResponse;
import io.netty5.handler.codec.http.HttpResponseStatus;
import io.netty5.handler.codec.http.HttpServerCodec;
import io.netty5.handler.codec.http.HttpStatusClass;
import io.netty5.handler.codec.http.HttpVersion;
import io.netty5.handler.codec.http.LastHttpContent;
import io.netty5.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty5.handler.codec.http.cookie.ServerCookieEncoder;
import io.netty5.handler.ssl.SslHandler;
import io.netty5.handler.timeout.IdleState;
import io.netty5.handler.timeout.IdleStateEvent;
import io.netty5.handler.timeout.IdleStateHandler;
import io.netty5.util.Resource;
import io.netty5.util.concurrent.Future;
import io.netty5.util.concurrent.FutureContextListener;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.netty5.Connection;
import reactor.netty5.ConnectionObserver;
import reactor.netty5.NettyPipeline;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

Expand Down Expand Up @@ -485,54 +477,4 @@ static boolean isMultipart(HttpResponse response) {
MULTIPART_PREFIX.length());
}

static final class IdleTimeoutHandler extends IdleStateHandler {

final long idleTimeout;

IdleTimeoutHandler(long idleTimeout) {
super(idleTimeout, 0, 0, TimeUnit.MILLISECONDS);
this.idleTimeout = idleTimeout;
}

@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
if (evt.state() == IdleState.READER_IDLE) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(),
"Connection was idle for [{}ms], as per configuration the connection will be closed."),
idleTimeout);
}
ctx.close();
}
ctx.fireChannelInboundEvent(evt);
}

static void addIdleTimeoutHandler(ChannelPipeline pipeline, @Nullable Duration idleTimeout) {
if (idleTimeout != null) {
String baseName = null;
if (pipeline.get(NettyPipeline.HttpCodec) != null) {
baseName = NettyPipeline.HttpCodec;
}
else if (pipeline.get(NettyPipeline.H2CUpgradeHandler) != null) {
baseName = NettyPipeline.H2CUpgradeHandler;
}
else {
ChannelHandler httpServerCodec = pipeline.get(HttpServerCodec.class);
if (httpServerCodec != null) {
baseName = pipeline.context(httpServerCodec).name();
}
}

pipeline.addBefore(baseName,
NettyPipeline.IdleTimeoutHandler,
new IdleTimeoutHandler(idleTimeout.toMillis()));
}
}

static void removeIdleTimeoutHandler(ChannelPipeline pipeline) {
if (pipeline.get(NettyPipeline.IdleTimeoutHandler) != null) {
pipeline.remove(NettyPipeline.IdleTimeoutHandler);
}
}
}
}
@@ -0,0 +1,83 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty5.http.server;

import io.netty5.channel.ChannelHandler;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.ChannelPipeline;
import io.netty5.handler.codec.http.HttpServerCodec;
import io.netty5.handler.codec.http.HttpServerUpgradeHandler;
import io.netty5.handler.timeout.IdleState;
import io.netty5.handler.timeout.IdleStateEvent;
import io.netty5.handler.timeout.IdleStateHandler;
import reactor.netty5.NettyPipeline;
import reactor.util.annotation.Nullable;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static reactor.netty5.ReactorNetty.format;

final class IdleTimeoutHandler extends IdleStateHandler {

IdleTimeoutHandler(long idleTimeout) {
super(idleTimeout, 0, 0, TimeUnit.MILLISECONDS);
}

@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
if (evt.state() == IdleState.READER_IDLE) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(),
"Connection was idle for [{}ms], as per configuration the connection will be closed."),
getReaderIdleTimeInMillis());
}
ctx.close();
}
ctx.fireChannelInboundEvent(evt);
}

static void addIdleTimeoutHandler(ChannelPipeline pipeline, @Nullable Duration idleTimeout) {
if (idleTimeout != null) {
String baseName = null;
if (pipeline.get(NettyPipeline.HttpCodec) != null) {
baseName = NettyPipeline.HttpCodec;
}
else {
ChannelHandler httpServerUpgradeHandler = pipeline.get(HttpServerUpgradeHandler.class);
if (httpServerUpgradeHandler != null) {
baseName = pipeline.context(httpServerUpgradeHandler).name();
}
else {
ChannelHandler httpServerCodec = pipeline.get(HttpServerCodec.class);
if (httpServerCodec != null) {
baseName = pipeline.context(httpServerCodec).name();
}
}
}

pipeline.addAfter(baseName,
NettyPipeline.IdleTimeoutHandler,
new IdleTimeoutHandler(idleTimeout.toMillis()));
}
}

static void removeIdleTimeoutHandler(ChannelPipeline pipeline) {
if (pipeline.get(NettyPipeline.IdleTimeoutHandler) != null) {
pipeline.remove(NettyPipeline.IdleTimeoutHandler);
}
}
}
Expand Up @@ -22,8 +22,12 @@
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.read.ListAppender;
import io.netty5.buffer.api.Buffer;
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelHandler;
import io.netty5.handler.codec.http.HttpHeaderNames;
import io.netty5.handler.codec.http2.Http2Connection;
import io.netty5.handler.codec.http2.Http2FrameCodec;
import io.netty5.handler.codec.http2.Http2StreamChannel;
import io.netty5.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty5.handler.ssl.util.SelfSignedCertificate;
import io.netty5.handler.timeout.ReadTimeoutHandler;
Expand Down Expand Up @@ -56,6 +60,8 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -593,4 +599,53 @@ void testIdleTimeoutAddedCorrectly(HttpServer server, HttpClient client) {
assertThat(listAppender.list)
.noneMatch(event -> event.getLevel() == Level.WARN);
}

@ParameterizedCompatibleCombinationsTest
void testIdleTimeout(HttpServer server, HttpClient client) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
disposableServer =
server.idleTimeout(Duration.ofMillis(500))
.route(routes ->
routes.post("/echo", (req, res) ->
res.withConnection(conn -> {
Channel channel = conn.channel() instanceof Http2StreamChannel ?
conn.channel().parent() : conn.channel();
channel.closeFuture().addListener(f -> latch.countDown());
})
.send(req.receive().transferOwnership())))
.bindNow();

CountDownLatch goAwayReceived = new CountDownLatch(1);
client.doOnResponse((res, conn) -> {
if (!(conn.channel() instanceof Http2StreamChannel)) {
goAwayReceived.countDown();
return;
}

Http2FrameCodec http2FrameCodec = conn.channel().parent().pipeline().get(Http2FrameCodec.class);
Http2Connection.Listener goAwayFrameListener = Mockito.mock(Http2Connection.Listener.class);
Mockito.doAnswer(invocation -> {
goAwayReceived.countDown();
return null;
})
.when(goAwayFrameListener)
.onGoAwayReceived(Mockito.anyInt(), Mockito.anyLong(), Mockito.any());
http2FrameCodec.connection().addListener(goAwayFrameListener);
})
.port(disposableServer.port())
.post()
.uri("/echo")
.send(BufferFlux.fromString(Mono.just("Hello world!")))
.responseContent()
.aggregate()
.asString()
.as(StepVerifier::create)
.expectNext("Hello world!")
.expectComplete()
.verify(Duration.ofSeconds(5));

assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

assertThat(goAwayReceived.await(10, TimeUnit.SECONDS)).isTrue();
}
}

0 comments on commit 76a2cea

Please sign in to comment.