Skip to content

Commit

Permalink
Ensure Http2FrameCodec.Encoder is closed when upgrade is rejected
Browse files Browse the repository at this point in the history
Ensure Http2FrameCodec.Encoder is closed when Exception happened
before decoding the server response
  • Loading branch information
violetagg committed Sep 28, 2022
1 parent 5d54b8f commit ebb9bb1
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
Expand Up @@ -37,9 +37,12 @@
import io.netty5.channel.ChannelInitializer;
import io.netty5.channel.ChannelOption;
import io.netty5.channel.ChannelPipeline;
import io.netty5.handler.codec.http.DefaultHttpContent;
import io.netty5.handler.codec.http.HttpClientCodec;
import io.netty5.handler.codec.http.HttpClientUpgradeHandler;
import io.netty5.handler.codec.http.HttpContentDecompressor;
import io.netty5.handler.codec.http.HttpObject;
import io.netty5.handler.codec.http.HttpResponse;
import io.netty5.handler.codec.http.headers.HttpHeaders;
import io.netty5.handler.codec.http.HttpMethod;
import io.netty5.handler.codec.http2.Http2ClientUpgradeCodec;
Expand Down Expand Up @@ -80,6 +83,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

import static io.netty5.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
import static reactor.netty5.ReactorNetty.format;
import static reactor.netty5.http.client.Http2ConnectionProvider.OWNER;

Expand Down Expand Up @@ -606,7 +610,8 @@ static void configureHttp11OrH2CleartextPipeline(
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec,
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, uriTagValue));

HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());
ReactorNettyHttpClientUpgradeHandler upgradeHandler = new ReactorNettyHttpClientUpgradeHandler(
http2FrameCodec, httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());

p.addBefore(NettyPipeline.ReactiveBridge, null, httpClientCodec)
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler)
Expand Down Expand Up @@ -1001,6 +1006,39 @@ public void onUncaughtException(Connection connection, Throwable error) {
}
}

static final class ReactorNettyHttpClientUpgradeHandler extends HttpClientUpgradeHandler<DefaultHttpContent> {

final Http2FrameCodec http2FrameCodec;

boolean decoded;

ReactorNettyHttpClientUpgradeHandler(
Http2FrameCodec http2FrameCodec,
SourceCodec sourceCodec,
UpgradeCodec upgradeCodec,
int maxContentLength) {
super(sourceCodec, upgradeCodec, maxContentLength);
this.http2FrameCodec = http2FrameCodec;
}

@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
decoded = true;
if (msg instanceof HttpResponse httpResponse && !SWITCHING_PROTOCOLS.equals(httpResponse.status())) {
http2FrameCodec.encoder().close();
}
super.decode(ctx, msg);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
if (!decoded) {
// Exception may happen before decoding the server response
http2FrameCodec.encoder().close();
}
}
}

static final class StreamConnectionObserver implements ConnectionObserver {

final Context context;
Expand Down
Expand Up @@ -62,6 +62,13 @@
"name": "reactor.netty5.http.client.MicrometerHttpClientMetricsHandler",
"queryAllPublicMethods": true
},
{
"condition": {
"typeReachable": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler"
},
"name": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler",
"queryAllPublicMethods": true
},
{
"condition": {
"typeReachable": "reactor.netty5.http.server.AbstractHttpServerMetricsHandler"
Expand Down

0 comments on commit ebb9bb1

Please sign in to comment.