Skip to content

Commit

Permalink
HttpMetricsHandlerTests testServerConnectionsMicrometer still unstable (
Browse files Browse the repository at this point in the history
#2421)

Fixed a problem in testServerConnectionsMicrometer test, which was the following:
1- testServerConnectionsMicrometer is finishing an HTTP2 test, the tearDown method is closing the provider, but the server has not yet seen the connection close. Even if the server is disposed and does not listen anymore on its server port, it still see the old H2 connection
2- the next testServerConnectionsMicrometer is starting with HTTP1, but the SERVER_CONNECTIONS_TOTAL meter is still set to 1 (because the old server has not yet seen the connection close), so the test fails.
  • Loading branch information
pderop committed Aug 18, 2022
1 parent 8580b38 commit a8b7a7f
Showing 1 changed file with 75 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -69,7 +75,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -115,6 +123,15 @@ class HttpMetricsHandlerTests extends BaseHttpTest {
static Http11SslContextSpec clientCtx11;
static Http2SslContextSpec clientCtx2;

private ChannelGroup group;
private static final EventExecutor executor = new DefaultEventExecutor();

@AfterAll
public static void afterClass() throws Exception {
executor.shutdownGracefully()
.get(5, TimeUnit.SECONDS);
}

@BeforeAll
static void createSelfSignedCertificate() throws CertificateException {
Assertions.setMaxStackTraceElementsDisplayed(100);
Expand Down Expand Up @@ -143,7 +160,11 @@ static void createSelfSignedCertificate() throws CertificateException {
*/
@BeforeEach
void setUp() {
group = new DefaultChannelGroup(executor);
httpServer = createServer()
// Register a channel group, when invoking disposeNow()
// it will close all remaining client sockets on the server, if any.
.channelGroup(group)
.host("127.0.0.1")
.metrics(true, Function.identity())
.httpRequestDecoder(spec -> spec.h2cMaxContentLength(256))
Expand All @@ -167,10 +188,23 @@ void setUp() {
}

@AfterEach
void tearDown() {
void tearDown() throws InterruptedException, ExecutionException, TimeoutException {
provider.disposeLater()
.block(Duration.ofSeconds(30));

// In case the ServerCloseHandler is registered on the server, make sure client socket is closed on the server side
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();

if (disposableServer != null) {
disposableServer.disposeNow();
disposableServer = null; // avoid to dispose the server again from the BaseHttpTest.disposeServer method
}

if (group != null) {
group.close()
.get(5, TimeUnit.SECONDS);
}

Metrics.removeRegistry(registry);
registry.clear();
registry.close();
Expand Down Expand Up @@ -585,16 +619,14 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc
void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch responseSent = new CountDownLatch(1); // response fully sent by the server
CountDownLatch serverClosed = new CountDownLatch(1); // socket closed on the server side
ResponseSentHandler responseSentHandler = ResponseSentHandler.INSTANCE;
ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE;
boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11;
HttpServer server = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.metrics(true, Function.identity())
.doOnConnection(cnx -> responseSentHandler.register(responseSent, cnx.channel().pipeline()));
.doOnConnection(cnx -> {
ResponseSentHandler.INSTANCE.register(responseSent, cnx.channel().pipeline());
ServerCloseHandler.INSTANCE.register(cnx.channel());
});

server = isHttp11 ?
server.doOnChannelInit((cnxObs, ch, sockAddr) -> serverCloseHandler.register(ch, serverClosed, isHttp11)) : server;
disposableServer = server.bindNow();

AtomicReference<SocketAddress> clientAddress = new AtomicReference<>();
Expand Down Expand Up @@ -622,22 +654,22 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco

// now check the server counters
if (isHttp11) {
assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue();
// make sure the client socket is closed on the server side before checking server metrics
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
checkGauge(SERVER_CONNECTIONS_TOTAL, true, 0, URI, HTTP, LOCAL_ADDRESS, address);
checkGauge(SERVER_CONNECTIONS_ACTIVE, true, 0, URI, HTTP, LOCAL_ADDRESS, address);
}
else {
checkGauge(SERVER_CONNECTIONS_TOTAL, true, 1, URI, HTTP, LOCAL_ADDRESS, address);
checkGauge(SERVER_STREAMS_ACTIVE, true, 0, URI, HTTP, LOCAL_ADDRESS, address);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}

// These metrics are meant only for the servers,
// connections metrics for the clients are available from the connection pool
address = formatSocketAddress(clientAddress.get());
checkGauge(CLIENT_CONNECTIONS_TOTAL, false, 0, URI, HTTP, LOCAL_ADDRESS, address);
checkGauge(CLIENT_CONNECTIONS_ACTIVE, false, 0, URI, HTTP, LOCAL_ADDRESS, address);

disposableServer.disposeNow();
}

@ParameterizedTest
Expand All @@ -648,11 +680,9 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[
// ServerRecorder.INSTANCE.reset() (AfterEach) and thus leave ServerRecorder.INSTANCE in a bad state
ServerRecorder.INSTANCE.reset();
boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11;
CountDownLatch serverClosed = new CountDownLatch(1);
ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE;

disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.doOnConnection(c -> serverCloseHandler.register(c.channel(), serverClosed, isHttp11))
.doOnConnection(cnx -> ServerCloseHandler.INSTANCE.register(cnx.channel()))
.metrics(true, ServerRecorder.supplier(), Function.identity())
.bindNow();
String address = formatSocketAddress(disposableServer.address());
Expand All @@ -672,26 +702,22 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[
.expectComplete()
.verify(Duration.ofSeconds(30));

// dispose the client connection provider now, before asserting test expectations.
provider.disposeLater()
.block(Duration.ofSeconds(30));

// now the socket is closed, wait for the ServerRecorder to be called in recordServerConnectionClosed before asserting test expectations
assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue();

// now we can assert test expectations
assertThat(ServerRecorder.INSTANCE.error.get()).isNull();

if (isHttp11) {
// wait for the AbstractHttpServerMetricsHandlerServer to be called in recordServerConnectionClosed before asserting test expectations
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();

assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
}
else {
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(1);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}

disposableServer.disposeNow();
}

@Test
Expand Down Expand Up @@ -723,16 +749,12 @@ void testIssue896() throws Exception {
@MethodSource("http11CompatibleProtocols")
void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch serverClosed = new CountDownLatch(1);
CountDownLatch clientCompleted = new CountDownLatch(1);
boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11;
ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE;

disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.doOnChannelInit((obs, c, s) -> serverCloseHandler.register(c, serverClosed, isHttp11))
.doOnChannelInit((cobs, ch, addr) -> ServerCloseHandler.INSTANCE.register(ch))
.httpRequestDecoder(spec -> spec.maxHeaderSize(32))
.bindNow();

CountDownLatch clientCompleted = new CountDownLatch(1);
AtomicReference<SocketAddress> serverAddress = new AtomicReference<>();
httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols)
.doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress()))
Expand All @@ -746,12 +768,8 @@ void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtoco
.expectComplete()
.verify(Duration.ofSeconds(30));

// dispose the client connection provider now, before asserting test expectations.
provider.disposeLater()
.block(Duration.ofSeconds(30));

// now the socket is closed, wait for the ServerRecorder to be called in recordServerConnectionClosed before asserting test expectations
assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue();
// Ensure client socket is closed on the server, to make sure that server metrics are up-to-date.
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();

// Ensure client has fully received the response before asserting test expectations
assertThat(clientCompleted.await(30, TimeUnit.SECONDS)).as("clientCompleted latch await").isTrue();
Expand Down Expand Up @@ -1121,7 +1139,6 @@ static final class ServerRecorder implements HttpServerMetricsRecorder {
private final AtomicReference<String> onActiveConnectionsLocalAddr = new AtomicReference<>();
private final AtomicReference<String> onInactiveConnectionsLocalAddr = new AtomicReference<>();
private final AtomicInteger onActiveConnectionsAmount = new AtomicInteger();
private volatile CountDownLatch done = new CountDownLatch(4);

static Supplier<ServerRecorder> supplier() {
return SUPPLIER;
Expand All @@ -1134,35 +1151,30 @@ void reset() {
onActiveConnectionsLocalAddr.set(null);
onInactiveConnectionsLocalAddr.set(null);
onActiveConnectionsAmount.set(0);
done = new CountDownLatch(4);
}

@Override
public void recordServerConnectionOpened(SocketAddress localAddress) {
onServerConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onServerConnectionsAmount.addAndGet(1);
done.countDown();
}

@Override
public void recordServerConnectionClosed(SocketAddress localAddress) {
onServerConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onServerConnectionsAmount.addAndGet(-1);
done.countDown();
}

@Override
public void recordServerConnectionActive(SocketAddress localAddress) {
onActiveConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onActiveConnectionsAmount.addAndGet(1);
done.countDown();
}

@Override
public void recordServerConnectionInactive(SocketAddress localAddress) {
onInactiveConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onActiveConnectionsAmount.addAndGet(-1);
done.countDown();
}

@Override
Expand Down Expand Up @@ -1287,16 +1299,19 @@ static final class ServerCloseHandler extends ChannelInboundHandlerAdapter {
static final ServerCloseHandler INSTANCE = new ServerCloseHandler();
static final String HANDLER_NAME = "ServerCloseHandler.handler";
private CountDownLatch latch;

void register(Channel channel, CountDownLatch latch, boolean http11) {
this.latch = latch;

if (http11) {
channel.pipeline().addBefore(NettyPipeline.ReactiveBridge, HANDLER_NAME, this);
private boolean registered;

void register(Channel channel) {
this.latch = new CountDownLatch(1);
if (channel instanceof Http2StreamChannel) {
if (channel.parent().pipeline().get(HANDLER_NAME) == null) {
channel.parent().pipeline().addLast(HANDLER_NAME, this);
}
}
else {
channel.parent().pipeline().addLast(HANDLER_NAME, this);
channel.pipeline().addBefore(NettyPipeline.ReactiveBridge, HANDLER_NAME, this);
}
registered = true;
}

@Override
Expand All @@ -1309,6 +1324,18 @@ public void channelInactive(ChannelHandlerContext ctx) {
public boolean isSharable() {
return true;
}

public boolean awaitClientClosedOnServer() throws InterruptedException {
if (registered) {
try {
return latch.await(30, TimeUnit.SECONDS);
}
finally {
registered = false;
}
}
return true;
}
}

/**
Expand Down

0 comments on commit a8b7a7f

Please sign in to comment.