diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/HttpMetricsHandlerTests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/HttpMetricsHandlerTests.java index f58182c6d0..1ca6053ae1 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/HttpMetricsHandlerTests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/HttpMetricsHandlerTests.java @@ -25,14 +25,17 @@ import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.core.tck.MeterRegistryAssert; import io.netty5.buffer.api.Buffer; +import io.netty5.channel.Channel; import io.netty5.channel.ChannelHandlerAdapter; import io.netty5.channel.ChannelHandlerContext; +import io.netty5.channel.ChannelPipeline; import io.netty5.handler.codec.http.LastHttpContent; import io.netty5.handler.codec.http2.HttpConversionUtil; import io.netty5.handler.ssl.SslProvider; import io.netty5.handler.ssl.util.InsecureTrustManagerFactory; import io.netty5.handler.ssl.util.SelfSignedCertificate; import io.netty5.util.concurrent.Future; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -45,7 +48,7 @@ import reactor.core.publisher.Mono; import reactor.netty5.BaseHttpTest; import reactor.netty5.BufferFlux; -import reactor.netty5.ConnectionObserver; +import reactor.netty5.NettyPipeline; import reactor.netty5.http.client.ContextAwareHttpClientMetricsRecorder; import reactor.netty5.http.client.HttpClient; import reactor.netty5.http.server.ContextAwareHttpServerMetricsRecorder; @@ -71,6 +74,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -113,6 +117,7 @@ class HttpMetricsHandlerTests extends BaseHttpTest { @BeforeAll static void createSelfSignedCertificate() throws CertificateException { + Assertions.setMaxStackTraceElementsDisplayed(100); ssc = new SelfSignedCertificate(); serverCtx11 = Http11SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()) .configure(builder -> builder.sslProvider(SslProvider.JDK)); @@ -174,20 +179,20 @@ void tearDown() { @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedDisconnects = getExpectedDisconnects(negotiatedProtocol); - CountDownLatch latch1 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); - AtomicReference latchRef = new AtomicReference<>(latch1); - ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) - .childObserve(observerDisconnect) + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + CountDownLatch responseSent = new CountDownLatch(1); // response fully sent by the server + AtomicReference responseSentRef = new AtomicReference<>(responseSent); + ResponseSentHandler responseSentHandler = ResponseSentHandler.INSTANCE; + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + .doOnConnection(cnx -> responseSentHandler.register(responseSentRef, cnx.channel().pipeline())) .bindNow(); AtomicReference serverAddress = new AtomicReference<>(); - httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols).doAfterRequest((req, conn) -> - serverAddress.set(conn.channel().remoteAddress()) - ).observe(observerDisconnect); + CountDownLatch clientCompleted = new CountDownLatch(1); // client received full response + AtomicReference clientCompletedRef = new AtomicReference<>(clientCompleted); + httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) + .doAfterResponseSuccess((resp, conn) -> clientCompletedRef.get().countDown()); StepVerifier.create(httpClient.post() .uri("/1") @@ -199,7 +204,8 @@ void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientP .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + assertThat(responseSentRef.get().await(30, TimeUnit.SECONDS)).as("responseSentRef latch await").isTrue(); + assertThat(clientCompletedRef.get().await(30, TimeUnit.SECONDS)).as("clientCompletedRef latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -220,8 +226,8 @@ else if (clientProtocols.length == 2 && checkExpectationsExisting("/1", sa.getHostString() + ":" + sa.getPort(), 1, serverCtx != null, numWrites[0], bytesWrite[0]); - CountDownLatch latch2 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); - latchRef.set(latch2); + responseSentRef.set(new CountDownLatch(1)); + clientCompletedRef.set(new CountDownLatch(1)); StepVerifier.create(httpClient.post() .uri("/2?i=1&j=2") @@ -233,7 +239,9 @@ else if (clientProtocols.length == 2 && .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + assertThat(responseSentRef.get().await(30, TimeUnit.SECONDS)).as("responseSentRef latch await").isTrue(); + assertThat(clientCompletedRef.get().await(30, TimeUnit.SECONDS)).as("clientCompletedRef latch await").isTrue(); + sa = (InetSocketAddress) serverAddress.get(); checkExpectationsExisting("/2", sa.getHostString() + ":" + sa.getPort(), connIndex, serverCtx != null, @@ -244,8 +252,7 @@ else if (clientProtocols.length == 2 && @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, - @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .metrics(true, id -> { throw new IllegalArgumentException("Testcase injected Exception"); @@ -269,8 +276,7 @@ void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, - @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .bindNow(); @@ -292,21 +298,29 @@ void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedDisconnects = getExpectedDisconnects(negotiatedProtocol); - - CountDownLatch latch = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); - AtomicReference latchRef = new AtomicReference<>(latch); - ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) - .childObserve(observerDisconnect) + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + CountDownLatch responseSent = new CountDownLatch(1); // response fully sent by the server + AtomicReference responseSentRef = new AtomicReference<>(responseSent); + ResponseSentHandler responseSentHandler = ResponseSentHandler.INSTANCE; + CountDownLatch requestReceived = new CountDownLatch(1); // request fully received by the server + AtomicReference requestReceivedRef = new AtomicReference<>(requestReceived); + RequestReceivedHandler requestReceivedHandler = RequestReceivedHandler.INSTANCE; + + // the requestReceivedHandler is used to detect when the server has received the last client request content + // the responseSentHandler is used to detect when the server has sent the last response content + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + .doOnConnection(cnx -> { + responseSentHandler.register(responseSentRef, cnx.channel().pipeline()); + requestReceivedHandler.register(requestReceivedRef, cnx.channel().pipeline()); + }) .bindNow(); AtomicReference serverAddress = new AtomicReference<>(); - httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols).doAfterRequest((req, conn) -> - serverAddress.set(conn.channel().remoteAddress()) - ).observe(observerDisconnect); + CountDownLatch clientCompleted = new CountDownLatch(1); + AtomicReference clientCompletedRef = new AtomicReference<>(clientCompleted); + httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) + .doAfterResponseSuccess((rsp, conn) -> clientCompletedRef.get().countDown()); StepVerifier.create(httpClient .headers(h -> h.add("Connection", "close")) @@ -318,7 +332,9 @@ void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clie .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + assertThat(requestReceivedRef.get().await(30, TimeUnit.SECONDS)).as("requestReceivedRef latch await").isTrue(); + assertThat(responseSentRef.get().await(30, TimeUnit.SECONDS)).as("responseSentRef latch await").isTrue(); + assertThat(clientCompletedRef.get().await(30, TimeUnit.SECONDS)).as("clientCompletedRef latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -348,8 +364,9 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. checkExpectationsNonExisting(sa.getHostString() + ":" + sa.getPort(), 1, 1, serverCtx != null, numWrites[0], numReads[0], bytesWrite[0], bytesRead[0]); - CountDownLatch latch2 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); - latchRef.set(latch2); + requestReceivedRef.set(new CountDownLatch(1)); + responseSentRef.set(new CountDownLatch(1)); + clientCompletedRef.set(new CountDownLatch(1)); StepVerifier.create(httpClient .headers(h -> h.add("Connection", "close")) @@ -361,7 +378,9 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + assertThat(requestReceivedRef.get().await(30, TimeUnit.SECONDS)).as("requestReceivedRef latch await").isTrue(); + assertThat(responseSentRef.get().await(30, TimeUnit.SECONDS)).as("responseSentRef latch await").isTrue(); + assertThat(clientCompletedRef.get().await(30, TimeUnit.SECONDS)).as("clientCompletedRef latch await").isTrue(); sa = (InetSocketAddress) serverAddress.get(); checkExpectationsNonExisting(sa.getHostString() + ":" + sa.getPort(), connIndex, 2, serverCtx != null, @@ -371,21 +390,20 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedDisconnects = getExpectedDisconnects(negotiatedProtocol); - CountDownLatch latch1 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); - AtomicReference latchRef = new AtomicReference<>(latch1); - ConnectionObserver observerDisconnect = observeDisconnect(latchRef); + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + CountDownLatch responseSent = new CountDownLatch(1); // response fully sent by the server + CountDownLatch clientCompleted = new CountDownLatch(1); // client received full response + ResponseSentHandler responseSentHandler = ResponseSentHandler.INSTANCE; - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + .doOnConnection(cnx -> responseSentHandler.register(responseSent, cnx.channel().pipeline())) .metrics(true, s -> "testUriTagValueResolver") - .childObserve(observerDisconnect) .bindNow(); AtomicReference serverAddress = new AtomicReference<>(); - httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols).doAfterRequest((req, conn) -> - serverAddress.set(conn.channel().remoteAddress()) - ).observe(observerDisconnect); + httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols) + .doAfterResponseSuccess((res, conn) -> clientCompleted.countDown()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())); StepVerifier.create(httpClient.metrics(true, s -> "testUriTagValueResolver") .post() @@ -398,7 +416,8 @@ void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clie .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + assertThat(responseSent.await(30, TimeUnit.SECONDS)).as("responseSent latch await").isTrue(); + assertThat(clientCompleted.await(30, TimeUnit.SECONDS)).as("clientCompleted latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -423,29 +442,30 @@ else if (clientProtocols.length == 2 && @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedDisconnects = getExpectedDisconnects(negotiatedProtocol); - CountDownLatch latch1 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); - AtomicReference latchRef = new AtomicReference<>(latch1); - ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + CountDownLatch responseSent = new CountDownLatch(1); // response fully sent by the server + AtomicReference responseSentRef = new AtomicReference<>(responseSent); + ResponseSentHandler responseSentHandler = ResponseSentHandler.INSTANCE; disposableServer = - customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true).metrics(true, - s -> { - if ("/1".equals(s)) { - return "testUriTagValueFunctionNotShared_1"; - } - else { - return "testUriTagValueFunctionNotShared_2"; - } - }) - .childObserve(observerDisconnect) + customizeServerOptions(httpServer, serverCtx, serverProtocols) + .doOnConnection(cnx -> responseSentHandler.register(responseSentRef, cnx.channel().pipeline())) + .metrics(true, + s -> { + if ("/1".equals(s)) { + return "testUriTagValueFunctionNotShared_1"; + } + else { + return "testUriTagValueFunctionNotShared_2"; + } + }) .bindNow(); + CountDownLatch clientCompleted = new CountDownLatch(1); // client received full response + AtomicReference clientCompletedRef = new AtomicReference<>(clientCompleted); AtomicReference serverAddress = new AtomicReference<>(); - httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols).doAfterRequest((req, conn) -> - serverAddress.set(conn.channel().remoteAddress()) - ).observe(observerDisconnect); + httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) + .doAfterResponseSuccess((resp, conn) -> clientCompletedRef.get().countDown()); httpClient.metrics(true, s -> "testUriTagValueFunctionNotShared_1") .post() @@ -459,7 +479,8 @@ void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, H .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + assertThat(responseSentRef.get().await(30, TimeUnit.SECONDS)).as("responseSentRef latch await").isTrue(); + assertThat(clientCompletedRef.get().await(30, TimeUnit.SECONDS)).as("clientCompletedRef latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -477,8 +498,8 @@ else if (clientProtocols.length == 2 && checkExpectationsExisting("testUriTagValueFunctionNotShared_1", sa.getHostString() + ":" + sa.getPort(), 1, serverCtx != null, numWrites[0], bytesWrite[0]); - CountDownLatch latch2 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); - latchRef.set(latch2); + responseSentRef.set(new CountDownLatch(1)); + clientCompletedRef.set(new CountDownLatch(1)); httpClient.metrics(true, s -> "testUriTagValueFunctionNotShared_2") .post() @@ -492,7 +513,8 @@ else if (clientProtocols.length == 2 && .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + assertThat(responseSentRef.get().await(30, TimeUnit.SECONDS)).as("responseSentRef latch await").isTrue(); + assertThat(clientCompletedRef.get().await(30, TimeUnit.SECONDS)).as("clientCompletedRef await").isTrue(); sa = (InetSocketAddress) serverAddress.get(); @@ -503,17 +525,13 @@ else if (clientProtocols.length == 2 && @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, - @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).bindNow(); ClientContextAwareRecorder recorder = ClientContextAwareRecorder.INSTANCE; - CountDownLatch latch = new CountDownLatch(1); httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols); - httpClient.doOnResponse((res, conn) -> conn.channel() - .closeFuture() - .addListener(f -> latch.countDown())) - .metrics(true, () -> recorder) + + httpClient.metrics(true, () -> recorder) .post() .uri("/1") .send(body) @@ -526,8 +544,6 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(recorder.onDataReceivedContextView).isTrue(); assertThat(recorder.onDataSentContextView).isTrue(); } @@ -535,20 +551,19 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, - @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + CountDownLatch responseSent = new CountDownLatch(1); // response fully sent by the server ServerContextAwareRecorder recorder = ServerContextAwareRecorder.INSTANCE; + ResponseSentHandler responseSentHandler = ResponseSentHandler.INSTANCE; disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).metrics(true, () -> recorder) - .mapHandle((mono, conn) -> mono.contextWrite(Context.of("testContextAwareRecorder", "OK"))) - .bindNow(); + .doOnConnection(cnx -> responseSentHandler.register(responseSent, cnx.channel().pipeline())) + .mapHandle((mono, conn) -> mono.contextWrite(Context.of("testContextAwareRecorder", "OK"))) + .bindNow(); - CountDownLatch latch = new CountDownLatch(1); httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols); - httpClient.doOnResponse((res, conn) -> conn.channel() - .closeFuture() - .addListener(f -> latch.countDown())) - .post() + + httpClient.post() .uri("/1") .send(body) .responseContent() @@ -559,7 +574,7 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + assertThat(responseSent.await(30, TimeUnit.SECONDS)).as("responseSent latch await").isTrue(); assertThat(recorder.onDataReceivedContextView).isTrue(); assertThat(recorder.onDataSentContextView).isTrue(); @@ -568,22 +583,23 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedDisconnects = getExpectedDisconnects(negotiatedProtocol); - CountDownLatch latch = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); - AtomicReference latchRef = new AtomicReference<>(latch); - ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + CountDownLatch responseSent = new CountDownLatch(1); // response fully sent by the server + CountDownLatch serverClosed = new CountDownLatch(1); // socket closed on the server side + ResponseSentHandler responseSentHandler = ResponseSentHandler.INSTANCE; + ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE; boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11; - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) + HttpServer server = customizeServerOptions(httpServer, serverCtx, serverProtocols) .metrics(true, Function.identity()) - .childObserve(observerDisconnect) - .bindNow(); + .doOnConnection(cnx -> responseSentHandler.register(responseSent, cnx.channel().pipeline())); + + server = isHttp11 ? + server.doOnChannelInit((cnxObs, ch, sockAddr) -> serverCloseHandler.register(ch, serverClosed, isHttp11)) : server; + disposableServer = server.bindNow(); AtomicReference clientAddress = new AtomicReference<>(); - httpClient = httpClient.doAfterRequest((req, conn) -> - clientAddress.set(conn.channel().localAddress()) - ).observe(observerDisconnect); + httpClient = httpClient + .doAfterRequest((req, conn) -> clientAddress.set(conn.channel().localAddress())); String uri = "/4"; String address = formatSocketAddress(disposableServer.address()); @@ -602,10 +618,11 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + assertThat(responseSent.await(30, TimeUnit.SECONDS)).as("responseSent latch await").isTrue(); // now check the server counters if (isHttp11) { + assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue(); checkGauge(SERVER_CONNECTIONS_TOTAL, true, 0, URI, HTTP, LOCAL_ADDRESS, address); checkGauge(SERVER_CONNECTIONS_ACTIVE, true, 0, URI, HTTP, LOCAL_ADDRESS, address); } @@ -626,28 +643,23 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, - @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { // Invoke ServerRecorder.INSTANCE.reset() here as disposableServer.dispose (AfterEach) might be invoked after // ServerRecorder.INSTANCE.reset() (AfterEach) and thus leave ServerRecorder.INSTANCE in a bad state ServerRecorder.INSTANCE.reset(); boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11; + CountDownLatch serverClosed = new CountDownLatch(1); + ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE; + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) - .metrics(true, () -> { - ServerRecorder.INSTANCE.done = isHttp11 ? new CountDownLatch(4) : new CountDownLatch(1); - return ServerRecorder.INSTANCE; - }, - Function.identity()) + .doOnConnection(c -> serverCloseHandler.register(c.channel(), serverClosed, isHttp11)) + .metrics(true, ServerRecorder.supplier(), Function.identity()) .bindNow(); String address = formatSocketAddress(disposableServer.address()); - CountDownLatch latch = new CountDownLatch(1); - httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols); - httpClient.doOnResponse((res, conn) -> - conn.channel() - .closeFuture() - .addListener(f -> latch.countDown())) + + httpClient .metrics(true, Function.identity()) .post() .uri("/5") @@ -660,8 +672,15 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[ .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(ServerRecorder.INSTANCE.done.await(30, TimeUnit.SECONDS)).as("recorder latch await").isTrue(); + // dispose the client connection provider now, before asserting test expectations. + provider.disposeLater() + .block(Duration.ofSeconds(30)); + + // now the socket is closed, wait for the ServerRecorder to be called in recordServerConnectionClosed before asserting test expectations + assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue(); + + // now we can assert test expectations + assertThat(ServerRecorder.INSTANCE.error.get()).isNull(); if (isHttp11) { assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0); assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0); @@ -669,7 +688,7 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[ assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address); } else { - assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(1); + assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0); } disposableServer.disposeNow(); @@ -680,12 +699,10 @@ void testIssue896() throws Exception { disposableServer = httpServer.noSSL() .bindNow(); - // the client will observe three DISCONNECT: one when a NotSSLRecordException is caught, - // one when DecoderException is caught, and one when the connection becomes inactive - CountDownLatch latch = new CountDownLatch(3); - AtomicReference latchRef = new AtomicReference<>(latch); + // The client should get two errors: NotSSLRecordException, and DecoderException. + CountDownLatch latch = new CountDownLatch(2); httpClient - .observe(observeDisconnect(latchRef)) + .doOnChannelInit((o, c, address) -> ClientExceptionHandler.INSTANCE.register(c, latch)) .secure(spec -> spec.sslContext(clientCtx11)) .post() .uri("/1") @@ -706,21 +723,20 @@ void testIssue896() throws Exception { @MethodSource("http11CompatibleProtocols") void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { - CountDownLatch latch = new CountDownLatch(5); // expect to observe 2 server disconnect + 2 client disconnect events + 1 event when request is fully handled - AtomicReference latchRef = new AtomicReference<>(latch); - ConnectionObserver observerDisconnect = observeDisconnect(latchRef); + CountDownLatch serverClosed = new CountDownLatch(1); + CountDownLatch clientCompleted = new CountDownLatch(1); + boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11; + ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE; - // we need to register our latch handler using doOnChannelInit because here, the HttpTrafficHandler will return an http response error - // without ever calling doOnConnection callback (see for example HttpTrafficHandler.channelRead method). - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, false) + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + .doOnChannelInit((obs, c, s) -> serverCloseHandler.register(c, serverClosed, isHttp11)) .httpRequestDecoder(spec -> spec.maxHeaderSize(32)) - .childObserve(observerDisconnect) .bindNow(); AtomicReference serverAddress = new AtomicReference<>(); httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols) .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) - .observe(observerDisconnect); + .doAfterResponseSuccess((resp, conn) -> clientCompleted.countDown()); httpClient.get() .uri("/max_header_size") @@ -730,21 +746,20 @@ void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtoco .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + // dispose the client connection provider now, before asserting test expectations. + provider.disposeLater() + .block(Duration.ofSeconds(30)); + // now the socket is closed, wait for the ServerRecorder to be called in recordServerConnectionClosed before asserting test expectations + assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue(); + + // Ensure client has fully received the response before asserting test expectations + assertThat(clientCompleted.await(30, TimeUnit.SECONDS)).as("clientCompleted latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); checkExpectationsBadRequest(sa.getHostString() + ":" + sa.getPort(), serverCtx != null); } - private ConnectionObserver observeDisconnect(AtomicReference latchRef) { - return (connection, state) -> { - if (state == ConnectionObserver.State.DISCONNECTING) { - latchRef.get().countDown(); - } - }; - } - private void checkServerConnectionsMicrometer(HttpServerRequest request) { String address = formatSocketAddress(request.hostAddress()); boolean isHttp2 = request.requestHeaders().contains(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text()); @@ -758,15 +773,20 @@ private void checkServerConnectionsMicrometer(HttpServerRequest request) { } private void checkServerConnectionsRecorder(HttpServerRequest request) { - String address = formatSocketAddress(request.hostAddress()); - boolean isHttp2 = request.requestHeaders().contains(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text()); - assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(1); - assertThat(ServerRecorder.INSTANCE.onServerConnectionsLocalAddr.get()).isEqualTo(address); - if (!isHttp2) { - assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(1); - assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address); + try { + String address = formatSocketAddress(request.hostAddress()); + boolean isHttp2 = request.requestHeaders().contains(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text()); + assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(1); + assertThat(ServerRecorder.INSTANCE.onServerConnectionsLocalAddr.get()).isEqualTo(address); + if (!isHttp2) { + assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(1); + assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address); + } + assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isNull(); + } + catch (Throwable error) { + ServerRecorder.INSTANCE.error.set(error); } - assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isNull(); } private void checkExpectationsExisting(String uri, String serverAddress, int connIndex, boolean checkTls, @@ -839,7 +859,6 @@ private void checkExpectationsNonExisting(String serverAddress, int connIndex, i checkCounter(CLIENT_ERRORS, summaryTags2, false, 0); } - private void checkExpectationsBadRequest(String serverAddress, boolean checkTls) { String uri = "/max_header_size"; String[] timerTags1 = new String[] {URI, uri, METHOD, "GET", STATUS, "413"}; @@ -870,21 +889,7 @@ private void checkExpectationsBadRequest(String serverAddress, boolean checkTls) } HttpServer customizeServerOptions(HttpServer httpServer, @Nullable ProtocolSslContextSpec ctx, HttpProtocol[] protocols) { - return customizeServerOptions(httpServer, ctx, protocols, null, false); - } - - HttpServer customizeServerOptions(HttpServer httpServer, @Nullable ProtocolSslContextSpec ctx, HttpProtocol[] protocols, - @Nullable AtomicReference latchRef, boolean registerLatchHandlerOnConnection) { - HttpServer server = ctx == null ? httpServer.protocol(protocols) : httpServer.protocol(protocols).secure(spec -> spec.sslContext(ctx)); - if (latchRef != null) { - RequestCompletedHandler handler = RequestCompletedHandler.INSTANCE; - handler.reset(latchRef); - server = registerLatchHandlerOnConnection ? - server.doOnConnection(connection -> connection.channel().pipeline().addLast(handler)) : - server.doOnChannelInit((connectionObserver, channel, socketAddress) -> channel.pipeline().addLast(handler)); - } - - return server; + return ctx == null ? httpServer.protocol(protocols) : httpServer.protocol(protocols).secure(spec -> spec.sslContext(ctx)); } HttpClient customizeClientOptions(HttpClient httpClient, @Nullable ProtocolSslContextSpec ctx, HttpProtocol[] protocols) { @@ -933,19 +938,6 @@ void checkGauge(String name, boolean exists, double expectedCount, String... tag } } - /** - * Get number of disconnect events we expect to observe on a given connection. - * @param protocol the protocol used (for HTTP11, we expect to observe 4 disconnect events, and for other (H2/H2C), we expect 3 events)). - * @return number of disconnect events we expect to observe on a given connection - */ - @SuppressWarnings("UnnecessaryParentheses") - int getExpectedDisconnects(HttpProtocol protocol) { - return switch (protocol) { - case H2, H2C -> 3; - case HTTP11 -> 4; - }; - } - static Stream http11CompatibleProtocols() { return Stream.of( Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null), @@ -959,23 +951,23 @@ static Stream http11CompatibleProtocols() { static Stream httpCompatibleProtocols() { return Stream.of( - Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null, HttpProtocol.HTTP11), + Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null), Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, - Named.of("Http11SslContextSpec", serverCtx11), Named.of("Http11SslContextSpec", clientCtx11), HttpProtocol.HTTP11), + Named.of("Http11SslContextSpec", serverCtx11), Named.of("Http11SslContextSpec", clientCtx11)), Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2}, - Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2), + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, - Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2), + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, - Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http11SslContextSpec", clientCtx11), HttpProtocol.HTTP11), + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http11SslContextSpec", clientCtx11)), Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2}, - Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2), + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, - Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2), - Arguments.of(new HttpProtocol[]{HttpProtocol.H2C}, new HttpProtocol[]{HttpProtocol.H2C}, null, null, HttpProtocol.H2C), - Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null, HttpProtocol.HTTP11), - Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C}, null, null, HttpProtocol.H2C), - Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, null, null, HttpProtocol.H2C) + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C}, new HttpProtocol[]{HttpProtocol.H2C}, null, null), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C}, null, null), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, null, null) ); } @@ -1124,6 +1116,8 @@ public void recordResolveAddressTime(SocketAddress socketAddress, Duration durat static final class ServerRecorder implements HttpServerMetricsRecorder { static final ServerRecorder INSTANCE = new ServerRecorder(); + static final Supplier SUPPLIER = () -> INSTANCE; + private final AtomicReference error = new AtomicReference<>(); private final AtomicInteger onServerConnectionsAmount = new AtomicInteger(); private final AtomicReference onServerConnectionsLocalAddr = new AtomicReference<>(); private final AtomicReference onActiveConnectionsLocalAddr = new AtomicReference<>(); @@ -1131,7 +1125,12 @@ static final class ServerRecorder implements HttpServerMetricsRecorder { private final AtomicInteger onActiveConnectionsAmount = new AtomicInteger(); private volatile CountDownLatch done = new CountDownLatch(4); + static Supplier supplier() { + return SUPPLIER; + } + void reset() { + error.set(null); onServerConnectionsAmount.set(0); onServerConnectionsLocalAddr.set(null); onActiveConnectionsLocalAddr.set(null); @@ -1218,29 +1217,120 @@ public void recordResolveAddressTime(SocketAddress socketAddress, Duration durat } /** - * Handler used to ensure that the request has completed on the server. + * Server Handler used to detect when the last http response content has been sent to the client. + * Handler placed before the HttpMetricsHandler on the Server pipeline. + * Metrics are up-to-date when the latch is counted down. */ - static final class RequestCompletedHandler extends ChannelHandlerAdapter { - static final RequestCompletedHandler INSTANCE = new RequestCompletedHandler(); - private AtomicReference latchRef = new AtomicReference<>(null); + static final class ResponseSentHandler extends ChannelHandlerAdapter { + final static String HANDLER_NAME = "ServerCompletedHandler.handler"; + final static ResponseSentHandler INSTANCE = new ResponseSentHandler(); + AtomicReference latchRef; - void reset(AtomicReference latchRef) { + void register(AtomicReference latchRef, ChannelPipeline pipeline) { this.latchRef = latchRef; + pipeline.addAfter(NettyPipeline.HttpMetricsHandler, HANDLER_NAME, this); + } + + void register(CountDownLatch latch, ChannelPipeline pipeline) { + register(new AtomicReference<>(latch), pipeline); } @Override - public Future write(ChannelHandlerContext ctx, Object msg) { + public Future write(ChannelHandlerContext ctx, Object msg) { if (msg instanceof LastHttpContent) { return ctx.write(msg).addListener(future -> latchRef.get().countDown()); + + } + + return ctx.write(msg); + } + + @Override + public boolean isSharable() { + return true; + } + } + + /** + * Server Handler used to detect when the last http client request content has been received by the server. + * Handler placed after the HttpMetricsHandler on the Server pipeline. + * Metrics are up-to-date when the latch is counted down. + */ + static final class RequestReceivedHandler extends ChannelHandlerAdapter { + final static RequestReceivedHandler INSTANCE = new RequestReceivedHandler(); + final static String HANDLER_NAME = "ServerReceivedHandler.handler"; + AtomicReference latchRef; + + void register(AtomicReference latchRef, ChannelPipeline pipeline) { + this.latchRef = latchRef; + pipeline.addAfter(NettyPipeline.HttpMetricsHandler, HANDLER_NAME, this); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof LastHttpContent) { + latchRef.get().countDown(); + } + ctx.fireChannelRead(msg); + } + + @Override + public boolean isSharable() { + return true; + } + } + + /** + * Server handler used to wait until the client socket is closed on the server side. + * For HTTP1.1, the handler is placed before the ReactorBridge, so all previous handlers will see + * the close before this handler. For HTTP2, the handler is placed lastly on the pipeline. + */ + static final class ServerCloseHandler extends ChannelHandlerAdapter { + static final ServerCloseHandler INSTANCE = new ServerCloseHandler(); + static final String HANDLER_NAME = "ServerCloseHandler.handler"; + private CountDownLatch latch; + + void register(Channel channel, CountDownLatch latch, boolean http11) { + this.latch = latch; + + if (http11) { + channel.pipeline().addBefore(NettyPipeline.ReactiveBridge, HANDLER_NAME, this); } else { - return ctx.write(msg); + channel.parent().pipeline().addLast(HANDLER_NAME, this); } } + @Override + public void channelInactive(ChannelHandlerContext ctx) { + latch.countDown(); + ctx.fireChannelInactive(); + } + @Override public boolean isSharable() { - return true; // A server may accept multiple connections, hence this handler must be sharable + return true; + } + } + + /** + * Handler used to get notified when an exception occurs on the HttpClientMetricsHandler. This handler is placed + * after the reactor.left.httpMetricsHandler. + */ + static final class ClientExceptionHandler extends ChannelHandlerAdapter { + static final ClientExceptionHandler INSTANCE = new ClientExceptionHandler(); + static final String HANDLER_NAME = "ExceptionHandler.handler"; + private CountDownLatch latch; + + void register(Channel channel, CountDownLatch latch) { + this.latch = latch; + channel.pipeline().addAfter(NettyPipeline.HttpMetricsHandler, HANDLER_NAME, this); + } + + @Override + public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + latch.countDown(); + ctx.fireChannelExceptionCaught(cause); } } }