Skip to content

Commit

Permalink
Merge #3235 into 1.1.19
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed May 8, 2024
2 parents bafc217 + 066a645 commit 9328079
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 34 deletions.
Expand Up @@ -201,36 +201,14 @@ public final Mono<Void> disposeLater() {
if (pool instanceof GracefulShutdownInstrumentedPool) {
return ((GracefulShutdownInstrumentedPool<T>) pool)
.disposeGracefully(disposeTimeout)
.then(deRegisterDefaultMetrics(id, pool.config().metricsRecorder(), poolFactory.registrar, remoteAddress))
.onErrorResume(t -> {
log.error("Connection pool for [{}] didn't shut down gracefully", e.getKey(), t);
return Mono.fromRunnable(() -> {
if (poolFactory.registrar != null) {
poolFactory.registrar.get().deRegisterMetrics(name, id, remoteAddress);
}
else if (Metrics.isMicrometerAvailable()) {
deRegisterDefaultMetrics(id, remoteAddress);
PoolMetricsRecorder recorder = pool.config().metricsRecorder();
if (recorder instanceof Disposable) {
((Disposable) recorder).dispose();
}
}
});
return deRegisterDefaultMetrics(id, pool.config().metricsRecorder(), poolFactory.registrar, remoteAddress);
});
}
return pool.disposeLater().then(
Mono.<Void>fromRunnable(() -> {
if (poolFactory.registrar != null) {
poolFactory.registrar.get().deRegisterMetrics(name, id, remoteAddress);
}
else if (Metrics.isMicrometerAvailable()) {
deRegisterDefaultMetrics(id, remoteAddress);
PoolMetricsRecorder recorder = pool.config().metricsRecorder();
if (recorder instanceof Disposable) {
((Disposable) recorder).dispose();
}
}
})
);
return pool.disposeLater()
.then(deRegisterDefaultMetrics(id, pool.config().metricsRecorder(), poolFactory.registrar, remoteAddress));
})
.collect(Collectors.toList());
if (pools.isEmpty()) {
Expand Down Expand Up @@ -352,6 +330,20 @@ protected void deRegisterDefaultMetrics(String id, SocketAddress remoteAddress)
MicrometerPooledConnectionProviderMeterRegistrar.INSTANCE.deRegisterMetrics(name, id, remoteAddress);
}

Mono<Void> deRegisterDefaultMetrics(String id, PoolMetricsRecorder recorder, @Nullable Supplier<? extends MeterRegistrar> registrar, SocketAddress remoteAddress) {
return Mono.fromRunnable(() -> {
if (registrar != null) {
registrar.get().deRegisterMetrics(name, id, remoteAddress);
}
else if (Metrics.isMicrometerAvailable()) {
deRegisterDefaultMetrics(id, remoteAddress);
if (recorder instanceof Disposable) {
((Disposable) recorder).dispose();
}
}
});
}

final boolean compareAddresses(SocketAddress origin, SocketAddress target) {
if (origin.equals(target)) {
return true;
Expand Down
Expand Up @@ -25,6 +25,8 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.BaseHttpTest;
Expand Down Expand Up @@ -230,8 +232,9 @@ private void doTest(HttpServer server, HttpClient client, String poolName, boole
assertGauge(registry, CONNECTION_PROVIDER_PREFIX + MAX_PENDING_CONNECTIONS, NAME, poolName).hasValueEqualTo(expectedMaxPendingAcquire);
}

@Test
void testConnectionPoolPendingAcquireSize() throws Exception {
@ParameterizedTest
@ValueSource(longs = {0, 50, 500})
void testConnectionPoolPendingAcquireSize(long disposeTimeoutMillis) throws Exception {
Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
Http2SslContextSpec clientCtx =
Http2SslContextSpec.forClient()
Expand All @@ -248,13 +251,21 @@ void testConnectionPoolPendingAcquireSize() throws Exception {
.delayElements(Duration.ofMillis(4))))
.bindNow();

ConnectionProvider provider = ConnectionProvider
ConnectionProvider.Builder builder = ConnectionProvider
.builder("testConnectionPoolPendingAcquireSize")
.pendingAcquireMaxCount(1000)
.maxConnections(500)
.metrics(true)
.build();

.metrics(true);
ConnectionProvider provider = disposeTimeoutMillis == 0 ? builder.build() :
builder.disposeTimeout(Duration.ofMillis(disposeTimeoutMillis)).build();

CountDownLatch meterRemoved = new CountDownLatch(1);
String name = CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS;
registry.config().onMeterRemoved(meter -> {
if (name.equals(meter.getId().getName())) {
meterRemoved.countDown();
}
});
try {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger();
Expand Down Expand Up @@ -294,14 +305,17 @@ void testConnectionPoolPendingAcquireSize() throws Exception {

assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();

assertGauge(registry, CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, NAME, "http2.testConnectionPoolPendingAcquireSize").hasValueEqualTo(0);
assertGauge(registry, name, NAME, "http2.testConnectionPoolPendingAcquireSize").hasValueEqualTo(0);
}
finally {
provider.disposeLater()
.block(Duration.ofSeconds(30));
}

assertThat(meterRemoved.await(30, TimeUnit.SECONDS)).isTrue();

// deRegistered
assertGauge(registry, CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, NAME, "http2.testConnectionPoolPendingAcquireSize").isNull();
assertGauge(registry, name, NAME, "http2.testConnectionPoolPendingAcquireSize").isNull();
}

@Test
Expand Down

0 comments on commit 9328079

Please sign in to comment.