Skip to content

Commit

Permalink
Leak detection properties are added for the test execution (#2461)
Browse files Browse the repository at this point in the history
- Fix memory leaks when processing EmptyLastHttpContent
- Avoid buffer leak by closing the HttpRequest passed to the WebSocketServerHandshaker.handshake method
- Declare ReactorNetty#BOUNDARY as on-heap non-releasable buffer
- Do not close the empty full message when invoking SimpleCompressionHandler out of the pipeline
- Http2StreamBridgeServerHandler.channelRead method should use Resource.dispose() method
- [HttpServer] Ensure Http2FrameCodec is created only when there is a need for protocol upgrade
- [HttpClient] Ensure Http2FrameCodec.Encoder is closed when upgrade is rejected. Ensure Http2FrameCodec.Encoder is closed when Exception happened before decoding the server response
- Fix memory leaks in tests

Co-authored-by: Pierre De Rop <pderop@vmware.com>
  • Loading branch information
violetagg and pderop committed Oct 3, 2022
1 parent 430053e commit 2f5fa9b
Show file tree
Hide file tree
Showing 16 changed files with 103 additions and 41 deletions.
12 changes: 6 additions & 6 deletions build.gradle
Expand Up @@ -244,7 +244,10 @@ subprojects {
systemProperty("reactor.trace.cancel", "true")
systemProperty("reactor.trace.nocapacity", "true")
systemProperty("testGroups", project.properties.get("testGroups"))
systemProperty("io.netty.leakDetection.level", "paranoid")
systemProperty("io.netty5.leakDetectionLevel", "paranoid")
systemProperty("io.netty5.leakDetection.targetRecords", "32")
systemProperty("io.netty5.buffer.lifecycleTracingEnabled", "true")
systemProperty("io.netty5.buffer.leakDetectionEnabled", "true")
systemProperty("reactor.netty5.pool.getPermitsSamplingRate", "0.5")
systemProperty("reactor.netty5.pool.returnPermitsSamplingRate", "0.5")
if (project.hasProperty("forceTransport")) {
Expand All @@ -270,11 +273,8 @@ subprojects {

onOutput { descriptor, event ->
def evMsg = event.message
if (evMsg.contains("ResourceLeakDetector")) {
if (!evMsg.contains(" -Dio.netty.leakDetection")
&& !evMsg.contains("DEBUG io.netty.util.ResourceLeakDetectorFactory")) {
logger.error("ERROR: Test: " + descriptor + " produced resource leak: " + event.message)
}
if (evMsg.contains("LoggingLeakCallback")) {
logger.error("ERROR: Test: " + descriptor + " produced resource leak: " + event.message)
}
}
}
Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
Expand All @@ -41,8 +40,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static reactor.netty5.ReactorNetty.PREDICATE_GROUP_BOUNDARY;

/**
* An outbound-traffic API delegating to an underlying {@link Channel}.
* <p>Note: With HTTP, chaining multiple send operations is discouraged and will not work as expected
Expand Down Expand Up @@ -230,12 +227,10 @@ default NettyOutbound sendFileChunked(Path file, long position, long count) {
* any error during write
*/
default NettyOutbound sendGroups(Publisher<? extends Publisher<? extends Buffer>> dataStreams) {
Buffer BOUNDARY = alloc().copyOf(PREDICATE_GROUP_BOUNDARY.getBytes(StandardCharsets.UTF_8)).makeReadOnly();
return send(
Flux.from(dataStreams)
.concatMap(p -> Flux.<Buffer>from(p)
.concatWith(Mono.just(BOUNDARY.copy(0, BOUNDARY.readableBytes(), true))), 32)
.doFinally(sig -> BOUNDARY.close()),
.concatWith(Mono.just(ReactorNetty.BOUNDARY)), 32),
ReactorNetty.PREDICATE_GROUP_FLUSH);
}

Expand Down
Expand Up @@ -17,7 +17,6 @@

import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.ZoneId;
import java.util.List;
Expand All @@ -35,6 +34,7 @@
import io.netty5.buffer.Buffer;
import io.netty5.buffer.BufferAllocator;
import io.netty5.buffer.BufferHolder;
import io.netty5.buffer.MemoryManager;
import io.netty5.channel.nio.AbstractNioChannel;
import io.netty5.util.Resource;
import io.netty5.channel.Channel;
Expand Down Expand Up @@ -999,15 +999,17 @@ public synchronized Throwable fillInStackTrace() {

static final Predicate<Object> PREDICATE_FLUSH = o -> false;

static final Buffer BOUNDARY = MemoryManager.unsafeWrap(new byte[0]).makeReadOnly();

static final char CHANNEL_ID_PREFIX = '[';
static final String CHANNEL_ID_SUFFIX_1 = "] ";
static final char CHANNEL_ID_SUFFIX_2 = ' ';
static final String ORIGINAL_CHANNEL_ID_PREFIX = "[id: 0x";
static final int ORIGINAL_CHANNEL_ID_PREFIX_LENGTH = ORIGINAL_CHANNEL_ID_PREFIX.length();
static final char TRACE_ID_PREFIX = '(';

public static final String PREDICATE_GROUP_BOUNDARY = "ReactorNettyBoundary";
public static final Predicate<Buffer> PREDICATE_GROUP_FLUSH =
b -> PREDICATE_GROUP_BOUNDARY.equals(b.toString(StandardCharsets.UTF_8));
@SuppressWarnings("ReferenceEquality")
//Design to use reference comparison here
public static final Predicate<Buffer> PREDICATE_GROUP_FLUSH = b -> b == BOUNDARY;

}
Expand Up @@ -159,7 +159,7 @@ void sendFileWithTlsUsesChunkedFile() throws URISyntaxException, SSLException {
//capture the chunks unencrypted, transform as Strings:
new MessageToMessageEncoder<Buffer>() {
@Override
protected void encodeAndClose(ChannelHandlerContext ctx, Buffer msg,
protected void encode(ChannelHandlerContext ctx, Buffer msg,
List<Object> out) {
clearMessages.add(msg.readCharSequence(msg.readableBytes(), StandardCharsets.UTF_8));
out.add(msg.split());
Expand Down Expand Up @@ -252,7 +252,7 @@ void sendFileWithForceChunkedFileUsesStrategyChunks()
//transform the Buffer chunks into Strings:
new MessageToMessageEncoder<Buffer>() {
@Override
protected void encodeAndClose(ChannelHandlerContext ctx, Buffer msg,
protected void encode(ChannelHandlerContext ctx, Buffer msg,
List<Object> out) {
out.add(msg.readCharSequence(msg.readableBytes(), StandardCharsets.UTF_8));
}
Expand Down Expand Up @@ -338,4 +338,4 @@ static <S> Mono<Void> mockSendUsing(Connection c, Callable<? extends S> sourceIn
sourceCleanup
);
}
}
}
Expand Up @@ -41,6 +41,8 @@
import io.netty5.handler.codec.http.HttpClientCodec;
import io.netty5.handler.codec.http.HttpClientUpgradeHandler;
import io.netty5.handler.codec.http.HttpContentDecompressor;
import io.netty5.handler.codec.http.HttpObject;
import io.netty5.handler.codec.http.HttpResponse;
import io.netty5.handler.codec.http.headers.HttpHeaders;
import io.netty5.handler.codec.http.HttpMethod;
import io.netty5.handler.codec.http2.Http2ClientUpgradeCodec;
Expand Down Expand Up @@ -81,6 +83,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

import static io.netty5.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
import static reactor.netty5.ReactorNetty.format;
import static reactor.netty5.http.client.Http2ConnectionProvider.OWNER;

Expand Down Expand Up @@ -607,8 +610,8 @@ static void configureHttp11OrH2CleartextPipeline(
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec,
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, uriTagValue));

HttpClientUpgradeHandler<?> upgradeHandler =
new HttpClientUpgradeHandler<DefaultHttpContent>(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());
ReactorNettyHttpClientUpgradeHandler upgradeHandler = new ReactorNettyHttpClientUpgradeHandler(
http2FrameCodec, httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());

p.addBefore(NettyPipeline.ReactiveBridge, null, httpClientCodec)
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler)
Expand Down Expand Up @@ -1001,6 +1004,39 @@ public void onUncaughtException(Connection connection, Throwable error) {
}
}

static final class ReactorNettyHttpClientUpgradeHandler extends HttpClientUpgradeHandler<DefaultHttpContent> {

final Http2FrameCodec http2FrameCodec;

boolean decoded;

ReactorNettyHttpClientUpgradeHandler(
Http2FrameCodec http2FrameCodec,
SourceCodec sourceCodec,
UpgradeCodec upgradeCodec,
int maxContentLength) {
super(sourceCodec, upgradeCodec, maxContentLength);
this.http2FrameCodec = http2FrameCodec;
}

@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
decoded = true;
if (msg instanceof HttpResponse httpResponse && !SWITCHING_PROTOCOLS.equals(httpResponse.status())) {
http2FrameCodec.encoder().close();
}
super.decode(ctx, msg);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
if (!decoded) {
// Exception may happen before decoding the server response
http2FrameCodec.encoder().close();
}
}
}

static final class StreamConnectionObserver implements ConnectionObserver {

final Context context;
Expand Down
Expand Up @@ -637,14 +637,17 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Received last HTTP packet"));
}
if (!(msg instanceof EmptyLastHttpContent)) {
if (!(msg instanceof EmptyLastHttpContent emptyLastHttpContent)) {
if (redirecting != null) {
Resource.dispose(msg);
}
else {
super.onInboundNext(ctx, msg);
}
}
else {
emptyLastHttpContent.close();
}

if (redirecting == null) {
// EmitResult is ignored as it is guaranteed that there will be only one emission of LastHttpContent
Expand Down
Expand Up @@ -29,7 +29,7 @@
import io.netty5.handler.codec.http.LastHttpContent;
import io.netty5.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.netty5.handler.ssl.SslHandler;
import io.netty5.util.ReferenceCountUtil;
import io.netty5.util.Resource;
import io.netty5.util.concurrent.Future;
import io.netty5.util.concurrent.FutureContextListener;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -125,7 +125,7 @@ else if (!pendingResponse) {
HttpServerOperations.log.debug(format(ctx.channel(), "Dropped HTTP content, " +
"since response has been sent already: {}"), msg);
}
ReferenceCountUtil.release(msg);
Resource.dispose(msg);
ctx.read();
return;
}
Expand Down
Expand Up @@ -747,10 +747,10 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter {
public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline pipeline = ctx.pipeline();
if (addHttp2FrameCodec) {
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodec);
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodecBuilder.build());
}

pipeline.addAfter(ctx.pipeline().context(upgrader.http2FrameCodec).name(),
pipeline.addAfter(ctx.pipeline().context(Http2FrameCodec.class).name(),
NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(upgrader));

pipeline.remove(this);
Expand Down Expand Up @@ -850,7 +850,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
final BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2FrameCodec http2FrameCodec;
final Http2FrameCodecBuilder http2FrameCodecBuilder;
final ConnectionObserver listener;
final BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>>
mapHandle;
Expand Down Expand Up @@ -879,7 +879,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
this.compressPredicate = compressPredicate;
this.formDecoderProvider = formDecoderProvider;
this.forwardedHeaderHandler = forwardedHeaderHandler;
Http2FrameCodecBuilder http2FrameCodecBuilder =
this.http2FrameCodecBuilder =
Http2FrameCodecBuilder.forServer()
.validateHeaders(validate)
.initialSettings(http2Settings);
Expand All @@ -889,7 +889,6 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
LogLevel.DEBUG,
"reactor.netty5.http.server.h2"));
}
this.http2FrameCodec = http2FrameCodecBuilder.build();
this.listener = listener;
this.mapHandle = mapHandle;
this.metricsRecorder = metricsRecorder;
Expand All @@ -913,7 +912,7 @@ protected void initChannel(Channel ch) {
@Nullable
public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec(http2FrameCodec, new H2CleartextCodec(this, false, false));
return new Http2ServerUpgradeCodec(http2FrameCodecBuilder.build(), new H2CleartextCodec(this, false, false));
}
else {
return null;
Expand Down
Expand Up @@ -520,8 +520,8 @@ else if (channel().pipeline()
.get(NettyPipeline.CompressionHandler) == null) {
SimpleCompressionHandler handler = new SimpleCompressionHandler();
try {
// decodeAndClose(...) is needed only to initialize the acceptEncodingQueue
handler.decodeAndClose(channel().pipeline().context(NettyPipeline.ReactiveBridge), nettyRequest);
// decode(...) is needed only to initialize the acceptEncodingQueue
handler.decode(channel().pipeline().context(NettyPipeline.ReactiveBridge), nettyRequest, false);

addHandlerFirst(NettyPipeline.CompressionHandler, handler);
}
Expand Down Expand Up @@ -559,9 +559,12 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
return;
}
if (msg instanceof HttpContent) {
if (!(msg instanceof EmptyLastHttpContent)) {
if (!(msg instanceof EmptyLastHttpContent emptyLastHttpContent)) {
super.onInboundNext(ctx, msg);
}
else {
emptyLastHttpContent.close();
}
if (msg instanceof LastHttpContent) {
//force auto read to enable more accurate close selection now inbound is done
channel().setOption(ChannelOption.AUTO_READ, true);
Expand Down
Expand Up @@ -37,8 +37,7 @@ public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
return super.write(ctx, msg);
}

@Override
protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
void decode(ChannelHandlerContext ctx, HttpRequest msg, boolean release) throws Exception {
HttpRequest request = msg;
if (msg instanceof FullHttpRequest fullHttpRequest &&
(!fullHttpRequest.isAccessible() || fullHttpRequest.payload().readableBytes() == 0)) {
Expand All @@ -48,7 +47,15 @@ protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws
// 2. fireChannelRead(...) is invoked at the end of super.decodeAndClose(...) which will end up
// in io.netty5.channel.DefaultChannelPipeline.onUnhandledInboundMessage which closes the msg.
request = new DefaultHttpRequest(msg.protocolVersion(), msg.method(), msg.uri(), msg.headers());
if (release && fullHttpRequest.isAccessible()) {
fullHttpRequest.close();
}
}
super.decodeAndClose(ctx, request);
}

@Override
protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
decode(ctx, msg, true);
}
}
Expand Up @@ -25,7 +25,7 @@
import io.netty5.handler.codec.http.EmptyLastHttpContent;
import io.netty5.handler.codec.http.HttpHeaderNames;
import io.netty5.handler.codec.http.headers.HttpHeaders;
import io.netty5.handler.codec.http.HttpRequest;
import io.netty5.handler.codec.http.FullHttpRequest;
import io.netty5.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty5.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty5.handler.codec.http.websocketx.PongWebSocketFrame;
Expand Down Expand Up @@ -85,7 +85,7 @@ final class WebsocketServerOperations extends HttpServerOperations
removeHandler(NettyPipeline.AccessLogHandler);
removeHandler(NettyPipeline.HttpMetricsHandler);

HttpRequest request = new DefaultFullHttpRequest(replaced.version(), replaced.method(), replaced.uri(),
FullHttpRequest request = new DefaultFullHttpRequest(replaced.version(), replaced.method(), replaced.uri(),
channel.bufferAllocator().allocate(0));

request.headers()
Expand Down Expand Up @@ -115,6 +115,7 @@ final class WebsocketServerOperations extends HttpServerOperations
request,
responseHeaders)
.addListener(f -> {
request.close();
if (replaced.rebind(this)) {
markPersistent(false);
// This change is needed after the Netty change https://github.com/netty/netty/pull/11966
Expand Down
Expand Up @@ -62,6 +62,13 @@
"name": "reactor.netty5.http.client.MicrometerHttpClientMetricsHandler",
"queryAllPublicMethods": true
},
{
"condition": {
"typeReachable": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler"
},
"name": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler",
"queryAllPublicMethods": true
},
{
"condition": {
"typeReachable": "reactor.netty5.http.server.AbstractHttpServerMetricsHandler"
Expand Down
Expand Up @@ -165,7 +165,10 @@ private void doTestStatus(HttpResponseStatus status) {
EmbeddedChannel channel = new EmbeddedChannel();
HttpClientOperations ops = new HttpClientOperations(() -> channel,
ConnectionObserver.emptyListener());
ops.setNettyResponse(new DefaultFullHttpResponse(HTTP_1_1, status, channel.bufferAllocator().allocate(0)));
assertThat(ops.status().reasonPhrase()).isEqualTo(status.reasonPhrase());
try (DefaultFullHttpResponse response =
new DefaultFullHttpResponse(HTTP_1_1, status, channel.bufferAllocator().allocate(0))) {
ops.setNettyResponse(response);
assertThat(ops.status().reasonPhrase()).isEqualTo(status.reasonPhrase());
}
}
}
Expand Up @@ -1850,8 +1850,10 @@ private void doTestStatus(HttpResponseStatus status) {
null,
false);
ops.status(status);
HttpMessage response = ops.newFullBodyMessage(channel.bufferAllocator().allocate(0));
assertThat(((FullHttpResponse) response).status().reasonPhrase()).isEqualTo(status.reasonPhrase());
try (Buffer buffer = channel.bufferAllocator().allocate(0)) {
HttpMessage response = ops.newFullBodyMessage(buffer);
assertThat(((FullHttpResponse) response).status().reasonPhrase()).isEqualTo(status.reasonPhrase());
}
channel.close();
}

Expand Down
Expand Up @@ -59,6 +59,8 @@ void responseNonChunked() {
channel.writeOutbound(newHttpResponse(false));

channel.writeOutbound(new DefaultLastHttpContent(channel.bufferAllocator().allocate(0)));

assertThat(channel.finishAndReleaseAll()).isTrue();
}

@Test
Expand Down

0 comments on commit 2f5fa9b

Please sign in to comment.