Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HttpMetricsHandlerTests testServerConnectionsMicrometer still unstable #2421

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -69,7 +75,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -115,6 +123,15 @@ class HttpMetricsHandlerTests extends BaseHttpTest {
static Http11SslContextSpec clientCtx11;
static Http2SslContextSpec clientCtx2;

private ChannelGroup group;
private static final EventExecutor executor = new DefaultEventExecutor();

@AfterAll
public static void afterClass() throws Exception {
executor.shutdownGracefully()
.get(5, TimeUnit.SECONDS);
}

@BeforeAll
static void createSelfSignedCertificate() throws CertificateException {
Assertions.setMaxStackTraceElementsDisplayed(100);
Expand Down Expand Up @@ -143,7 +160,11 @@ static void createSelfSignedCertificate() throws CertificateException {
*/
@BeforeEach
void setUp() {
group = new DefaultChannelGroup(executor);
httpServer = createServer()
// Register a channel group, when invoking disposeNow()
// it will close all remaining client sockets on the server, if any.
.channelGroup(group)
.host("127.0.0.1")
.metrics(true, Function.identity())
.httpRequestDecoder(spec -> spec.h2cMaxContentLength(256))
Expand All @@ -167,10 +188,23 @@ void setUp() {
}

@AfterEach
void tearDown() {
void tearDown() throws InterruptedException, ExecutionException, TimeoutException {
provider.disposeLater()
.block(Duration.ofSeconds(30));

// In case the ServerCloseHandler is registered on the server, make sure client socket is closed on the server side
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();

if (disposableServer != null) {
disposableServer.disposeNow();
disposableServer = null; // avoid to dispose the server again from the BaseHttpTest.disposeServer method
}

if (group != null) {
group.close()
.get(5, TimeUnit.SECONDS);
}

Metrics.removeRegistry(registry);
registry.clear();
registry.close();
Expand Down Expand Up @@ -585,16 +619,14 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc
void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch responseSent = new CountDownLatch(1); // response fully sent by the server
CountDownLatch serverClosed = new CountDownLatch(1); // socket closed on the server side
ResponseSentHandler responseSentHandler = ResponseSentHandler.INSTANCE;
ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE;
boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11;
HttpServer server = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.metrics(true, Function.identity())
.doOnConnection(cnx -> responseSentHandler.register(responseSent, cnx.channel().pipeline()));
.doOnConnection(cnx -> {
ResponseSentHandler.INSTANCE.register(responseSent, cnx.channel().pipeline());
ServerCloseHandler.INSTANCE.register(cnx.channel());
});

server = isHttp11 ?
server.doOnChannelInit((cnxObs, ch, sockAddr) -> serverCloseHandler.register(ch, serverClosed, isHttp11)) : server;
disposableServer = server.bindNow();

AtomicReference<SocketAddress> clientAddress = new AtomicReference<>();
Expand Down Expand Up @@ -622,22 +654,22 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco

// now check the server counters
if (isHttp11) {
assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue();
// make sure the client socket is closed on the server side before checking server metrics
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
checkGauge(SERVER_CONNECTIONS_TOTAL, true, 0, URI, HTTP, LOCAL_ADDRESS, address);
checkGauge(SERVER_CONNECTIONS_ACTIVE, true, 0, URI, HTTP, LOCAL_ADDRESS, address);
}
else {
checkGauge(SERVER_CONNECTIONS_TOTAL, true, 1, URI, HTTP, LOCAL_ADDRESS, address);
checkGauge(SERVER_STREAMS_ACTIVE, true, 0, URI, HTTP, LOCAL_ADDRESS, address);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}

// These metrics are meant only for the servers,
// connections metrics for the clients are available from the connection pool
address = formatSocketAddress(clientAddress.get());
checkGauge(CLIENT_CONNECTIONS_TOTAL, false, 0, URI, HTTP, LOCAL_ADDRESS, address);
checkGauge(CLIENT_CONNECTIONS_ACTIVE, false, 0, URI, HTTP, LOCAL_ADDRESS, address);

disposableServer.disposeNow();
}

@ParameterizedTest
Expand All @@ -648,11 +680,9 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[
// ServerRecorder.INSTANCE.reset() (AfterEach) and thus leave ServerRecorder.INSTANCE in a bad state
ServerRecorder.INSTANCE.reset();
boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11;
CountDownLatch serverClosed = new CountDownLatch(1);
ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE;

disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.doOnConnection(c -> serverCloseHandler.register(c.channel(), serverClosed, isHttp11))
.doOnConnection(cnx -> ServerCloseHandler.INSTANCE.register(cnx.channel()))
.metrics(true, ServerRecorder.supplier(), Function.identity())
.bindNow();
String address = formatSocketAddress(disposableServer.address());
Expand All @@ -672,26 +702,22 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[
.expectComplete()
.verify(Duration.ofSeconds(30));

// dispose the client connection provider now, before asserting test expectations.
provider.disposeLater()
.block(Duration.ofSeconds(30));

// now the socket is closed, wait for the ServerRecorder to be called in recordServerConnectionClosed before asserting test expectations
assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue();

// now we can assert test expectations
assertThat(ServerRecorder.INSTANCE.error.get()).isNull();

if (isHttp11) {
// wait for the AbstractHttpServerMetricsHandlerServer to be called in recordServerConnectionClosed before asserting test expectations
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();

assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
}
else {
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(1);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}

disposableServer.disposeNow();
}

@Test
Expand Down Expand Up @@ -723,16 +749,12 @@ void testIssue896() throws Exception {
@MethodSource("http11CompatibleProtocols")
void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch serverClosed = new CountDownLatch(1);
CountDownLatch clientCompleted = new CountDownLatch(1);
boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11;
ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE;

disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.doOnChannelInit((obs, c, s) -> serverCloseHandler.register(c, serverClosed, isHttp11))
.doOnChannelInit((cobs, ch, addr) -> ServerCloseHandler.INSTANCE.register(ch))
.httpRequestDecoder(spec -> spec.maxHeaderSize(32))
.bindNow();

CountDownLatch clientCompleted = new CountDownLatch(1);
AtomicReference<SocketAddress> serverAddress = new AtomicReference<>();
httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols)
.doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress()))
Expand All @@ -746,12 +768,8 @@ void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtoco
.expectComplete()
.verify(Duration.ofSeconds(30));

// dispose the client connection provider now, before asserting test expectations.
provider.disposeLater()
.block(Duration.ofSeconds(30));

// now the socket is closed, wait for the ServerRecorder to be called in recordServerConnectionClosed before asserting test expectations
assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue();
// Ensure client socket is closed on the server, to make sure that server metrics are up-to-date.
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();

// Ensure client has fully received the response before asserting test expectations
assertThat(clientCompleted.await(30, TimeUnit.SECONDS)).as("clientCompleted latch await").isTrue();
Expand Down Expand Up @@ -1121,7 +1139,6 @@ static final class ServerRecorder implements HttpServerMetricsRecorder {
private final AtomicReference<String> onActiveConnectionsLocalAddr = new AtomicReference<>();
private final AtomicReference<String> onInactiveConnectionsLocalAddr = new AtomicReference<>();
private final AtomicInteger onActiveConnectionsAmount = new AtomicInteger();
private volatile CountDownLatch done = new CountDownLatch(4);

static Supplier<ServerRecorder> supplier() {
return SUPPLIER;
Expand All @@ -1134,35 +1151,30 @@ void reset() {
onActiveConnectionsLocalAddr.set(null);
onInactiveConnectionsLocalAddr.set(null);
onActiveConnectionsAmount.set(0);
done = new CountDownLatch(4);
}

@Override
public void recordServerConnectionOpened(SocketAddress localAddress) {
onServerConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onServerConnectionsAmount.addAndGet(1);
done.countDown();
}

@Override
public void recordServerConnectionClosed(SocketAddress localAddress) {
onServerConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onServerConnectionsAmount.addAndGet(-1);
done.countDown();
}

@Override
public void recordServerConnectionActive(SocketAddress localAddress) {
onActiveConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onActiveConnectionsAmount.addAndGet(1);
done.countDown();
}

@Override
public void recordServerConnectionInactive(SocketAddress localAddress) {
onInactiveConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onActiveConnectionsAmount.addAndGet(-1);
done.countDown();
}

@Override
Expand Down Expand Up @@ -1287,16 +1299,19 @@ static final class ServerCloseHandler extends ChannelInboundHandlerAdapter {
static final ServerCloseHandler INSTANCE = new ServerCloseHandler();
static final String HANDLER_NAME = "ServerCloseHandler.handler";
private CountDownLatch latch;

void register(Channel channel, CountDownLatch latch, boolean http11) {
this.latch = latch;

if (http11) {
channel.pipeline().addBefore(NettyPipeline.ReactiveBridge, HANDLER_NAME, this);
private boolean registered;

void register(Channel channel) {
this.latch = new CountDownLatch(1);
if (channel instanceof Http2StreamChannel) {
if (channel.parent().pipeline().get(HANDLER_NAME) == null) {
channel.parent().pipeline().addLast(HANDLER_NAME, this);
}
}
else {
channel.parent().pipeline().addLast(HANDLER_NAME, this);
channel.pipeline().addBefore(NettyPipeline.ReactiveBridge, HANDLER_NAME, this);
}
registered = true;
}

@Override
Expand All @@ -1309,6 +1324,18 @@ public void channelInactive(ChannelHandlerContext ctx) {
public boolean isSharable() {
return true;
}

public boolean awaitClientClosedOnServer() throws InterruptedException {
if (registered) {
try {
return latch.await(30, TimeUnit.SECONDS);
}
finally {
registered = false;
}
}
return true;
}
}

/**
Expand Down