Skip to content

Commit

Permalink
Ensure IdleTimeoutHandler receives always channelRead events
Browse files Browse the repository at this point in the history
- When the server is configured with HTTP/1.1 and H2C ensure Http2MultiplexHandler
is at the end of the pipeline because this handler doesn't forward all channelRead events.
- When the server is configured with HTTP/1.1 and H2 and the client sends HTTP/1.1,
when channelActive event happens, HttpTrafficHandler is still not in the pipeline.
In this use case add IdleTimeoutHandler when HttpTrafficHandler is added to the pipeline.

Fixes #2649
  • Loading branch information
violetagg committed Jan 19, 2023
1 parent 6b0b531 commit d13738f
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -768,8 +768,8 @@ public void handlerAdded(ChannelHandlerContext ctx) {
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodec);
}

pipeline.addAfter(ctx.pipeline().context(upgrader.http2FrameCodec).name(),
NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(upgrader));
// Add this handler at the end of the pipeline as it does not forward all channelRead events
pipeline.addLast(NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(upgrader));

pipeline.remove(this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ final class HttpTrafficHandler extends ChannelDuplexHandler
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
this.ctx = ctx;

// When the server is configured with HTTP/1.1 and H2 and the client is HTTP/1.1,
// when channelActive event happens, this handler is still not in the pipeline.
// In this use case add IdleTimeoutHandler when this handler is added to the pipeline.
IdleTimeoutHandler.addIdleTimeoutHandler(ctx.pipeline(), idleTimeout);

if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(), "New http connection, requesting read"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,7 @@ protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
}

