From 9a1be784bfd3e8e39853009e4d4988f7027b7974 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 2 Sep 2022 12:59:08 +0300 Subject: [PATCH 01/15] [test] Leak detection properties are added for the test execution --- build.gradle | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 35f61bb7b2..de957345d7 100644 --- a/build.gradle +++ b/build.gradle @@ -244,7 +244,10 @@ subprojects { systemProperty("reactor.trace.cancel", "true") systemProperty("reactor.trace.nocapacity", "true") systemProperty("testGroups", project.properties.get("testGroups")) - systemProperty("io.netty.leakDetection.level", "paranoid") + systemProperty("io.netty5.leakDetectionLevel", "paranoid") + systemProperty("io.netty5.leakDetection.targetRecords", "32") + systemProperty("io.netty5.buffer.lifecycleTracingEnabled", "true") + systemProperty("io.netty5.buffer.leakDetectionEnabled", "true") systemProperty("reactor.netty5.pool.getPermitsSamplingRate", "0.5") systemProperty("reactor.netty5.pool.returnPermitsSamplingRate", "0.5") if (project.hasProperty("forceTransport")) { From 456bdde1a58c7802cd578a4f4d3ae5efb99600ca Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 5 Sep 2022 14:51:34 +0300 Subject: [PATCH 02/15] Fix memory leaks when processing EmptyLastHttpContent Fix memory leak in test --- build.gradle | 7 ++----- .../reactor/netty5/http/client/HttpClientOperations.java | 5 ++++- .../reactor/netty5/http/server/HttpServerOperations.java | 5 ++++- .../netty5/http/client/HttpClientOperationsTest.java | 7 +++++-- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/build.gradle b/build.gradle index de957345d7..f2ccefab88 100644 --- a/build.gradle +++ b/build.gradle @@ -273,11 +273,8 @@ subprojects { onOutput { descriptor, event -> def evMsg = event.message - if (evMsg.contains("ResourceLeakDetector")) { - if (!evMsg.contains(" -Dio.netty.leakDetection") - && !evMsg.contains("DEBUG io.netty.util.ResourceLeakDetectorFactory")) { - logger.error("ERROR: Test: " + descriptor + " produced resource leak: " + event.message) - } + if (evMsg.contains("LoggingLeakCallback")) { + logger.error("ERROR: Test: " + descriptor + " produced resource leak: " + event.message) } } } diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java index e284b74a3f..d8620ebef2 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientOperations.java @@ -637,7 +637,7 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { if (log.isDebugEnabled()) { log.debug(format(channel(), "Received last HTTP packet")); } - if (!(msg instanceof EmptyLastHttpContent)) { + if (!(msg instanceof EmptyLastHttpContent emptyLastHttpContent)) { if (redirecting != null) { Resource.dispose(msg); } @@ -645,6 +645,9 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { super.onInboundNext(ctx, msg); } } + else { + emptyLastHttpContent.close(); + } if (redirecting == null) { // EmitResult is ignored as it is guaranteed that there will be only one emission of LastHttpContent diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java index 7c3e97c255..2f9323906e 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java @@ -559,9 +559,12 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { return; } if (msg instanceof HttpContent) { - if (!(msg instanceof EmptyLastHttpContent)) { + if (!(msg instanceof EmptyLastHttpContent emptyLastHttpContent)) { super.onInboundNext(ctx, msg); } + else { + emptyLastHttpContent.close(); + } if (msg instanceof LastHttpContent) { //force auto read to enable more accurate close selection now inbound is done channel().setOption(ChannelOption.AUTO_READ, true); diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientOperationsTest.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientOperationsTest.java index 779918b145..0c68b749a9 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientOperationsTest.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/client/HttpClientOperationsTest.java @@ -165,7 +165,10 @@ private void doTestStatus(HttpResponseStatus status) { EmbeddedChannel channel = new EmbeddedChannel(); HttpClientOperations ops = new HttpClientOperations(() -> channel, ConnectionObserver.emptyListener()); - ops.setNettyResponse(new DefaultFullHttpResponse(HTTP_1_1, status, channel.bufferAllocator().allocate(0))); - assertThat(ops.status().reasonPhrase()).isEqualTo(status.reasonPhrase()); + try (DefaultFullHttpResponse response = + new DefaultFullHttpResponse(HTTP_1_1, status, channel.bufferAllocator().allocate(0))) { + ops.setNettyResponse(response); + assertThat(ops.status().reasonPhrase()).isEqualTo(status.reasonPhrase()); + } } } From 09483b99c7b350297dabbf0d71a3a24754e70d12 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Tue, 6 Sep 2022 17:30:53 +0200 Subject: [PATCH 03/15] Avoid buffer leak by closing the HttpRequest passed to the WebSocketServerHandshaker.handshake method --- .../reactor/netty5/http/server/WebsocketServerOperations.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java index 62eda74cef..f1c036b4a6 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java @@ -33,6 +33,7 @@ import io.netty5.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; +import io.netty5.util.Resource; import io.netty5.util.concurrent.Future; import io.netty5.util.concurrent.FutureListener; import org.reactivestreams.Publisher; @@ -115,6 +116,7 @@ final class WebsocketServerOperations extends HttpServerOperations request, responseHeaders) .addListener(f -> { + Resource.dispose(request); if (replaced.rebind(this)) { markPersistent(false); // This change is needed after the Netty change https://github.com/netty/netty/pull/11966 From eeb1dd7763fe160c19a80bb0ca7dfb4ff441d0a6 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Wed, 7 Sep 2022 10:02:50 +0200 Subject: [PATCH 04/15] Directly close the http request instead of using Resource.dispose --- .../netty5/http/server/WebsocketServerOperations.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java index f1c036b4a6..ba0bfbfdae 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/WebsocketServerOperations.java @@ -25,7 +25,7 @@ import io.netty5.handler.codec.http.EmptyLastHttpContent; import io.netty5.handler.codec.http.HttpHeaderNames; import io.netty5.handler.codec.http.headers.HttpHeaders; -import io.netty5.handler.codec.http.HttpRequest; +import io.netty5.handler.codec.http.FullHttpRequest; import io.netty5.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty5.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty5.handler.codec.http.websocketx.PongWebSocketFrame; @@ -33,7 +33,6 @@ import io.netty5.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; -import io.netty5.util.Resource; import io.netty5.util.concurrent.Future; import io.netty5.util.concurrent.FutureListener; import org.reactivestreams.Publisher; @@ -86,7 +85,7 @@ final class WebsocketServerOperations extends HttpServerOperations removeHandler(NettyPipeline.AccessLogHandler); removeHandler(NettyPipeline.HttpMetricsHandler); - HttpRequest request = new DefaultFullHttpRequest(replaced.version(), replaced.method(), replaced.uri(), + FullHttpRequest request = new DefaultFullHttpRequest(replaced.version(), replaced.method(), replaced.uri(), channel.bufferAllocator().allocate(0)); request.headers() @@ -116,7 +115,7 @@ final class WebsocketServerOperations extends HttpServerOperations request, responseHeaders) .addListener(f -> { - Resource.dispose(request); + request.close(); if (replaced.rebind(this)) { markPersistent(false); // This change is needed after the Netty change https://github.com/netty/netty/pull/11966 From 946d4572b17acdad99fd53a48023eaaaf84ec22c Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Fri, 16 Sep 2022 13:29:09 +0200 Subject: [PATCH 05/15] Close buffers passed to the MessageToMessageEncoder.encodeAndClose method in NettyOutboundTest --- .../src/test/java/reactor/netty5/NettyOutboundTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java b/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java index 8be94d3a1c..2c4aaf4dba 100644 --- a/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java +++ b/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java @@ -163,6 +163,7 @@ protected void encodeAndClose(ChannelHandlerContext ctx, Buffer msg, List out) { clearMessages.add(msg.readCharSequence(msg.readableBytes(), StandardCharsets.UTF_8)); out.add(msg.split()); + msg.close(); } }, //transform the ChunkedFile into Buffer chunks: @@ -255,6 +256,7 @@ void sendFileWithForceChunkedFileUsesStrategyChunks() protected void encodeAndClose(ChannelHandlerContext ctx, Buffer msg, List out) { out.add(msg.readCharSequence(msg.readableBytes(), StandardCharsets.UTF_8)); + msg.close(); } }, //transform the ChunkedFile into Buffer chunks: @@ -338,4 +340,4 @@ static Mono mockSendUsing(Connection c, Callable sourceIn sourceCleanup ); } -} \ No newline at end of file +} From d7acb9e884b20649b95470380b1f9843310079bf Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Fri, 16 Sep 2022 17:24:26 +0200 Subject: [PATCH 06/15] Override MessageToMessageEncoder.encode method, which will close the message for us --- .../src/test/java/reactor/netty5/NettyOutboundTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java b/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java index 2c4aaf4dba..3ebef8d9e0 100644 --- a/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java +++ b/reactor-netty5-core/src/test/java/reactor/netty5/NettyOutboundTest.java @@ -159,11 +159,10 @@ void sendFileWithTlsUsesChunkedFile() throws URISyntaxException, SSLException { //capture the chunks unencrypted, transform as Strings: new MessageToMessageEncoder() { @Override - protected void encodeAndClose(ChannelHandlerContext ctx, Buffer msg, + protected void encode(ChannelHandlerContext ctx, Buffer msg, List out) { clearMessages.add(msg.readCharSequence(msg.readableBytes(), StandardCharsets.UTF_8)); out.add(msg.split()); - msg.close(); } }, //transform the ChunkedFile into Buffer chunks: @@ -253,10 +252,9 @@ void sendFileWithForceChunkedFileUsesStrategyChunks() //transform the Buffer chunks into Strings: new MessageToMessageEncoder() { @Override - protected void encodeAndClose(ChannelHandlerContext ctx, Buffer msg, + protected void encode(ChannelHandlerContext ctx, Buffer msg, List out) { out.add(msg.readCharSequence(msg.readableBytes(), StandardCharsets.UTF_8)); - msg.close(); } }, //transform the ChunkedFile into Buffer chunks: From d0826d74008dc28319d3cb36f279c200a0e2e272 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Fri, 16 Sep 2022 17:50:15 +0200 Subject: [PATCH 07/15] Avoid buf leak in TcpServerTests.retryStrategiesWhenServerFails by closing BOUNDARY in NettyOutbound.sendGroups --- .../src/main/java/reactor/netty5/NettyOutbound.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java b/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java index 9ae15ea9a8..dadf5624e4 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java @@ -42,6 +42,7 @@ import reactor.core.publisher.Mono; import static reactor.netty5.ReactorNetty.PREDICATE_GROUP_BOUNDARY; +import static reactor.netty5.ReactorNetty.PREDICATE_GROUP_FLUSH; /** * An outbound-traffic API delegating to an underlying {@link Channel}. @@ -236,7 +237,13 @@ default NettyOutbound sendGroups(Publisher .concatMap(p -> Flux.from(p) .concatWith(Mono.just(BOUNDARY.copy(0, BOUNDARY.readableBytes(), true))), 32) .doFinally(sig -> BOUNDARY.close()), - ReactorNetty.PREDICATE_GROUP_FLUSH); + buf -> { + boolean flush = PREDICATE_GROUP_FLUSH.test(buf); + if (flush) { + buf.close(); + } + return flush; + }); } /** From a87336ab929c397d7123f8d507d789061d16cc98 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 20 Sep 2022 15:42:12 +0300 Subject: [PATCH 08/15] Revert "Avoid buf leak in TcpServerTests.retryStrategiesWhenServerFails by closing BOUNDARY in NettyOutbound.sendGroups" This reverts commit c47ba03faa434d633e49d42026fdb37958976b02. --- .../src/main/java/reactor/netty5/NettyOutbound.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java b/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java index dadf5624e4..9ae15ea9a8 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java @@ -42,7 +42,6 @@ import reactor.core.publisher.Mono; import static reactor.netty5.ReactorNetty.PREDICATE_GROUP_BOUNDARY; -import static reactor.netty5.ReactorNetty.PREDICATE_GROUP_FLUSH; /** * An outbound-traffic API delegating to an underlying {@link Channel}. @@ -237,13 +236,7 @@ default NettyOutbound sendGroups(Publisher .concatMap(p -> Flux.from(p) .concatWith(Mono.just(BOUNDARY.copy(0, BOUNDARY.readableBytes(), true))), 32) .doFinally(sig -> BOUNDARY.close()), - buf -> { - boolean flush = PREDICATE_GROUP_FLUSH.test(buf); - if (flush) { - buf.close(); - } - return flush; - }); + ReactorNetty.PREDICATE_GROUP_FLUSH); } /** From 11e16f387c90b867caa8c378a6c9ca154f66cd8b Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 20 Sep 2022 17:07:45 +0300 Subject: [PATCH 09/15] Declare ReactorNetty#BOUNDARY as on-heap non-releasable buffer --- .../src/main/java/reactor/netty5/NettyOutbound.java | 7 +------ .../src/main/java/reactor/netty5/ReactorNetty.java | 10 ++++++---- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java b/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java index 9ae15ea9a8..94c4b697bc 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/NettyOutbound.java @@ -20,7 +20,6 @@ import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -41,8 +40,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import static reactor.netty5.ReactorNetty.PREDICATE_GROUP_BOUNDARY; - /** * An outbound-traffic API delegating to an underlying {@link Channel}. *

Note: With HTTP, chaining multiple send operations is discouraged and will not work as expected @@ -230,12 +227,10 @@ default NettyOutbound sendFileChunked(Path file, long position, long count) { * any error during write */ default NettyOutbound sendGroups(Publisher> dataStreams) { - Buffer BOUNDARY = alloc().copyOf(PREDICATE_GROUP_BOUNDARY.getBytes(StandardCharsets.UTF_8)).makeReadOnly(); return send( Flux.from(dataStreams) .concatMap(p -> Flux.from(p) - .concatWith(Mono.just(BOUNDARY.copy(0, BOUNDARY.readableBytes(), true))), 32) - .doFinally(sig -> BOUNDARY.close()), + .concatWith(Mono.just(ReactorNetty.BOUNDARY)), 32), ReactorNetty.PREDICATE_GROUP_FLUSH); } diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/ReactorNetty.java b/reactor-netty5-core/src/main/java/reactor/netty5/ReactorNetty.java index d733353daa..51db4bd90c 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/ReactorNetty.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/ReactorNetty.java @@ -17,7 +17,6 @@ import java.net.SocketAddress; import java.nio.channels.FileChannel; -import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.ZoneId; import java.util.List; @@ -35,6 +34,7 @@ import io.netty5.buffer.Buffer; import io.netty5.buffer.BufferAllocator; import io.netty5.buffer.BufferHolder; +import io.netty5.buffer.MemoryManager; import io.netty5.channel.nio.AbstractNioChannel; import io.netty5.util.Resource; import io.netty5.channel.Channel; @@ -999,6 +999,8 @@ public synchronized Throwable fillInStackTrace() { static final Predicate PREDICATE_FLUSH = o -> false; + static final Buffer BOUNDARY = MemoryManager.unsafeWrap(new byte[0]).makeReadOnly(); + static final char CHANNEL_ID_PREFIX = '['; static final String CHANNEL_ID_SUFFIX_1 = "] "; static final char CHANNEL_ID_SUFFIX_2 = ' '; @@ -1006,8 +1008,8 @@ public synchronized Throwable fillInStackTrace() { static final int ORIGINAL_CHANNEL_ID_PREFIX_LENGTH = ORIGINAL_CHANNEL_ID_PREFIX.length(); static final char TRACE_ID_PREFIX = '('; - public static final String PREDICATE_GROUP_BOUNDARY = "ReactorNettyBoundary"; - public static final Predicate PREDICATE_GROUP_FLUSH = - b -> PREDICATE_GROUP_BOUNDARY.equals(b.toString(StandardCharsets.UTF_8)); + @SuppressWarnings("ReferenceEquality") + //Design to use reference comparison here + public static final Predicate PREDICATE_GROUP_FLUSH = b -> b == BOUNDARY; } From d539649b10270b18b1ebda2585fe5791a6295f4f Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 21 Sep 2022 09:02:45 +0300 Subject: [PATCH 10/15] Do not close the empty full message when invoking SimpleCompressionHandler out of the pipeline --- .../netty5/http/server/HttpServerOperations.java | 4 ++-- .../netty5/http/server/SimpleCompressionHandler.java | 11 +++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java index 2f9323906e..d22836478f 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerOperations.java @@ -520,8 +520,8 @@ else if (channel().pipeline() .get(NettyPipeline.CompressionHandler) == null) { SimpleCompressionHandler handler = new SimpleCompressionHandler(); try { - // decodeAndClose(...) is needed only to initialize the acceptEncodingQueue - handler.decodeAndClose(channel().pipeline().context(NettyPipeline.ReactiveBridge), nettyRequest); + // decode(...) is needed only to initialize the acceptEncodingQueue + handler.decode(channel().pipeline().context(NettyPipeline.ReactiveBridge), nettyRequest, false); addHandlerFirst(NettyPipeline.CompressionHandler, handler); } diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/SimpleCompressionHandler.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/SimpleCompressionHandler.java index 595bbdfe59..9e73fa9a02 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/SimpleCompressionHandler.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/SimpleCompressionHandler.java @@ -37,8 +37,7 @@ public Future write(ChannelHandlerContext ctx, Object msg) { return super.write(ctx, msg); } - @Override - protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { + void decode(ChannelHandlerContext ctx, HttpRequest msg, boolean release) throws Exception { HttpRequest request = msg; if (msg instanceof FullHttpRequest fullHttpRequest && (!fullHttpRequest.isAccessible() || fullHttpRequest.payload().readableBytes() == 0)) { @@ -48,7 +47,15 @@ protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws // 2. fireChannelRead(...) is invoked at the end of super.decodeAndClose(...) which will end up // in io.netty5.channel.DefaultChannelPipeline.onUnhandledInboundMessage which closes the msg. request = new DefaultHttpRequest(msg.protocolVersion(), msg.method(), msg.uri(), msg.headers()); + if (release && fullHttpRequest.isAccessible()) { + fullHttpRequest.close(); + } } super.decodeAndClose(ctx, request); } + + @Override + protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws Exception { + decode(ctx, msg, true); + } } From f0c6b90adf9286172dbdb7424f6836230b887cef Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 21 Sep 2022 11:29:01 +0300 Subject: [PATCH 11/15] Ensure the messages are closed after the test execution --- .../netty5/http/server/logging/AccessLogHandlerH1Tests.java | 2 ++ .../netty5/http/server/logging/AccessLogHandlerH2Tests.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH1Tests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH1Tests.java index cb798a14cf..ae4cc321f4 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH1Tests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH1Tests.java @@ -59,6 +59,8 @@ void responseNonChunked() { channel.writeOutbound(newHttpResponse(false)); channel.writeOutbound(new DefaultLastHttpContent(channel.bufferAllocator().allocate(0))); + + assertThat(channel.finishAndReleaseAll()).isTrue(); } @Test diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH2Tests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH2Tests.java index a83453dc66..7ed071334c 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH2Tests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/logging/AccessLogHandlerH2Tests.java @@ -61,6 +61,8 @@ void accessLogArgs() { Buffer buffer = channel.bufferAllocator().allocate(RESPONSE_CONTENT.length); buffer.writeBytes(RESPONSE_CONTENT).makeReadOnly(); channel.writeOutbound(new DefaultHttp2DataFrame(buffer.send(), true)); + + assertThat(channel.finishAndReleaseAll()).isTrue(); } private void assertAccessLogArgProvider(AccessLogArgProvider args, SocketAddress remoteAddress) { From bf457f0b91def45be13ef71f12849ae4afece34d Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Wed, 21 Sep 2022 18:23:20 +0200 Subject: [PATCH 12/15] Fixed leak reproduced by HttpServerTests.testIssue1978H2WithDelay. The Http2StreamBridgeServerHandler.channelRead method was still using old ReferenceCountUtil.release() instead of Resource.dispose() method. --- .../netty5/http/server/Http2StreamBridgeServerHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/Http2StreamBridgeServerHandler.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/Http2StreamBridgeServerHandler.java index 93a9eb1b20..60efb6dd33 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/Http2StreamBridgeServerHandler.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/Http2StreamBridgeServerHandler.java @@ -29,7 +29,7 @@ import io.netty5.handler.codec.http.LastHttpContent; import io.netty5.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; import io.netty5.handler.ssl.SslHandler; -import io.netty5.util.ReferenceCountUtil; +import io.netty5.util.Resource; import io.netty5.util.concurrent.Future; import io.netty5.util.concurrent.FutureContextListener; import reactor.core.publisher.Mono; @@ -125,7 +125,7 @@ else if (!pendingResponse) { HttpServerOperations.log.debug(format(ctx.channel(), "Dropped HTTP content, " + "since response has been sent already: {}"), msg); } - ReferenceCountUtil.release(msg); + Resource.dispose(msg); ctx.read(); return; } From 600a3b1015610a8c9c3abc70d0319469405ed24b Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 21 Sep 2022 21:00:16 +0300 Subject: [PATCH 13/15] Fix memory leak in test --- .../java/reactor/netty5/http/server/HttpServerTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java index eef396afbc..cde7189b3f 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java @@ -1850,8 +1850,10 @@ private void doTestStatus(HttpResponseStatus status) { null, false); ops.status(status); - HttpMessage response = ops.newFullBodyMessage(channel.bufferAllocator().allocate(0)); - assertThat(((FullHttpResponse) response).status().reasonPhrase()).isEqualTo(status.reasonPhrase()); + try (Buffer buffer = channel.bufferAllocator().allocate(0)) { + HttpMessage response = ops.newFullBodyMessage(buffer); + assertThat(((FullHttpResponse) response).status().reasonPhrase()).isEqualTo(status.reasonPhrase()); + } channel.close(); } From 638d10d0829d3d8c845bff75c2c53fa18743a713 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 23 Sep 2022 14:14:59 +0300 Subject: [PATCH 14/15] Ensure Http2FrameCodec is created only when there is a need for protocol upgrade --- .../reactor/netty5/http/server/HttpServerConfig.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java index 8ca2396bc4..6015aa45c8 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java @@ -747,10 +747,10 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { public void handlerAdded(ChannelHandlerContext ctx) { ChannelPipeline pipeline = ctx.pipeline(); if (addHttp2FrameCodec) { - pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodec); + pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodecBuilder.build()); } - pipeline.addAfter(ctx.pipeline().context(upgrader.http2FrameCodec).name(), + pipeline.addAfter(ctx.pipeline().context(Http2FrameCodec.class).name(), NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(upgrader)); pipeline.remove(this); @@ -850,7 +850,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer final BiPredicate compressPredicate; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; - final Http2FrameCodec http2FrameCodec; + final Http2FrameCodecBuilder http2FrameCodecBuilder; final ConnectionObserver listener; final BiFunction, ? super Connection, ? extends Mono> mapHandle; @@ -879,7 +879,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer this.compressPredicate = compressPredicate; this.formDecoderProvider = formDecoderProvider; this.forwardedHeaderHandler = forwardedHeaderHandler; - Http2FrameCodecBuilder http2FrameCodecBuilder = + this.http2FrameCodecBuilder = Http2FrameCodecBuilder.forServer() .validateHeaders(validate) .initialSettings(http2Settings); @@ -889,7 +889,6 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer LogLevel.DEBUG, "reactor.netty5.http.server.h2")); } - this.http2FrameCodec = http2FrameCodecBuilder.build(); this.listener = listener; this.mapHandle = mapHandle; this.metricsRecorder = metricsRecorder; @@ -913,7 +912,7 @@ protected void initChannel(Channel ch) { @Nullable public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) { if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { - return new Http2ServerUpgradeCodec(http2FrameCodec, new H2CleartextCodec(this, false, false)); + return new Http2ServerUpgradeCodec(http2FrameCodecBuilder.build(), new H2CleartextCodec(this, false, false)); } else { return null; From 15c05bd3030f26736095b63c69ed9b0d5c17012e Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 23 Sep 2022 15:22:11 +0300 Subject: [PATCH 15/15] Ensure Http2FrameCodec.Encoder is closed when upgrade is rejected Ensure Http2FrameCodec.Encoder is closed when Exception happened before decoding the server response --- .../netty5/http/client/HttpClientConfig.java | 40 ++++++++++++++++++- .../reactor-netty5-http/reflect-config.json | 7 ++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientConfig.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientConfig.java index 72f280fc11..7b46f14c5a 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientConfig.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/HttpClientConfig.java @@ -41,6 +41,8 @@ import io.netty5.handler.codec.http.HttpClientCodec; import io.netty5.handler.codec.http.HttpClientUpgradeHandler; import io.netty5.handler.codec.http.HttpContentDecompressor; +import io.netty5.handler.codec.http.HttpObject; +import io.netty5.handler.codec.http.HttpResponse; import io.netty5.handler.codec.http.headers.HttpHeaders; import io.netty5.handler.codec.http.HttpMethod; import io.netty5.handler.codec.http2.Http2ClientUpgradeCodec; @@ -81,6 +83,7 @@ import reactor.util.annotation.Nullable; import reactor.util.context.Context; +import static io.netty5.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS; import static reactor.netty5.ReactorNetty.format; import static reactor.netty5.http.client.Http2ConnectionProvider.OWNER; @@ -607,8 +610,8 @@ static void configureHttp11OrH2CleartextPipeline( Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec, new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, uriTagValue)); - HttpClientUpgradeHandler upgradeHandler = - new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength()); + ReactorNettyHttpClientUpgradeHandler upgradeHandler = new ReactorNettyHttpClientUpgradeHandler( + http2FrameCodec, httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength()); p.addBefore(NettyPipeline.ReactiveBridge, null, httpClientCodec) .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler) @@ -1001,6 +1004,39 @@ public void onUncaughtException(Connection connection, Throwable error) { } } + static final class ReactorNettyHttpClientUpgradeHandler extends HttpClientUpgradeHandler { + + final Http2FrameCodec http2FrameCodec; + + boolean decoded; + + ReactorNettyHttpClientUpgradeHandler( + Http2FrameCodec http2FrameCodec, + SourceCodec sourceCodec, + UpgradeCodec upgradeCodec, + int maxContentLength) { + super(sourceCodec, upgradeCodec, maxContentLength); + this.http2FrameCodec = http2FrameCodec; + } + + @Override + protected void decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception { + decoded = true; + if (msg instanceof HttpResponse httpResponse && !SWITCHING_PROTOCOLS.equals(httpResponse.status())) { + http2FrameCodec.encoder().close(); + } + super.decode(ctx, msg); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + if (!decoded) { + // Exception may happen before decoding the server response + http2FrameCodec.encoder().close(); + } + } + } + static final class StreamConnectionObserver implements ConnectionObserver { final Context context; diff --git a/reactor-netty5-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty5-http/reflect-config.json b/reactor-netty5-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty5-http/reflect-config.json index 6a106308d1..8c946f8d9e 100644 --- a/reactor-netty5-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty5-http/reflect-config.json +++ b/reactor-netty5-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty5-http/reflect-config.json @@ -62,6 +62,13 @@ "name": "reactor.netty5.http.client.MicrometerHttpClientMetricsHandler", "queryAllPublicMethods": true }, + { + "condition": { + "typeReachable": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler" + }, + "name": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler", + "queryAllPublicMethods": true + }, { "condition": { "typeReachable": "reactor.netty5.http.server.AbstractHttpServerMetricsHandler"