Skip to content

Commit

Permalink
Merge #2421 into 1.1.0-M6
Browse files Browse the repository at this point in the history
  • Loading branch information
pderop committed Aug 18, 2022
2 parents 0497c91 + a8b7a7f commit fe86e6b
Showing 1 changed file with 75 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,18 @@
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -71,7 +77,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -117,6 +125,15 @@ class HttpMetricsHandlerTests extends BaseHttpTest {
static Http11SslContextSpec clientCtx11;
static Http2SslContextSpec clientCtx2;

private ChannelGroup group;
private static final EventExecutor executor = new DefaultEventExecutor();

@AfterAll
public static void afterClass() throws Exception {
executor.shutdownGracefully()
.get(5, TimeUnit.SECONDS);
}

@BeforeAll
static void createSelfSignedCertificate() throws CertificateException {
Assertions.setMaxStackTraceElementsDisplayed(100);
Expand Down Expand Up @@ -145,7 +162,11 @@ static void createSelfSignedCertificate() throws CertificateException {
*/
@BeforeEach
void setUp() {
group = new DefaultChannelGroup(executor);
httpServer = createServer()
// Register a channel group, when invoking disposeNow()
// it will close all remaining client sockets on the server, if any.
.channelGroup(group)
.host("127.0.0.1")
.metrics(true, Function.identity())
.httpRequestDecoder(spec -> spec.h2cMaxContentLength(256))
Expand All @@ -169,10 +190,23 @@ void setUp() {
}

@AfterEach
void tearDown() {
void tearDown() throws InterruptedException, ExecutionException, TimeoutException {
provider.disposeLater()
.block(Duration.ofSeconds(30));

// In case the ServerCloseHandler is registered on the server, make sure client socket is closed on the server side
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();

if (disposableServer != null) {
disposableServer.disposeNow();
disposableServer = null; // avoid to dispose the server again from the BaseHttpTest.disposeServer method
}

if (group != null) {
group.close()
.get(5, TimeUnit.SECONDS);
}

Metrics.removeRegistry(registry);
registry.clear();
registry.close();
Expand Down Expand Up @@ -587,16 +621,14 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc
void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch responseSent = new CountDownLatch(1); // response fully sent by the server
CountDownLatch serverClosed = new CountDownLatch(1); // socket closed on the server side
ResponseSentHandler responseSentHandler = ResponseSentHandler.INSTANCE;
ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE;
boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11;
HttpServer server = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.metrics(true, Function.identity())
.doOnConnection(cnx -> responseSentHandler.register(responseSent, cnx.channel().pipeline()));
.doOnConnection(cnx -> {
ResponseSentHandler.INSTANCE.register(responseSent, cnx.channel().pipeline());
ServerCloseHandler.INSTANCE.register(cnx.channel());
});

server = isHttp11 ?
server.doOnChannelInit((cnxObs, ch, sockAddr) -> serverCloseHandler.register(ch, serverClosed, isHttp11)) : server;
disposableServer = server.bindNow();

AtomicReference<SocketAddress> clientAddress = new AtomicReference<>();
Expand Down Expand Up @@ -624,22 +656,22 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco

// now check the server counters
if (isHttp11) {
assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue();
// make sure the client socket is closed on the server side before checking server metrics
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
checkGauge(SERVER_CONNECTIONS_TOTAL, true, 0, URI, HTTP, LOCAL_ADDRESS, address);
checkGauge(SERVER_CONNECTIONS_ACTIVE, true, 0, URI, HTTP, LOCAL_ADDRESS, address);
}
else {
checkGauge(SERVER_CONNECTIONS_TOTAL, true, 1, URI, HTTP, LOCAL_ADDRESS, address);
checkGauge(SERVER_STREAMS_ACTIVE, true, 0, URI, HTTP, LOCAL_ADDRESS, address);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}

// These metrics are meant only for the servers,
// connections metrics for the clients are available from the connection pool
address = formatSocketAddress(clientAddress.get());
checkGauge(CLIENT_CONNECTIONS_TOTAL, false, 0, URI, HTTP, LOCAL_ADDRESS, address);
checkGauge(CLIENT_CONNECTIONS_ACTIVE, false, 0, URI, HTTP, LOCAL_ADDRESS, address);

disposableServer.disposeNow();
}

@ParameterizedTest
Expand All @@ -650,11 +682,9 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[
// ServerRecorder.INSTANCE.reset() (AfterEach) and thus leave ServerRecorder.INSTANCE in a bad state
ServerRecorder.INSTANCE.reset();
boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11;
CountDownLatch serverClosed = new CountDownLatch(1);
ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE;

disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.doOnConnection(c -> serverCloseHandler.register(c.channel(), serverClosed, isHttp11))
.doOnConnection(cnx -> ServerCloseHandler.INSTANCE.register(cnx.channel()))
.metrics(true, ServerRecorder.supplier(), Function.identity())
.bindNow();
String address = formatSocketAddress(disposableServer.address());
Expand All @@ -674,26 +704,22 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[
.expectComplete()
.verify(Duration.ofSeconds(30));

// dispose the client connection provider now, before asserting test expectations.
provider.disposeLater()
.block(Duration.ofSeconds(30));

// now the socket is closed, wait for the ServerRecorder to be called in recordServerConnectionClosed before asserting test expectations
assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue();

// now we can assert test expectations
assertThat(ServerRecorder.INSTANCE.error.get()).isNull();

if (isHttp11) {
// wait for the AbstractHttpServerMetricsHandlerServer to be called in recordServerConnectionClosed before asserting test expectations
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();

assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
}
else {
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(1);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}

disposableServer.disposeNow();
}

@Test
Expand Down Expand Up @@ -725,16 +751,12 @@ void testIssue896() throws Exception {
@MethodSource("http11CompatibleProtocols")
void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch serverClosed = new CountDownLatch(1);
CountDownLatch clientCompleted = new CountDownLatch(1);
boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11;
ServerCloseHandler serverCloseHandler = ServerCloseHandler.INSTANCE;

disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.doOnChannelInit((obs, c, s) -> serverCloseHandler.register(c, serverClosed, isHttp11))
.doOnChannelInit((cobs, ch, addr) -> ServerCloseHandler.INSTANCE.register(ch))
.httpRequestDecoder(spec -> spec.maxHeaderSize(32))
.bindNow();

CountDownLatch clientCompleted = new CountDownLatch(1);
AtomicReference<SocketAddress> serverAddress = new AtomicReference<>();
httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols)
.doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress()))
Expand All @@ -748,12 +770,8 @@ void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtoco
.expectComplete()
.verify(Duration.ofSeconds(30));

// dispose the client connection provider now, before asserting test expectations.
provider.disposeLater()
.block(Duration.ofSeconds(30));

// now the socket is closed, wait for the ServerRecorder to be called in recordServerConnectionClosed before asserting test expectations
assertThat(serverClosed.await(30, TimeUnit.SECONDS)).as("serverClosed latch await").isTrue();
// Ensure client socket is closed on the server, to make sure that server metrics are up-to-date.
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();

// Ensure client has fully received the response before asserting test expectations
assertThat(clientCompleted.await(30, TimeUnit.SECONDS)).as("clientCompleted latch await").isTrue();
Expand Down Expand Up @@ -1125,7 +1143,6 @@ static final class ServerRecorder implements HttpServerMetricsRecorder {
private final AtomicReference<String> onActiveConnectionsLocalAddr = new AtomicReference<>();
private final AtomicReference<String> onInactiveConnectionsLocalAddr = new AtomicReference<>();
private final AtomicInteger onActiveConnectionsAmount = new AtomicInteger();
private volatile CountDownLatch done = new CountDownLatch(4);

static Supplier<ServerRecorder> supplier() {
return SUPPLIER;
Expand All @@ -1138,35 +1155,30 @@ void reset() {
onActiveConnectionsLocalAddr.set(null);
onInactiveConnectionsLocalAddr.set(null);
onActiveConnectionsAmount.set(0);
done = new CountDownLatch(4);
}

@Override
public void recordServerConnectionOpened(SocketAddress localAddress) {
onServerConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onServerConnectionsAmount.addAndGet(1);
done.countDown();
}

@Override
public void recordServerConnectionClosed(SocketAddress localAddress) {
onServerConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onServerConnectionsAmount.addAndGet(-1);
done.countDown();
}

@Override
public void recordServerConnectionActive(SocketAddress localAddress) {
onActiveConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onActiveConnectionsAmount.addAndGet(1);
done.countDown();
}

@Override
public void recordServerConnectionInactive(SocketAddress localAddress) {
onInactiveConnectionsLocalAddr.set(formatSocketAddress(localAddress));
onActiveConnectionsAmount.addAndGet(-1);
done.countDown();
}

@Override
Expand Down Expand Up @@ -1291,16 +1303,19 @@ static final class ServerCloseHandler extends ChannelInboundHandlerAdapter {
static final ServerCloseHandler INSTANCE = new ServerCloseHandler();
static final String HANDLER_NAME = "ServerCloseHandler.handler";
private CountDownLatch latch;

void register(Channel channel, CountDownLatch latch, boolean http11) {
this.latch = latch;

if (http11) {
channel.pipeline().addBefore(NettyPipeline.ReactiveBridge, HANDLER_NAME, this);
private boolean registered;

void register(Channel channel) {
this.latch = new CountDownLatch(1);
if (channel instanceof Http2StreamChannel) {
if (channel.parent().pipeline().get(HANDLER_NAME) == null) {
channel.parent().pipeline().addLast(HANDLER_NAME, this);
}
}
else {
channel.parent().pipeline().addLast(HANDLER_NAME, this);
channel.pipeline().addBefore(NettyPipeline.ReactiveBridge, HANDLER_NAME, this);
}
registered = true;
}

@Override
Expand All @@ -1313,6 +1328,18 @@ public void channelInactive(ChannelHandlerContext ctx) {
public boolean isSharable() {
return true;
}

public boolean awaitClientClosedOnServer() throws InterruptedException {
if (registered) {
try {
return latch.await(30, TimeUnit.SECONDS);
}
finally {
registered = false;
}
}
return true;
}
}

/**
Expand Down

0 comments on commit fe86e6b

Please sign in to comment.