Skip to content

Commit

Permalink
Ensure Http2FrameCodec.Encoder is closed when upgrade is rejected
Browse files Browse the repository at this point in the history
Ensure Http2FrameCodec.Encoder is closed when Exception happened
before decoding the server response
  • Loading branch information
violetagg committed Oct 3, 2022
1 parent 638d10d commit 15c05bd
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
Expand Up @@ -41,6 +41,8 @@
import io.netty5.handler.codec.http.HttpClientCodec;
import io.netty5.handler.codec.http.HttpClientUpgradeHandler;
import io.netty5.handler.codec.http.HttpContentDecompressor;
import io.netty5.handler.codec.http.HttpObject;
import io.netty5.handler.codec.http.HttpResponse;
import io.netty5.handler.codec.http.headers.HttpHeaders;
import io.netty5.handler.codec.http.HttpMethod;
import io.netty5.handler.codec.http2.Http2ClientUpgradeCodec;
Expand Down Expand Up @@ -81,6 +83,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

import static io.netty5.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
import static reactor.netty5.ReactorNetty.format;
import static reactor.netty5.http.client.Http2ConnectionProvider.OWNER;

Expand Down Expand Up @@ -607,8 +610,8 @@ static void configureHttp11OrH2CleartextPipeline(
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec,
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, uriTagValue));

HttpClientUpgradeHandler<?> upgradeHandler =
new HttpClientUpgradeHandler<DefaultHttpContent>(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());
ReactorNettyHttpClientUpgradeHandler upgradeHandler = new ReactorNettyHttpClientUpgradeHandler(
http2FrameCodec, httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());

p.addBefore(NettyPipeline.ReactiveBridge, null, httpClientCodec)
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler)
Expand Down Expand Up @@ -1001,6 +1004,39 @@ public void onUncaughtException(Connection connection, Throwable error) {
}
}

static final class ReactorNettyHttpClientUpgradeHandler extends HttpClientUpgradeHandler<DefaultHttpContent> {

final Http2FrameCodec http2FrameCodec;

boolean decoded;

ReactorNettyHttpClientUpgradeHandler(
Http2FrameCodec http2FrameCodec,
SourceCodec sourceCodec,
UpgradeCodec upgradeCodec,
int maxContentLength) {
super(sourceCodec, upgradeCodec, maxContentLength);
this.http2FrameCodec = http2FrameCodec;
}

@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
decoded = true;
if (msg instanceof HttpResponse httpResponse && !SWITCHING_PROTOCOLS.equals(httpResponse.status())) {
http2FrameCodec.encoder().close();
}
super.decode(ctx, msg);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
if (!decoded) {
// Exception may happen before decoding the server response
http2FrameCodec.encoder().close();
}
}
}

static final class StreamConnectionObserver implements ConnectionObserver {

final Context context;
Expand Down
Expand Up @@ -62,6 +62,13 @@
"name": "reactor.netty5.http.client.MicrometerHttpClientMetricsHandler",
"queryAllPublicMethods": true
},
{
"condition": {
"typeReachable": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler"
},
"name": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler",
"queryAllPublicMethods": true
},
{
"condition": {
"typeReachable": "reactor.netty5.http.server.AbstractHttpServerMetricsHandler"
Expand Down

0 comments on commit 15c05bd

Please sign in to comment.