Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leak detection properties are added for the test execution #2461

Merged
merged 15 commits into from Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions build.gradle
Expand Up @@ -244,7 +244,10 @@ subprojects {
systemProperty("reactor.trace.cancel", "true")
systemProperty("reactor.trace.nocapacity", "true")
systemProperty("testGroups", project.properties.get("testGroups"))
systemProperty("io.netty.leakDetection.level", "paranoid")
systemProperty("io.netty5.leakDetectionLevel", "paranoid")
systemProperty("io.netty5.leakDetection.targetRecords", "32")
systemProperty("io.netty5.buffer.lifecycleTracingEnabled", "true")
systemProperty("io.netty5.buffer.leakDetectionEnabled", "true")
systemProperty("reactor.netty5.pool.getPermitsSamplingRate", "0.5")
systemProperty("reactor.netty5.pool.returnPermitsSamplingRate", "0.5")
if (project.hasProperty("forceTransport")) {
Expand All @@ -270,11 +273,8 @@ subprojects {

onOutput { descriptor, event ->
def evMsg = event.message
if (evMsg.contains("ResourceLeakDetector")) {
if (!evMsg.contains(" -Dio.netty.leakDetection")
&& !evMsg.contains("DEBUG io.netty.util.ResourceLeakDetectorFactory")) {
logger.error("ERROR: Test: " + descriptor + " produced resource leak: " + event.message)
}
if (evMsg.contains("LoggingLeakCallback")) {
logger.error("ERROR: Test: " + descriptor + " produced resource leak: " + event.message)
}
}
}
Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
Expand All @@ -41,8 +40,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static reactor.netty5.ReactorNetty.PREDICATE_GROUP_BOUNDARY;

/**
* An outbound-traffic API delegating to an underlying {@link Channel}.
* <p>Note: With HTTP, chaining multiple send operations is discouraged and will not work as expected
Expand Down Expand Up @@ -230,12 +227,10 @@ default NettyOutbound sendFileChunked(Path file, long position, long count) {
* any error during write
*/
default NettyOutbound sendGroups(Publisher<? extends Publisher<? extends Buffer>> dataStreams) {
Buffer BOUNDARY = alloc().copyOf(PREDICATE_GROUP_BOUNDARY.getBytes(StandardCharsets.UTF_8)).makeReadOnly();
return send(
Flux.from(dataStreams)
.concatMap(p -> Flux.<Buffer>from(p)
.concatWith(Mono.just(BOUNDARY.copy(0, BOUNDARY.readableBytes(), true))), 32)
.doFinally(sig -> BOUNDARY.close()),
.concatWith(Mono.just(ReactorNetty.BOUNDARY)), 32),
ReactorNetty.PREDICATE_GROUP_FLUSH);
}

Expand Down
Expand Up @@ -17,7 +17,6 @@

import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.ZoneId;
import java.util.List;
Expand All @@ -35,6 +34,7 @@
import io.netty5.buffer.Buffer;
import io.netty5.buffer.BufferAllocator;
import io.netty5.buffer.BufferHolder;
import io.netty5.buffer.MemoryManager;
import io.netty5.channel.nio.AbstractNioChannel;
import io.netty5.util.Resource;
import io.netty5.channel.Channel;
Expand Down Expand Up @@ -999,15 +999,17 @@ public synchronized Throwable fillInStackTrace() {

static final Predicate<Object> PREDICATE_FLUSH = o -> false;

static final Buffer BOUNDARY = MemoryManager.unsafeWrap(new byte[0]).makeReadOnly();

static final char CHANNEL_ID_PREFIX = '[';
static final String CHANNEL_ID_SUFFIX_1 = "] ";
static final char CHANNEL_ID_SUFFIX_2 = ' ';
static final String ORIGINAL_CHANNEL_ID_PREFIX = "[id: 0x";
static final int ORIGINAL_CHANNEL_ID_PREFIX_LENGTH = ORIGINAL_CHANNEL_ID_PREFIX.length();
static final char TRACE_ID_PREFIX = '(';

public static final String PREDICATE_GROUP_BOUNDARY = "ReactorNettyBoundary";
public static final Predicate<Buffer> PREDICATE_GROUP_FLUSH =
b -> PREDICATE_GROUP_BOUNDARY.equals(b.toString(StandardCharsets.UTF_8));
@SuppressWarnings("ReferenceEquality")
//Design to use reference comparison here
public static final Predicate<Buffer> PREDICATE_GROUP_FLUSH = b -> b == BOUNDARY;

}
Expand Up @@ -159,7 +159,7 @@ void sendFileWithTlsUsesChunkedFile() throws URISyntaxException, SSLException {
//capture the chunks unencrypted, transform as Strings:
new MessageToMessageEncoder<Buffer>() {
@Override
protected void encodeAndClose(ChannelHandlerContext ctx, Buffer msg,
protected void encode(ChannelHandlerContext ctx, Buffer msg,
List<Object> out) {
clearMessages.add(msg.readCharSequence(msg.readableBytes(), StandardCharsets.UTF_8));
out.add(msg.split());
Expand Down Expand Up @@ -252,7 +252,7 @@ void sendFileWithForceChunkedFileUsesStrategyChunks()
//transform the Buffer chunks into Strings:
new MessageToMessageEncoder<Buffer>() {
@Override
protected void encodeAndClose(ChannelHandlerContext ctx, Buffer msg,
protected void encode(ChannelHandlerContext ctx, Buffer msg,
List<Object> out) {
out.add(msg.readCharSequence(msg.readableBytes(), StandardCharsets.UTF_8));
}
Expand Down Expand Up @@ -338,4 +338,4 @@ static <S> Mono<Void> mockSendUsing(Connection c, Callable<? extends S> sourceIn
sourceCleanup
);
}
}
}
Expand Up @@ -41,6 +41,8 @@
import io.netty5.handler.codec.http.HttpClientCodec;
import io.netty5.handler.codec.http.HttpClientUpgradeHandler;
import io.netty5.handler.codec.http.HttpContentDecompressor;
import io.netty5.handler.codec.http.HttpObject;
import io.netty5.handler.codec.http.HttpResponse;
import io.netty5.handler.codec.http.headers.HttpHeaders;
import io.netty5.handler.codec.http.HttpMethod;
import io.netty5.handler.codec.http2.Http2ClientUpgradeCodec;
Expand Down Expand Up @@ -81,6 +83,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

import static io.netty5.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
import static reactor.netty5.ReactorNetty.format;
import static reactor.netty5.http.client.Http2ConnectionProvider.OWNER;

Expand Down Expand Up @@ -607,8 +610,8 @@ static void configureHttp11OrH2CleartextPipeline(
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec,
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, uriTagValue));

HttpClientUpgradeHandler<?> upgradeHandler =
new HttpClientUpgradeHandler<DefaultHttpContent>(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());
ReactorNettyHttpClientUpgradeHandler upgradeHandler = new ReactorNettyHttpClientUpgradeHandler(
http2FrameCodec, httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());

p.addBefore(NettyPipeline.ReactiveBridge, null, httpClientCodec)
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler)
Expand Down Expand Up @@ -1001,6 +1004,39 @@ public void onUncaughtException(Connection connection, Throwable error) {
}
}

static final class ReactorNettyHttpClientUpgradeHandler extends HttpClientUpgradeHandler<DefaultHttpContent> {

final Http2FrameCodec http2FrameCodec;

boolean decoded;

ReactorNettyHttpClientUpgradeHandler(
Http2FrameCodec http2FrameCodec,
SourceCodec sourceCodec,
UpgradeCodec upgradeCodec,
int maxContentLength) {
super(sourceCodec, upgradeCodec, maxContentLength);
this.http2FrameCodec = http2FrameCodec;
}

@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
decoded = true;
if (msg instanceof HttpResponse httpResponse && !SWITCHING_PROTOCOLS.equals(httpResponse.status())) {
http2FrameCodec.encoder().close();
}
super.decode(ctx, msg);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
if (!decoded) {
// Exception may happen before decoding the server response
http2FrameCodec.encoder().close();
}
}
}

static final class StreamConnectionObserver implements ConnectionObserver {

final Context context;
Expand Down
Expand Up @@ -637,14 +637,17 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Received last HTTP packet"));
}
if (!(msg instanceof EmptyLastHttpContent)) {
if (!(msg instanceof EmptyLastHttpContent emptyLastHttpContent)) {
if (redirecting != null) {
Resource.dispose(msg);
}
else {
super.onInboundNext(ctx, msg);
}
}
else {
emptyLastHttpContent.close();
}

if (redirecting == null) {
// EmitResult is ignored as it is guaranteed that there will be only one emission of LastHttpContent
Expand Down
Expand Up @@ -29,7 +29,7 @@
import io.netty5.handler.codec.http.LastHttpContent;
import io.netty5.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.netty5.handler.ssl.SslHandler;
import io.netty5.util.ReferenceCountUtil;
import io.netty5.util.Resource;
import io.netty5.util.concurrent.Future;
import io.netty5.util.concurrent.FutureContextListener;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -125,7 +125,7 @@ else if (!pendingResponse) {
HttpServerOperations.log.debug(format(ctx.channel(), "Dropped HTTP content, " +
"since response has been sent already: {}"), msg);
}
ReferenceCountUtil.release(msg);
Resource.dispose(msg);
ctx.read();
return;
}
Expand Down
Expand Up @@ -747,10 +747,10 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter {
public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline pipeline = ctx.pipeline();
if (addHttp2FrameCodec) {
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodec);
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodecBuilder.build());
}

pipeline.addAfter(ctx.pipeline().context(upgrader.http2FrameCodec).name(),
pipeline.addAfter(ctx.pipeline().context(Http2FrameCodec.class).name(),
NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(upgrader));

pipeline.remove(this);
Expand Down Expand Up @@ -850,7 +850,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
final BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2FrameCodec http2FrameCodec;
final Http2FrameCodecBuilder http2FrameCodecBuilder;
final ConnectionObserver listener;
final BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>>
mapHandle;
Expand Down Expand Up @@ -879,7 +879,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
this.compressPredicate = compressPredicate;
this.formDecoderProvider = formDecoderProvider;
this.forwardedHeaderHandler = forwardedHeaderHandler;
Http2FrameCodecBuilder http2FrameCodecBuilder =
this.http2FrameCodecBuilder =
Http2FrameCodecBuilder.forServer()
.validateHeaders(validate)
.initialSettings(http2Settings);
Expand All @@ -889,7 +889,6 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
LogLevel.DEBUG,
"reactor.netty5.http.server.h2"));
}
this.http2FrameCodec = http2FrameCodecBuilder.build();
this.listener = listener;
this.mapHandle = mapHandle;
this.metricsRecorder = metricsRecorder;
Expand All @@ -913,7 +912,7 @@ protected void initChannel(Channel ch) {
@Nullable
public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec(http2FrameCodec, new H2CleartextCodec(this, false, false));
return new Http2ServerUpgradeCodec(http2FrameCodecBuilder.build(), new H2CleartextCodec(this, false, false));
}
else {
return null;
Expand Down
Expand Up @@ -520,8 +520,8 @@ else if (channel().pipeline()
.get(NettyPipeline.CompressionHandler) == null) {
SimpleCompressionHandler handler = new SimpleCompressionHandler();
try {
// decodeAndClose(...) is needed only to initialize the acceptEncodingQueue
handler.decodeAndClose(channel().pipeline().context(NettyPipeline.ReactiveBridge), nettyRequest);
// decode(...) is needed only to initialize the acceptEncodingQueue
handler.decode(channel().pipeline().context(NettyPipeline.ReactiveBridge), nettyRequest, false);

addHandlerFirst(NettyPipeline.CompressionHandler, handler);
}
Expand Down Expand Up @@ -559,9 +559,12 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
return;
}
if (msg instanceof HttpContent) {
if (!(msg instanceof EmptyLastHttpContent)) {
if (!(msg instanceof EmptyLastHttpContent emptyLastHttpContent)) {
super.onInboundNext(ctx, msg);
}
else {
emptyLastHttpContent.close();
}
if (msg instanceof LastHttpContent) {
//force auto read to enable more accurate close selection now inbound is done
channel().setOption(ChannelOption.AUTO_READ, true);
Expand Down
Expand Up @@ -37,8 +37,7 @@ public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
return super.write(ctx, msg);
}

@Override
protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
void decode(ChannelHandlerContext ctx, HttpRequest msg, boolean release) throws Exception {
HttpRequest request = msg;
if (msg instanceof FullHttpRequest fullHttpRequest &&
(!fullHttpRequest.isAccessible() || fullHttpRequest.payload().readableBytes() == 0)) {
Expand All @@ -48,7 +47,15 @@ protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws
// 2. fireChannelRead(...) is invoked at the end of super.decodeAndClose(...) which will end up
// in io.netty5.channel.DefaultChannelPipeline.onUnhandledInboundMessage which closes the msg.
request = new DefaultHttpRequest(msg.protocolVersion(), msg.method(), msg.uri(), msg.headers());
if (release && fullHttpRequest.isAccessible()) {
fullHttpRequest.close();
}
}
super.decodeAndClose(ctx, request);
}

@Override
protected void decodeAndClose(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
decode(ctx, msg, true);
}
}
Expand Up @@ -25,7 +25,7 @@
import io.netty5.handler.codec.http.EmptyLastHttpContent;
import io.netty5.handler.codec.http.HttpHeaderNames;
import io.netty5.handler.codec.http.headers.HttpHeaders;
import io.netty5.handler.codec.http.HttpRequest;
import io.netty5.handler.codec.http.FullHttpRequest;
import io.netty5.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty5.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty5.handler.codec.http.websocketx.PongWebSocketFrame;
Expand Down Expand Up @@ -85,7 +85,7 @@ final class WebsocketServerOperations extends HttpServerOperations
removeHandler(NettyPipeline.AccessLogHandler);
removeHandler(NettyPipeline.HttpMetricsHandler);

HttpRequest request = new DefaultFullHttpRequest(replaced.version(), replaced.method(), replaced.uri(),
FullHttpRequest request = new DefaultFullHttpRequest(replaced.version(), replaced.method(), replaced.uri(),
channel.bufferAllocator().allocate(0));

request.headers()
Expand Down Expand Up @@ -115,6 +115,7 @@ final class WebsocketServerOperations extends HttpServerOperations
request,
responseHeaders)
.addListener(f -> {
request.close();
if (replaced.rebind(this)) {
markPersistent(false);
// This change is needed after the Netty change https://github.com/netty/netty/pull/11966
Expand Down
Expand Up @@ -62,6 +62,13 @@
"name": "reactor.netty5.http.client.MicrometerHttpClientMetricsHandler",
"queryAllPublicMethods": true
},
{
"condition": {
"typeReachable": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler"
},
"name": "reactor.netty5.http.client.HttpClientConfig$ReactorNettyHttpClientUpgradeHandler",
"queryAllPublicMethods": true
},
{
"condition": {
"typeReachable": "reactor.netty5.http.server.AbstractHttpServerMetricsHandler"
Expand Down
Expand Up @@ -165,7 +165,10 @@ private void doTestStatus(HttpResponseStatus status) {
EmbeddedChannel channel = new EmbeddedChannel();
HttpClientOperations ops = new HttpClientOperations(() -> channel,
ConnectionObserver.emptyListener());
ops.setNettyResponse(new DefaultFullHttpResponse(HTTP_1_1, status, channel.bufferAllocator().allocate(0)));
assertThat(ops.status().reasonPhrase()).isEqualTo(status.reasonPhrase());
try (DefaultFullHttpResponse response =
new DefaultFullHttpResponse(HTTP_1_1, status, channel.bufferAllocator().allocate(0))) {
ops.setNettyResponse(response);
assertThat(ops.status().reasonPhrase()).isEqualTo(status.reasonPhrase());
}
}
}
Expand Up @@ -1850,8 +1850,10 @@ private void doTestStatus(HttpResponseStatus status) {
null,
false);
ops.status(status);
HttpMessage response = ops.newFullBodyMessage(channel.bufferAllocator().allocate(0));
assertThat(((FullHttpResponse) response).status().reasonPhrase()).isEqualTo(status.reasonPhrase());
try (Buffer buffer = channel.bufferAllocator().allocate(0)) {
HttpMessage response = ops.newFullBodyMessage(buffer);
assertThat(((FullHttpResponse) response).status().reasonPhrase()).isEqualTo(status.reasonPhrase());
}
channel.close();
}

Expand Down
Expand Up @@ -59,6 +59,8 @@ void responseNonChunked() {
channel.writeOutbound(newHttpResponse(false));

channel.writeOutbound(new DefaultLastHttpContent(channel.bufferAllocator().allocate(0)));

assertThat(channel.finishAndReleaseAll()).isTrue();
}

@Test
Expand Down