static void addIdleTimeoutHandler(ChannelPipeline pipeline, @Nullable Duration idleTimeout) {
if (idleTimeout != null) {
if (pipeline.get(NettyPipeline.IdleTimeoutHandler) == null && idleTimeout != null) {
String baseName = null;
if (pipeline.get(NettyPipeline.HttpCodec) != null) {
baseName = NettyPipeline.HttpCodec;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,10 +23,21 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.handler.codec.http2.Http2SettingsAckFrame;
import io.netty.handler.codec.http2.Http2SettingsFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.handler.timeout.ReadTimeoutHandler;
Expand Down Expand Up @@ -71,6 +82,7 @@
import java.util.function.Predicate;

import static org.assertj.core.api.Assertions.assertThat;
import static reactor.netty.ConnectionObserver.State.CONNECTED;

/**
* Test a combination of {@link HttpServer} + {@link HttpProtocol}
Expand Down Expand Up @@ -651,9 +663,33 @@ void testIdleTimeoutAddedCorrectly(HttpServer server, HttpClient client) {

@ParameterizedCompatibleCombinationsTest
void testIdleTimeout(HttpServer server, HttpClient client) throws Exception {
HttpProtocol[] serverProtocols = server.configuration().protocols();
HttpProtocol[] clientProtocols = client.configuration().protocols();

CountDownLatch latch = new CountDownLatch(1);
IdleTimeoutTestChannelInboundHandler customHandler = new IdleTimeoutTestChannelInboundHandler();
disposableServer =
server.idleTimeout(Duration.ofMillis(500))
.doOnChannelInit((obs, ch, addr) -> {
if (((serverProtocols.length == 1 && serverProtocols[0] != HttpProtocol.HTTP11) ||
(clientProtocols.length == 1 && clientProtocols[0] != HttpProtocol.HTTP11)) &&
ch.pipeline().get(NettyPipeline.IdleTimeoutHandler) != null) {
ch.pipeline().addAfter(NettyPipeline.IdleTimeoutHandler, "testIdleTimeout", customHandler);
}
else if (serverProtocols.length == 2 && serverProtocols[1] == HttpProtocol.H2) {
ch.pipeline().addBefore(NettyPipeline.ReactiveBridge, "testIdleTimeout1",
new IdleTimeoutTest1ChannelInboundHandler(customHandler));
}
})
.childObserve((conn, state) -> {
Channel channel = conn.channel();
if (state == CONNECTED &&
!(channel instanceof Http2StreamChannel) &&
channel.pipeline().get(NettyPipeline.IdleTimeoutHandler) != null &&
channel.pipeline().get("testIdleTimeout") == null) {
channel.pipeline().addAfter(NettyPipeline.IdleTimeoutHandler, "testIdleTimeout", customHandler);
}
})
.route(routes ->
routes.post("/echo", (req, res) ->
res.withConnection(conn -> {
Expand Down Expand Up @@ -696,6 +732,33 @@ void testIdleTimeout(HttpServer server, HttpClient client) throws Exception {
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

assertThat(goAwayReceived.await(10, TimeUnit.SECONDS)).isTrue();

assertThat(customHandler.latch.await(10, TimeUnit.SECONDS)).isTrue();

if ((serverProtocols.length == 1 && serverProtocols[0] == HttpProtocol.HTTP11) ||
(clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11)) {
assertThat(customHandler.list).hasSize(3);
assertThat(customHandler.list.get(0)).isInstanceOf(HttpRequest.class);
assertThat(customHandler.list.get(1)).isInstanceOf(HttpContent.class);
assertThat(customHandler.list.get(2)).isInstanceOf(LastHttpContent.class);
}
else if (serverProtocols.length == 1 || clientProtocols.length == 1 ||
(serverProtocols.length == 2 && clientProtocols.length == 2 &&
serverProtocols[1] == HttpProtocol.H2 && clientProtocols[1] == HttpProtocol.H2)) {
assertThat(customHandler.list).hasSize(5);
assertThat(customHandler.list.get(0)).isInstanceOf(Http2SettingsFrame.class);
assertThat(customHandler.list.get(1)).isInstanceOf(Http2SettingsAckFrame.class);
assertThat(customHandler.list.get(2)).isInstanceOf(Http2HeadersFrame.class);
assertThat(customHandler.list.get(3)).isInstanceOf(Http2DataFrame.class);
assertThat(customHandler.list.get(4)).isInstanceOf(Http2DataFrame.class);
}
else if (clientProtocols.length == 2 && clientProtocols[1] == HttpProtocol.H2C) {
assertThat(customHandler.list).hasSize(4);
assertThat(customHandler.list.get(0)).isInstanceOf(Http2HeadersFrame.class);
assertThat(customHandler.list.get(1)).isInstanceOf(Http2DataFrame.class);
assertThat(customHandler.list.get(2)).isInstanceOf(Http2SettingsFrame.class);
assertThat(customHandler.list.get(3)).isInstanceOf(Http2SettingsAckFrame.class);
}
}

static final class AccessLogAppender extends AppenderBase<ILoggingEvent> {
Expand All @@ -709,4 +772,43 @@ protected void append(ILoggingEvent eventObject) {
latch.countDown();
}
}

static final class IdleTimeoutTestChannelInboundHandler extends ChannelInboundHandlerAdapter {

final CountDownLatch latch = new CountDownLatch(1);
final List<Object> list = new ArrayList<>();

@Override
public void channelInactive(ChannelHandlerContext ctx) {
latch.countDown();
ctx.fireChannelInactive();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
list.add(msg);
ctx.fireChannelRead(msg);
}
}

static final class IdleTimeoutTest1ChannelInboundHandler extends ChannelInboundHandlerAdapter {

final IdleTimeoutTestChannelInboundHandler channelHandler;

IdleTimeoutTest1ChannelInboundHandler(IdleTimeoutTestChannelInboundHandler channelHandler) {
this.channelHandler = channelHandler;
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
ChannelPipeline pipeline = ctx.channel().pipeline();
if (evt instanceof SslHandshakeCompletionEvent &&
pipeline.get(NettyPipeline.IdleTimeoutHandler) != null &&
pipeline.get("testIdleTimeout") == null) {
pipeline.remove(this);
pipeline.addAfter(NettyPipeline.IdleTimeoutHandler, "testIdleTimeout", channelHandler);
}
ctx.fireUserEventTriggered(evt);
}
}
}

0 comments on commit d13738f

Please sign in to comment.