Skip to content

Commit

Permalink
Ensure IdleTimeoutHandler receives always channelRead events (#2660)
Browse files Browse the repository at this point in the history
- When the server is configured with HTTP/1.1 and H2C, ensure Http2MultiplexHandler
is at the end of the pipeline because this handler doesn't forward all channelRead events.
- In case the server is configured with HTTP/1.1 and H2 and the negotiated protocol is HTTP/1.1,
when channelActive event happens, HttpTrafficHandler is still not in the pipeline.
In this use case IdleTimeoutHandler is added at the point when HTTP/1.1 is negotiated

Fixes #2649
  • Loading branch information
violetagg committed Jan 20, 2023
1 parent a86a270 commit 21bd6ef
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -768,8 +768,8 @@ public void handlerAdded(ChannelHandlerContext ctx) {
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodec);
}

pipeline.addAfter(ctx.pipeline().context(upgrader.http2FrameCodec).name(),
NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(upgrader));
// Add this handler at the end of the pipeline as it does not forward all channelRead events
pipeline.addLast(NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(upgrader));

pipeline.remove(this);

Expand Down Expand Up @@ -1004,6 +1004,11 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
configureHttp11Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder,
decoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener,
mapHandle, maxKeepAliveRequests, metricsRecorder, minCompressionSize, uriTagValue);

// When the server is configured with HTTP/1.1 and H2 and HTTP/1.1 is negotiated,
// when channelActive event happens, this HttpTrafficHandler is still not in the pipeline,
// and will not be able to add IdleTimeoutHandler. So in this use case add IdleTimeoutHandler here.
IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout);
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,7 @@ protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
}

