Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure HttpServer#idleTimeout configuration is applied for both HTTP/1.1 and HTTP/2 #2414

Merged
merged 1 commit into from Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -500,6 +500,7 @@ static void configureH2Pipeline(ChannelPipeline p,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
@Nullable Duration idleTimeout,
ConnectionObserver listener,
@Nullable BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle,
@Nullable ChannelMetricsRecorder metricsRecorder,
Expand All @@ -525,6 +526,8 @@ static void configureH2Pipeline(ChannelPipeline p,
cookieEncoder, formDecoderProvider, forwardedHeaderHandler, listener, mapHandle,
metricsRecorder, minCompressionSize, opsFactory, uriTagValue)));

IdleTimeoutHandler.addIdleTimeoutHandler(p, idleTimeout);

if (metricsRecorder != null) {
if (metricsRecorder instanceof MicrometerHttpServerMetricsRecorder) {
// For sake of performance, we can replace the ChannelMetricsHandler because the MicrometerHttpServerMetricsRecorder
Expand Down Expand Up @@ -975,7 +978,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {

if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder,
formDecoderProvider, forwardedHeaderHandler, http2Settings, listener, mapHandle,
formDecoderProvider, forwardedHeaderHandler, http2Settings, idleTimeout, listener, mapHandle,
metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());
return;
}
Expand Down Expand Up @@ -1090,6 +1093,7 @@ else if ((protocols & h2) == h2) {
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
idleTimeout,
observer,
mapHandle,
metricsRecorder,
Expand Down Expand Up @@ -1151,6 +1155,7 @@ else if ((protocols & h2c) == h2c) {
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
idleTimeout,
observer,
mapHandle,
metricsRecorder,
Expand Down
Expand Up @@ -19,39 +19,31 @@
import java.time.Duration;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.DecoderResultProvider;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpStatusClass;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.ReferenceCountUtil;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyPipeline;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

Expand Down Expand Up @@ -482,56 +474,4 @@ static boolean isMultipart(HttpResponse response) {
MULTIPART_PREFIX.length());
}

static final class IdleTimeoutHandler extends IdleStateHandler {

final long idleTimeout;

IdleTimeoutHandler(long idleTimeout) {
super(idleTimeout, 0, 0, TimeUnit.MILLISECONDS);
this.idleTimeout = idleTimeout;
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
if (evt.state() == IdleState.READER_IDLE) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(),
"Connection was idle for [{}ms], as per configuration the connection will be closed."),
idleTimeout);
}
// FutureReturnValueIgnored is deliberate
ctx.close();
}
ctx.fireUserEventTriggered(evt);
}

static void addIdleTimeoutHandler(ChannelPipeline pipeline, @Nullable Duration idleTimeout) {
if (idleTimeout != null) {
String baseName = null;
if (pipeline.get(NettyPipeline.HttpCodec) != null) {
baseName = NettyPipeline.HttpCodec;
}
else if (pipeline.get(NettyPipeline.H2CUpgradeHandler) != null) {
baseName = NettyPipeline.H2CUpgradeHandler;
}
else {
ChannelHandler httpServerCodec = pipeline.get(HttpServerCodec.class);
if (httpServerCodec != null) {
baseName = pipeline.context(httpServerCodec).name();
}
}

pipeline.addBefore(baseName,
NettyPipeline.IdleTimeoutHandler,
new IdleTimeoutHandler(idleTimeout.toMillis()));
}
}

static void removeIdleTimeoutHandler(ChannelPipeline pipeline) {
if (pipeline.get(NettyPipeline.IdleTimeoutHandler) != null) {
pipeline.remove(NettyPipeline.IdleTimeoutHandler);
}
}
}
}
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty.http.server;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import reactor.netty.NettyPipeline;
import reactor.util.annotation.Nullable;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static reactor.netty.ReactorNetty.format;

final class IdleTimeoutHandler extends IdleStateHandler {

IdleTimeoutHandler(long idleTimeout) {
super(idleTimeout, 0, 0, TimeUnit.MILLISECONDS);
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
if (evt.state() == IdleState.READER_IDLE) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(),
"Connection was idle for [{}ms], as per configuration the connection will be closed."),
getReaderIdleTimeInMillis());
}
// FutureReturnValueIgnored is deliberate
ctx.close();
}
ctx.fireUserEventTriggered(evt);
}

static void addIdleTimeoutHandler(ChannelPipeline pipeline, @Nullable Duration idleTimeout) {
if (idleTimeout != null) {
String baseName = null;
if (pipeline.get(NettyPipeline.HttpCodec) != null) {
baseName = NettyPipeline.HttpCodec;
}
else {
ChannelHandler httpServerUpgradeHandler = pipeline.get(HttpServerUpgradeHandler.class);
if (httpServerUpgradeHandler != null) {
baseName = pipeline.context(httpServerUpgradeHandler).name();
}
else {
ChannelHandler httpServerCodec = pipeline.get(HttpServerCodec.class);
if (httpServerCodec != null) {
baseName = pipeline.context(httpServerCodec).name();
}
}
}

pipeline.addAfter(baseName,
NettyPipeline.IdleTimeoutHandler,
new IdleTimeoutHandler(idleTimeout.toMillis()));
}
}

static void removeIdleTimeoutHandler(ChannelPipeline pipeline) {
if (pipeline.get(NettyPipeline.IdleTimeoutHandler) != null) {
pipeline.remove(NettyPipeline.IdleTimeoutHandler);
}
}
}
Expand Up @@ -22,8 +22,12 @@
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.read.ListAppender;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.handler.timeout.ReadTimeoutHandler;
Expand Down Expand Up @@ -56,6 +60,8 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -593,4 +599,53 @@ void testIdleTimeoutAddedCorrectly(HttpServer server, HttpClient client) {
assertThat(listAppender.list)
.noneMatch(event -> event.getLevel() == Level.WARN);
}

@ParameterizedCompatibleCombinationsTest
void testIdleTimeout(HttpServer server, HttpClient client) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
disposableServer =
server.idleTimeout(Duration.ofMillis(500))
.route(routes ->
routes.post("/echo", (req, res) ->
res.withConnection(conn -> {
Channel channel = conn.channel() instanceof Http2StreamChannel ?
conn.channel().parent() : conn.channel();
channel.closeFuture().addListener(f -> latch.countDown());
})
.send(req.receive().retain())))
.bindNow();

CountDownLatch goAwayReceived = new CountDownLatch(1);
client.doOnResponse((res, conn) -> {
if (!(conn.channel() instanceof Http2StreamChannel)) {
goAwayReceived.countDown();
return;
}

Http2FrameCodec http2FrameCodec = conn.channel().parent().pipeline().get(Http2FrameCodec.class);
Http2Connection.Listener goAwayFrameListener = Mockito.mock(Http2Connection.Listener.class);
Mockito.doAnswer(invocation -> {
goAwayReceived.countDown();
return null;
})
.when(goAwayFrameListener)
.onGoAwayReceived(Mockito.anyInt(), Mockito.anyLong(), Mockito.any());
http2FrameCodec.connection().addListener(goAwayFrameListener);
})
.port(disposableServer.port())
.post()
.uri("/echo")
.send(ByteBufFlux.fromString(Mono.just("Hello world!")))
.responseContent()
.aggregate()
.asString()
.as(StepVerifier::create)
.expectNext("Hello world!")
.expectComplete()
.verify(Duration.ofSeconds(5));

assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

assertThat(goAwayReceived.await(10, TimeUnit.SECONDS)).isTrue();
}
}