Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http2 connection pool optimisation #2257

Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion codequality/checkstyle.xml
Expand Up @@ -76,7 +76,7 @@
<property name="max" value="4"/>
</module>
<module name="NestedIfDepth">
<property name="max" value="4"/>
<property name="max" value="5"/>
</module>
<module name="NestedTryDepth">
<property name="max" value="4"/>
Expand Down
Expand Up @@ -93,6 +93,7 @@ public interface NettyPipeline {
String CompressionHandler = LEFT + "compressionHandler";
String ConnectMetricsHandler = LEFT + "connectMetricsHandler";
String H2CUpgradeHandler = LEFT + "h2cUpgradeHandler";
String H2Flush = LEFT + "h2Flush";
String H2MultiplexHandler = LEFT + "h2MultiplexHandler";
String H2OrHttp11Codec = LEFT + "h2OrHttp11Codec";
String H2ToHttp11Codec = LEFT + "h2ToHttp11Codec";
Expand Down
Expand Up @@ -253,6 +253,68 @@ default String name() {
return null;
}

interface AllocationStrategy<A extends AllocationStrategy<A>> {

/**
* Returns a deep copy of this instance.
*
* @return a deep copy of this instance
*/
A copy();

/**
* Best-effort peek at the state of the strategy which indicates roughly how many more connections can currently be
* allocated. Should be paired with {@link #getPermits(int)} for an atomic permission.
*
* @return an ESTIMATED count of how many more connections can currently be allocated
*/
int estimatePermitCount();

/**
* Try to get the permission to allocate a {@code desired} positive number of new connections. Returns the permissible
* number of connections which MUST be created (otherwise the internal live counter of the strategy might be off).
* This permissible number might be zero, and it can also be a greater number than {@code desired}.
* Once a connection is discarded from the pool, it must update the strategy using {@link #returnPermits(int)}
* (which can happen in batches or with value {@literal 1}).
*
* @param desired the desired number of new connections
* @return the actual number of new connections that MUST be created, can be 0 and can be more than {@code desired}
*/
int getPermits(int desired);

/**
* Returns the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
*
* @return the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
*/
int permitGranted();

/**
* Return the minimum number of permits this strategy tries to maintain granted
* (reflecting a minimal size for the pool), or {@code 0} for scale-to-zero.
*
* @return the minimum number of permits this strategy tries to maintain, or {@code 0}
*/
int permitMinimum();

/**
* Returns the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
*
* @return the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
*/
int permitMaximum();

/**
* Update the strategy to indicate that N connections were discarded, potentially leaving space
* for N new ones to be allocated. Users MUST ensure that this method isn't called with a value greater than the
* number of held permits it has.
* <p>
* Some strategy MIGHT throw an {@link IllegalArgumentException} if it can be determined the number of returned permits
* is not consistent with the strategy's limits and delivered permits.
*/
void returnPermits(int returned);
}

/**
* Build a {@link ConnectionProvider} to cache and reuse a fixed maximum number of
* {@link Connection}. Further connections will be pending acquisition depending on
Expand Down Expand Up @@ -387,6 +449,7 @@ class ConnectionPoolSpec<SPEC extends ConnectionPoolSpec<SPEC>> implements Suppl
boolean metricsEnabled;
String leasingStrategy = DEFAULT_POOL_LEASING_STRATEGY;
Supplier<? extends ConnectionProvider.MeterRegistrar> registrar;
AllocationStrategy<?> allocationStrategy;

/**
* Returns {@link ConnectionPoolSpec} new instance with default properties.
Expand All @@ -410,6 +473,7 @@ private ConnectionPoolSpec() {
this.metricsEnabled = copy.metricsEnabled;
this.leasingStrategy = copy.leasingStrategy;
this.registrar = copy.registrar;
this.allocationStrategy = copy.allocationStrategy;
}

/**
Expand All @@ -428,17 +492,21 @@ public final SPEC pendingAcquireTimeout(Duration pendingAcquireTimeout) {

/**
* Set the options to use for configuring {@link ConnectionProvider} maximum connections per connection pool.
* This is a pre-made allocation strategy where only max connections is specified.
* Custom allocation strategies can be provided via {@link #allocationStrategy(AllocationStrategy)}.
* Default to {@link #DEFAULT_POOL_MAX_CONNECTIONS}.
*
* @param maxConnections the maximum number of connections (per connection pool) before start pending
* @return {@literal this}
* @see #allocationStrategy(AllocationStrategy)
* @throws IllegalArgumentException if maxConnections is negative
*/
public final SPEC maxConnections(int maxConnections) {
if (maxConnections <= 0) {
throw new IllegalArgumentException("Max Connections value must be strictly positive");
}
this.maxConnections = maxConnections;
this.allocationStrategy = null;
return get();
}

Expand Down Expand Up @@ -580,6 +648,22 @@ public final SPEC evictInBackground(Duration evictionInterval) {
return get();
}

/**
* Limits in how many connections can be allocated and managed by the pool are driven by the
* provided {@link AllocationStrategy}. This is a customization escape hatch that replaces the last
* configured strategy, but most cases should be covered by the {@link #maxConnections()}
* pre-made allocation strategy.
*
* @param allocationStrategy the {@link AllocationStrategy} to use
* @return {@literal this}
* @see #maxConnections()
* @since 1.0.20
*/
public final SPEC allocationStrategy(AllocationStrategy<?> allocationStrategy) {
this.allocationStrategy = Objects.requireNonNull(allocationStrategy, "allocationStrategy");
return get();
}

@Override
@SuppressWarnings("unchecked")
public SPEC get() {
Expand Down
Expand Up @@ -30,7 +30,6 @@
import reactor.netty.ReactorNetty;
import reactor.netty.transport.TransportConfig;
import reactor.netty.internal.util.MapUtils;
import reactor.pool.AllocationStrategy;
import reactor.pool.InstrumentedPool;
import reactor.pool.Pool;
import reactor.pool.PoolBuilder;
Expand Down Expand Up @@ -89,6 +88,7 @@ public abstract class PooledConnectionProvider<T extends Connection> implements
final Duration poolInactivity;
final Duration disposeTimeout;
final Map<SocketAddress, Integer> maxConnections = new HashMap<>();
Mono<Void> onDispose;

protected PooledConnectionProvider(Builder builder) {
this(builder, null);
Expand All @@ -106,6 +106,7 @@ protected PooledConnectionProvider(Builder builder) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
maxConnections.put(entry.getKey(), entry.getValue().maxConnections);
}
this.onDispose = Mono.empty();
scheduleInactivePoolsDisposal();
}

Expand Down Expand Up @@ -190,10 +191,10 @@ public final Mono<Void> disposeLater() {
})
.collect(Collectors.toList());
if (pools.isEmpty()) {
return Mono.empty();
return onDispose;
}
channelPools.clear();
return Mono.when(pools);
return onDispose.and(Mono.when(pools));
});
}

