From 2f5fa9b54f7f0feca39e47c9e9767a51acedce22 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 3 Oct 2022 15:32:28 +0300 Subject: [PATCH] Leak detection properties are added for the test execution (#2461) - Fix memory leaks when processing EmptyLastHttpContent - Avoid buffer leak by closing the HttpRequest passed to the WebSocketServerHandshaker.handshake method - Declare ReactorNetty#BOUNDARY as on-heap non-releasable buffer - Do not close the empty full message when invoking SimpleCompressionHandler out of the pipeline - Http2StreamBridgeServerHandler.channelRead method should use Resource.dispose() method - [HttpServer] Ensure Http2FrameCodec is created only when there is a need for protocol upgrade - [HttpClient] Ensure Http2FrameCodec.Encoder is closed when upgrade is rejected. Ensure Http2FrameCodec.Encoder is closed when Exception happened before decoding the server response - Fix memory leaks in tests Co-authored-by: Pierre De Rop --- build.gradle | 12 +++--- .../java/reactor/netty5/NettyOutbound.java | 7 +--- .../java/reactor/netty5/ReactorNetty.java | 10 +++-- .../reactor/netty5/NettyOutboundTest.java | 6 +-- .../netty5/http/client/HttpClientConfig.java | 40 ++++++++++++++++++- .../http/client/HttpClientOperations.java | 5 ++- .../Http2StreamBridgeServerHandler.java | 4 +- .../netty5/http/server/HttpServerConfig.java | 11 +++-- .../http/server/HttpServerOperations.java | 9 +++-- .../http/server/SimpleCompressionHandler.java | 11 ++++- .../server/WebsocketServerOperations.java | 5 ++- .../reactor-netty5-http/reflect-config.json | 7 ++++ .../http/client/HttpClientOperationsTest.java | 7 +++- .../netty5/http/server/HttpServerTests.java | 6 ++- .../logging/AccessLogHandlerH1Tests.java | 2 + .../logging/AccessLogHandlerH2Tests.java | 2 + 16 files changed, 103 insertions(+), 41 deletions(-) diff --git a/build.gradle b/build.gradle index f456ace03a..2b8824db9e 100644 --- a/build.gradle +++ b/build.gradle @@ -244,7 +244,10 @@ subprojects { systemProperty("reactor.trace.cancel", "true") systemProperty("reactor.trace.nocapacity", "true") systemProperty("testGroups", project.properties.get("testGroups")) - systemProperty("io.netty.leakDetection.level", "paranoid") + systemProperty("io.netty5.leakDetectionLevel", "paranoid") + systemProperty("io.netty5.leakDetection.targetRecords", "32") + systemProperty("io.netty5.buffer.lifecycleTracingEnabled", "true") + systemProperty("io.netty5.buffer.leakDetectionEnabled", "true") systemProperty("reactor.netty5.pool.getPermitsSamplingRate", "0.5") systemProperty("reactor.netty5.pool.returnPermitsSamplingRate", "0.5") if (project.hasProperty("forceTransport")) { @@ -270,11 +273,8 @@ subprojects { onOutput { descriptor, event -> def evMsg = event.message - if (evMsg.contains("ResourceLeakDetector")) { - if (!evMsg.contains(" -Dio.netty.leakDetection") - && !evMsg.contains("DEBUG io.netty.util.ResourceLeakDetectorFactory")) { - logger.error("ERROR: Test: " + descriptor + " produced resource leak: " + event.message) - } + if (evMsg.contains("LoggingLeakCallback")) { + logger.error("ERROR: Test: " + descriptor + " produced resource leak: " + event.message) } } } diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java b/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java index 9ae15ea9a8..94c4b697bc 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java @@ -20,7 +20,6 @@ import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -41,8 +40,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import static reactor.netty5.ReactorNetty.PREDICATE_GROUP_BOUNDARY; - /** * An outbound-traffic API delegating to an underlying {@link Channel}. *

Note: With HTTP, chaining multiple send operations is discouraged and will not work as expected @@ -230,12 +227,10 @@ default NettyOutbound sendFileChunked(Path file, long position, long count) { * any error during write */ default NettyOutbound sendGroups(Publisher> dataStreams) { - Buffer BOUNDARY = alloc().copyOf(PREDICATE_GROUP_BOUNDARY.getBytes(StandardCharsets.UTF_8)).makeReadOnly(); return send( Flux.from(dataStreams) .concatMap(p -> Flux.from(p) - .concatWith(Mono.just(BOUNDARY.copy(0, BOUNDARY.readableBytes(), true))), 32) - .doFinally(sig -> BOUNDARY.close()), + .concatWith(Mono.just(ReactorNetty.BOUNDARY)), 32), ReactorNetty.PREDICATE_GROUP_FLUSH); } diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/ReactorNetty.java b/reactor-netty5-core/src/main/java/reactor/netty5/ReactorNetty.java index d733353daa..51db4bd90c 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/ReactorNetty.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/ReactorNetty.java @@ -17,7 +17,6 @@ import java.net.SocketAddress; import java.nio.channels.FileChannel; -import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.ZoneId; import java.util.List; @@ -35,6 +34,7 @@ import io.netty5.buffer.Buffer; import io.netty5.buffer.BufferAllocator; import io.netty5.buffer.BufferHolder; +import io.netty5.buffer.MemoryManager; import io.netty5.channel.nio.AbstractNioChannel; import io.netty5.util.Resource; import io.netty5.channel.Channel; @@ -999,6 +999,8 @@ public synchronized Throwable fillInStackTrace() { static final Predicate PREDICATE_FLUSH = o -> false; + static final Buffer BOUNDARY = MemoryManager.unsafeWrap(new byte[0]).makeReadOnly(); + static final char CHANNEL_ID_PREFIX = '['; static final String CHANNEL_ID_SUFFIX_1 = "] "; static final char CHANNEL_ID_SUFFIX_2 = ' '; @@ -1006,8 +1008,8 @@ public synchronized Throwable fillInStackTrace() { static final int ORIGINAL_CHANNEL_ID_PREFIX_LENGTH = ORIGINAL_CHANNEL_ID_PREFIX.length(); static final char TRACE_ID_PREFIX = '('; - public static final String PREDICATE_GROUP_BOUNDARY = "ReactorNettyBoundary"; - public static final Predicate PREDICATE_GROUP_FLUSH = - b -> PREDICATE_GROUP_BOUNDARY.equals(b.toString(StandardCharsets.UTF_8)); + @SuppressWarnings("ReferenceEquality") + //Design to use reference comparison here + public static final Predicate PREDICATE_GROUP_FLUSH = b -> b == BOUNDARY; } diff --git a/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java b/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java index 8be94d3a1c..3ebef8d9e0 100644 --- a/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java +++ b/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java @@ -159,7 +159,7 @@ void sendFileWithTlsUsesChunkedFile() throws URISyntaxException, SSLException { //capture the chunks unencrypted, transform as Strings: new MessageToMessageEncoder() { @Override - protected void encodeAndClose(ChannelHandlerContext ctx, Buffer msg, + protected void encode(ChannelHandlerContext ctx, Buffer msg, List out) { clearMessages.add(msg.readCharSequence(msg.readableBytes(), StandardCharsets.UTF_8)); out.add(msg.split()); @@ -252,7 +252,7 @@ void sendFileWithForceChunkedFileUsesStrategyChunks() //transform the Buffer chunks into Strings: new MessageToMessageEncoder() { @Override - protected void encodeAndClose(ChannelHandlerContext ctx, Buffer msg, + protected void encode(ChannelHandlerContext ctx, Buffer msg, List out) { out.add(msg.readCharSequence(msg.readableBytes(), StandardCharsets.UTF_8)); } @@ -338,4 +338,4 @@ static Mono mockSendUsing(Connection c, Callable sourceIn sourceCleanup ); } -} \ No newline at end of file +} diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientConfig.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientConfig.java index 72f280fc11..7b46f14c5a 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientConfig.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientConfig.java @@ -41,6 +41,8 @@ import io.netty5.handler.codec.http.HttpClientCodec; import io.netty5.handler.codec.http.HttpClientUpgradeHandler; import io.netty5.handler.codec.http.HttpContentDecompressor; +import io.netty5.handler.codec.http.HttpObject; +import io.netty5.handler.codec.http.HttpResponse; import io.netty5.handler.codec.http.headers.HttpHeaders; import io.netty5.handler.codec.http.HttpMethod; import io.netty5.handler.codec.http2.Http2ClientUpgradeCodec; @@ -81,6 +83,7 @@ import reactor.util.annotation.Nullable; import reactor.util.context.Context; +import static io.netty5.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS; import static reactor.netty5.ReactorNetty.format; import static reactor.netty5.http.client.Http2ConnectionProvider.OWNER; @@ -607,8 +610,8 @@ static void configureHttp11OrH2CleartextPipeline( Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec, new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, uriTagValue)); - HttpClientUpgradeHandler upgradeHandler = - new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength()); + ReactorNettyHttpClientUpgradeHandler upgradeHandler = new ReactorNettyHttpClientUpgradeHandler( + http2FrameCodec, httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength()); p.addBefore(NettyPipeline.ReactiveBridge, null, httpClientCodec) .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler) @@ -1001,6 +1004,39 @@ public void onUncaughtException(Connection connection, Throwable error) { } } + static final class ReactorNettyHttpClientUpgradeHandler extends HttpClientUpgradeHandler { + + final Http2FrameCodec http2FrameCodec; + + boolean decoded; + + ReactorNettyHttpClientUpgradeHandler( + Http2FrameCodec http2FrameCodec, + SourceCodec sourceCodec, + UpgradeCodec upgradeCodec, + int maxContentLength) { + super(sourceCodec, upgradeCodec, maxContentLength); + this.http2FrameCodec = http2FrameCodec; + } + + @Override + protected void decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception { + decoded = true; + if (msg instanceof HttpResponse httpResponse && !SWITCHING_PROTOCOLS.equals(httpResponse.status())) { + http2FrameCodec.encoder().close(); + } + super.decode(ctx, msg); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + if (!decoded) { + // Exception may happen before decoding the server response + http2FrameCodec.encoder().close(); + } + } + } + static final class StreamConnectionObserver implements ConnectionObserver { final Context context; diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java index e284b74a3f..d8620ebef2 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java @@ -637,7 +637,7 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { if (log.isDebugEnabled()) { log.debug(format(channel(), "Received last HTTP packet")); } - if (!(msg instanceof EmptyLastHttpContent)) { + if (!(msg instanceof EmptyLastHttpContent emptyLastHttpContent)) { if (redirecting != null) { Resource.dispose(msg); } @@ -645,6 +645,9 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { super.onInboundNext(ctx, msg); } } + else { + emptyLastHttpContent.close(); + } if (redirecting == null) { // EmitResult is ignored as it is guaranteed that there will be only one emission of LastHttpContent diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/Http2StreamBridgeServerHandler.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/Http2StreamBridgeServerHandler.java index 93a9eb1b20..60efb6dd33 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/Http2StreamBridgeServerHandler.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/Http2StreamBridgeServerHandler.java @@ -29,7 +29,7 @@ import io.netty5.handler.codec.http.LastHttpContent; import io.netty5.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; import io.netty5.handler.ssl.SslHandler; -import io.netty5.util.ReferenceCountUtil; +import io.netty5.util.Resource; import io.netty5.util.concurrent.Future; import io.netty5.util.concurrent.FutureContextListener; import reactor.core.publisher.Mono; @@ -125,7 +125,7 @@ else if (!pendingResponse) { HttpServerOperations.log.debug(format(ctx.channel(), "Dropped HTTP content, " + "since response has been sent already: {}"), msg); } - ReferenceCountUtil.release(msg); + Resource.dispose(msg); ctx.read(); return; } diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java index 8ca2396bc4..6015aa45c8 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java @@ -747,10 +747,10 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { public void handlerAdded(ChannelHandlerContext ctx) { ChannelPipeline pipeline = ctx.pipeline(); if (addHttp2FrameCodec) { - pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodec); + pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodecBuilder.build()); } - pipeline.addAfter(ctx.pipeline().context(upgrader.http2FrameCodec).name(), + pipeline.addAfter(ctx.pipeline().context(Http2FrameCodec.class).name(), NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(upgrader)); pipeline.remove(this); @@ -850,7 +850,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer final BiPredicate compressPredicate; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; - final Http2FrameCodec http2FrameCodec; + final Http2FrameCodecBuilder http2FrameCodecBuilder; final ConnectionObserver listener; final BiFunction, ? super Connection, ? extends Mono> mapHandle; @@ -879,7 +879,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer this.compressPredicate = compressPredicate; this.formDecoderProvider = formDecoderProvider; this.forwardedHeaderHandler = forwardedHeaderHandler; - Http2FrameCodecBuilder http2FrameCodecBuilder = + this.http2FrameCodecBuilder = Http2FrameCodecBuilder.forServer() .validateHeaders(validate) .initialSettings(http2Settings); @@ -889,7 +889,6 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer LogLevel.DEBUG, "reactor.netty5.http.server.h2")); } - this.http2FrameCodec = http2FrameCodecBuilder.build(); this.listener = listener; this.mapHandle = mapHandle; this.metricsRecorder = metricsRecorder; @@ -913,7 +912,7 @@ protected void initChannel(Channel ch) { @Nullable public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) { if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { - return new Http2ServerUpgradeCodec(http2FrameCodec, new H2CleartextCodec(this, false, false)); + return new Http2ServerUpgradeCodec(http2FrameCodecBuilder.build(), new H2CleartextCodec(this, false, false)); } else { return null; diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java index 7c3e97c255..d22836478f 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java @@ -520,8 +520,8 @@ else if (channel().pipeline() .get(NettyPipeline.CompressionHandler) == null) { SimpleCompressionHandler handler = new SimpleCompressionHandler(); try { - // decodeAndClose(...) is needed only to initialize the acceptEncodingQueue - handler.decodeAndClose(channel().pipeline().context(NettyPipeline.ReactiveBridge), nettyRequest); + // decode(...) is needed only to initialize the acceptEncodingQueue + handler.decode(channel().pipeline().context(NettyPipeline.ReactiveBridge), nettyRequest, false); addHandlerFirst(NettyPipeline.CompressionHandler, handler); } @@ -559,9 +559,12 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { return; } if (msg instanceof HttpContent) { - if (!(msg instanceof EmptyLastHttpContent)) { + if (!(msg instanceof EmptyLastHttpContent emptyLastHttpContent)) { super.onInboundNext(ctx, msg); } + else { + emptyLastHttpContent.close(); + } if (msg instanceof LastHttpContent) { //force auto read to enable more accurate close selection now inbound is done channel().setOption(ChannelOption.AUTO_READ, true); diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/SimpleCompressionHandler.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/SimpleCompressionHandler.java index 595bbdfe59..9e73fa9a02 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/SimpleCompressionHandler.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/SimpleCompressionHandler.java @@ -37,8 +37,7 @@ public Future write(ChannelHandlerContext ctx, Object msg) { return super.write(ctx, msg); } - @Override - protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { + void decode(ChannelHandlerContext ctx, HttpRequest msg, boolean release) throws Exception { HttpRequest request = msg; if (msg instanceof FullHttpRequest fullHttpRequest && (!fullHttpRequest.isAccessible() || fullHttpRequest.payload().readableBytes() == 0)) { @@ -48,7 +47,15 @@ protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws // 2. fireChannelRead(...) is invoked at the end of super.decodeAndClose(...) which will end up // in io.netty5.channel.DefaultChannelPipeline.onUnhandledInboundMessage which closes the msg. request = new DefaultHttpRequest(msg.protocolVersion(), msg.method(), msg.uri(), msg.headers()); + if (release && fullHttpRequest.isAccessible()) { + fullHttpRequest.close(); + } } super.decodeAndClose(ctx, request); } + + @Override + protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { + decode(ctx, msg, true); + } } diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java index 62eda74cef..ba0bfbfdae 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java @@ -25,7 +25,7 @@ import io.netty5.handler.codec.http.EmptyLastHttpContent; import io.netty5.handler.codec.http.HttpHeaderNames; import io.netty5.handler.codec.http.headers.HttpHeaders; -import io.netty5.handler.codec.http.HttpRequest; +import io.netty5.handler.codec.http.FullHttpRequest; import io.netty5.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty5.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty5.handler.codec.http.websocketx.PongWebSocketFrame; @@ -85,7 +85,7 @@ final class WebsocketServerOperations extends HttpServerOperations removeHandler(NettyPipeline.AccessLogHandler); removeHandler(NettyPipeline.HttpMetricsHandler); - HttpRequest request = new DefaultFullHttpRequest(replaced.version(), replaced.method(), replaced.uri(), + FullHttpRequest request = new DefaultFullHttpRequest(replaced.version(), replaced.method(), replaced.uri(), channel.bufferAllocator().allocate(0)); request.headers() @@ -115,6 +115,7 @@ final class WebsocketServerOperations extends HttpServerOperations request, responseHeaders) .addListener(f -> { + request.close(); if (replaced.rebind(this)) { markPersistent(false); // This change is needed after the Netty change https://github.com/netty/netty/pull/11966 diff --git a/reactor-netty5-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty5-http/reflect-config.json b/reactor-netty5-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty5-http/reflect-config.json index 6a106308d1..8c946f8d9e 100644 --- a/reactor-netty5-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty5-http/reflect-config.json +++ b/reactor-netty5-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty5-http/reflect-config.json @@ -62,6 +62,13 @@ "name": "reactor.netty5.http.client.MicrometerHttpClientMetricsHandler", "queryAllPublicMethods": true }, + { + "condition": { + "typeReachable": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler" + }, + "name": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler", + "queryAllPublicMethods": true + }, { "condition": { "typeReachable": "reactor.netty5.http.server.AbstractHttpServerMetricsHandler" diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientOperationsTest.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientOperationsTest.java index 779918b145..0c68b749a9 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientOperationsTest.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientOperationsTest.java @@ -165,7 +165,10 @@ private void doTestStatus(HttpResponseStatus status) { EmbeddedChannel channel = new EmbeddedChannel(); HttpClientOperations ops = new HttpClientOperations(() -> channel, ConnectionObserver.emptyListener()); - ops.setNettyResponse(new DefaultFullHttpResponse(HTTP_1_1, status, channel.bufferAllocator().allocate(0))); - assertThat(ops.status().reasonPhrase()).isEqualTo(status.reasonPhrase()); + try (DefaultFullHttpResponse response = + new DefaultFullHttpResponse(HTTP_1_1, status, channel.bufferAllocator().allocate(0))) { + ops.setNettyResponse(response); + assertThat(ops.status().reasonPhrase()).isEqualTo(status.reasonPhrase()); + } } } diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java index eef396afbc..cde7189b3f 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java @@ -1850,8 +1850,10 @@ private void doTestStatus(HttpResponseStatus status) { null, false); ops.status(status); - HttpMessage response = ops.newFullBodyMessage(channel.bufferAllocator().allocate(0)); - assertThat(((FullHttpResponse) response).status().reasonPhrase()).isEqualTo(status.reasonPhrase()); + try (Buffer buffer = channel.bufferAllocator().allocate(0)) { + HttpMessage response = ops.newFullBodyMessage(buffer); + assertThat(((FullHttpResponse) response).status().reasonPhrase()).isEqualTo(status.reasonPhrase()); + } channel.close(); } diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH1Tests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH1Tests.java index cb798a14cf..ae4cc321f4 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH1Tests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH1Tests.java @@ -59,6 +59,8 @@ void responseNonChunked() { channel.writeOutbound(newHttpResponse(false)); channel.writeOutbound(new DefaultLastHttpContent(channel.bufferAllocator().allocate(0))); + + assertThat(channel.finishAndReleaseAll()).isTrue(); } @Test diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH2Tests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH2Tests.java index a83453dc66..7ed071334c 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH2Tests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH2Tests.java @@ -61,6 +61,8 @@ void accessLogArgs() { Buffer buffer = channel.bufferAllocator().allocate(RESPONSE_CONTENT.length); buffer.writeBytes(RESPONSE_CONTENT).makeReadOnly(); channel.writeOutbound(new DefaultHttp2DataFrame(buffer.send(), true)); + + assertThat(channel.finishAndReleaseAll()).isTrue(); } private void assertAccessLogArgProvider(AccessLogArgProvider args, SocketAddress remoteAddress) {