static void addIdleTimeoutHandler(ChannelPipeline pipeline, @Nullable Duration idleTimeout) {
if (idleTimeout != null) {
if (pipeline.get(NettyPipeline.IdleTimeoutHandler) == null && idleTimeout != null) {
String baseName = null;
if (pipeline.get(NettyPipeline.HttpCodec) != null) {
baseName = NettyPipeline.HttpCodec;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,10 +23,21 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.handler.codec.http2.Http2SettingsAckFrame;
import io.netty.handler.codec.http2.Http2SettingsFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.handler.timeout.ReadTimeoutHandler;
Expand Down Expand Up @@ -71,6 +82,7 @@
import java.util.function.Predicate;

import static org.assertj.core.api.Assertions.assertThat;
import static reactor.netty.ConnectionObserver.State.CONNECTED;

/**
* Test a combination of {@link HttpServer} + {@link HttpProtocol}
Expand Down Expand Up @@ -651,9 +663,33 @@ void testIdleTimeoutAddedCorrectly(HttpServer server, HttpClient client) {

@ParameterizedCompatibleCombinationsTest
void testIdleTimeout(HttpServer server, HttpClient client) throws Exception {
HttpProtocol[] serverProtocols = server.configuration().protocols();
HttpProtocol[] clientProtocols = client.configuration().protocols();

CountDownLatch latch = new CountDownLatch(1);
IdleTimeoutTestChannelInboundHandler customHandler = new IdleTimeoutTestChannelInboundHandler();
disposableServer =
server.idleTimeout(Duration.ofMillis(500))
.doOnChannelInit((obs, ch, addr) -> {
if (((serverProtocols.length == 1 && serverProtocols[0] != HttpProtocol.HTTP11) ||
(clientProtocols.length == 1 && clientProtocols[0] != HttpProtocol.HTTP11)) &&
ch.pipeline().get(NettyPipeline.IdleTimeoutHandler) != null) {
ch.pipeline().addAfter(NettyPipeline.IdleTimeoutHandler, "testIdleTimeout", customHandler);
}
else if (serverProtocols.length == 2 && serverProtocols[1] == HttpProtocol.H2) {
ch.pipeline().addBefore(NettyPipeline.ReactiveBridge, "testIdleTimeout1",
new IdleTimeoutTest1ChannelInboundHandler(customHandler));
}
})
.childObserve((conn, state) -> {
Channel channel = conn.channel();
if (state == CONNECTED &&
!(channel instanceof Http2StreamChannel) &&
channel.pipeline().get(NettyPipeline.IdleTimeoutHandler) != null &&
channel.pipeline().get("testIdleTimeout") == null) {
channel.pipeline().addAfter(NettyPipeline.IdleTimeoutHandler, "testIdleTimeout", customHandler);
}
})
.route(routes ->
routes.post("/echo", (req, res) ->
res.withConnection(conn -> {
Expand Down Expand Up @@ -696,6 +732,33 @@ void testIdleTimeout(HttpServer server, HttpClient client) throws Exception {
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

assertThat(goAwayReceived.await(10, TimeUnit.SECONDS)).isTrue();

assertThat(customHandler.latch.await(10, TimeUnit.SECONDS)).isTrue();

if ((serverProtocols.length == 1 && serverProtocols[0] == HttpProtocol.HTTP11) ||
(clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11)) {
assertThat(customHandler.list).hasSize(3);
assertThat(customHandler.list.get(0)).isInstanceOf(HttpRequest.class);
assertThat(customHandler.list.get(1)).isInstanceOf(HttpContent.class);
assertThat(customHandler.list.get(2)).isInstanceOf(LastHttpContent.class);
}
else if (serverProtocols.length == 1 || clientProtocols.length == 1 ||
(serverProtocols.length == 2 && clientProtocols.length == 2 &&
serverProtocols[1] == HttpProtocol.H2 && clientProtocols[1] == HttpProtocol.H2)) {
assertThat(customHandler.list).hasSize(5);
assertThat(customHandler.list.get(0)).isInstanceOf(Http2SettingsFrame.class);
assertThat(customHandler.list.get(1)).isInstanceOf(Http2SettingsAckFrame.class);
assertThat(customHandler.list.get(2)).isInstanceOf(Http2HeadersFrame.class);
assertThat(customHandler.list.get(3)).isInstanceOf(Http2DataFrame.class);
assertThat(customHandler.list.get(4)).isInstanceOf(Http2DataFrame.class);
}
else if (clientProtocols.length == 2 && clientProtocols[1] == HttpProtocol.H2C) {
assertThat(customHandler.list).hasSize(4);
assertThat(customHandler.list.get(0)).isInstanceOf(Http2HeadersFrame.class);
assertThat(customHandler.list.get(1)).isInstanceOf(Http2DataFrame.class);
assertThat(customHandler.list.get(2)).isInstanceOf(Http2SettingsFrame.class);
assertThat(customHandler.list.get(3)).isInstanceOf(Http2SettingsAckFrame.class);
}
}

static final class AccessLogAppender extends AppenderBase<ILoggingEvent> {
Expand All @@ -709,4 +772,43 @@ protected void append(ILoggingEvent eventObject) {
latch.countDown();
}
}

static final class IdleTimeoutTestChannelInboundHandler extends ChannelInboundHandlerAdapter {

final CountDownLatch latch = new CountDownLatch(1);
final List<Object> list = new ArrayList<>();

@Override
public void channelInactive(ChannelHandlerContext ctx) {
latch.countDown();
ctx.fireChannelInactive();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
list.add(msg);
ctx.fireChannelRead(msg);
}
}

static final class IdleTimeoutTest1ChannelInboundHandler extends ChannelInboundHandlerAdapter {

final IdleTimeoutTestChannelInboundHandler channelHandler;

IdleTimeoutTest1ChannelInboundHandler(IdleTimeoutTestChannelInboundHandler channelHandler) {
this.channelHandler = channelHandler;
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
ChannelPipeline pipeline = ctx.channel().pipeline();
if (evt instanceof SslHandshakeCompletionEvent &&
pipeline.get(NettyPipeline.IdleTimeoutHandler) != null &&
pipeline.get("testIdleTimeout") == null) {
pipeline.remove(this);
pipeline.addAfter(NettyPipeline.IdleTimeoutHandler, "testIdleTimeout", channelHandler);
}
ctx.fireUserEventTriggered(evt);
}
}
}

0 comments on commit 21bd6ef

Please sign in to comment.