Expand Down Expand Up @@ -243,6 +244,10 @@ public String name() {
return name;
}

public void onDispose(Mono<Void> disposeMono) {
onDispose = onDispose.and(disposeMono);
}

protected abstract CoreSubscriber<PooledRef<T>> createDisposableAcquire(
TransportConfig config,
ConnectionObserver connectionObserver,
Expand Down Expand Up @@ -364,6 +369,7 @@ protected static final class PoolFactory<T extends Connection> {
final Supplier<? extends MeterRegistrar> registrar;
final Clock clock;
final Duration disposeTimeout;
final AllocationStrategy<?> allocationStrategy;

PoolFactory(ConnectionPoolSpec<?> conf, Duration disposeTimeout) {
this(conf, disposeTimeout, null);
Expand All @@ -383,11 +389,12 @@ protected static final class PoolFactory<T extends Connection> {
this.registrar = conf.registrar;
this.clock = clock;
this.disposeTimeout = disposeTimeout;
this.allocationStrategy = conf.allocationStrategy;
}

public InstrumentedPool<T> newPool(
Publisher<T> allocator,
@Nullable AllocationStrategy allocationStrategy,
@Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> evictionPredicate) {
if (disposeTimeout != null) {
Expand All @@ -399,7 +406,7 @@ public InstrumentedPool<T> newPool(

public InstrumentedPool<T> newPool(
Publisher<T> allocator,
@Nullable AllocationStrategy allocationStrategy,
@Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> evictionPredicate,
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
Expand Down Expand Up @@ -432,7 +439,12 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE));
}
else {
poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
if (allocationStrategy == null) {
poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
}
else {
poolBuilder = poolBuilder.allocationStrategy(new DelegatingAllocationStrategy(allocationStrategy.copy()));
}
}

if (clock != null) {
Expand All @@ -449,6 +461,15 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
return poolBuilder;
}

@Nullable
public AllocationStrategy<?> allocationStrategy() {
return allocationStrategy;
}

public long maxIdleTime() {
return this.maxIdleTime;
}

public long maxLifeTime() {
return maxLifeTime;
}
Expand All @@ -466,6 +487,45 @@ public String toString() {
", pendingAcquireTimeout=" + pendingAcquireTimeout +
'}';
}

static final class DelegatingAllocationStrategy implements reactor.pool.AllocationStrategy {
simonbasle marked this conversation as resolved.
Show resolved Hide resolved

final AllocationStrategy<?> delegate;

DelegatingAllocationStrategy(AllocationStrategy<?> delegate) {
this.delegate = delegate;
}

@Override
public int estimatePermitCount() {
return delegate.estimatePermitCount();
}

@Override
public int getPermits(int desired) {
return delegate.getPermits(desired);
}

@Override
public int permitGranted() {
return delegate.permitGranted();
}

@Override
public int permitMinimum() {
return delegate.permitMinimum();
}

@Override
public int permitMaximum() {
return delegate.permitMaximum();
}

@Override
public void returnPermits(int returned) {
delegate.returnPermits(returned);
}
}
}

static final class PoolKey {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@

class ConnectionProviderTest {

static final TestAllocationStrategy TEST_ALLOCATION_STRATEGY = new TestAllocationStrategy();
static final String TEST_STRING = "";
static final Supplier<ConnectionProvider.MeterRegistrar> TEST_SUPPLIER = () -> (a, b, c, d) -> {};

Expand Down Expand Up @@ -64,6 +65,9 @@ else if (Map.class == clazz) {
else if (Supplier.class == clazz) {
field.set(builder, TEST_SUPPLIER);
}
else if (ConnectionProvider.AllocationStrategy.class == clazz) {
field.set(builder, TEST_ALLOCATION_STRATEGY);
}
else if (boolean.class == clazz) {
field.setBoolean(builder, true);
}
Expand All @@ -74,4 +78,41 @@ else if (int.class == clazz) {
throw new IllegalArgumentException("Unknown field type " + clazz);
}
}

static final class TestAllocationStrategy implements ConnectionProvider.AllocationStrategy<TestAllocationStrategy> {

@Override
public TestAllocationStrategy copy() {
return this;
}

@Override
public int estimatePermitCount() {
return 0;
}

@Override
public int getPermits(int desired) {
return 0;
}

@Override
public int permitGranted() {
return 0;
}

@Override
public int permitMinimum() {
return 0;
}

@Override
public int permitMaximum() {
return 0;
}

@Override
public void returnPermits(int returned) {
}
}
}
Expand Up @@ -15,6 +15,7 @@
*/
package reactor.netty.http;

import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -147,6 +148,15 @@ public static HttpResources set(LoopResources loops) {
http2ConnectionProvider = new AtomicReference<>();
}

@Override
public void disposeWhen(SocketAddress remoteAddress) {
ConnectionProvider provider = http2ConnectionProvider.get();
if (provider != null) {
provider.disposeWhen(remoteAddress);
}
super.disposeWhen(remoteAddress);
}

@Override
public AddressResolverGroup<?> getOrCreateDefaultResolver() {
return super.getOrCreateDefaultResolver();
Expand Down