Skip to content

Commit

Permalink
Introduce HTTP2AllocationStrategy for specifying minConnections and m…
Browse files Browse the repository at this point in the history
…axConcurrentStreams
  • Loading branch information
violetagg committed Jun 3, 2022
1 parent 0ab96fd commit bf36afc
Show file tree
Hide file tree
Showing 7 changed files with 552 additions and 70 deletions.
Expand Up @@ -253,6 +253,64 @@ default String name() {
return null;
}

interface AllocationStrategy {

/**
* Best-effort peek at the state of the strategy which indicates roughly how many more resources can currently be
* allocated. Should be paired with {@link #getPermits(int)} for an atomic permission.
*
* @return an ESTIMATED count of how many more resources can currently be allocated
*/
int estimatePermitCount();

/**
* Try to get the permission to allocate a {@code desired} positive number of new resources. Returns the permissible
* number of resources which MUST be created (otherwise the internal live counter of the strategy might be off).
* This permissible number might be zero, and it can also be a greater number than {@code desired}, which could for
* example denote a minimum warmed-up size for the pool to maintain (see below).
* Once a resource is discarded from the pool, it must update the strategy using {@link #returnPermits(int)}
* (which can happen in batches or with value {@literal 1}).
* <p>
* For the warming up case, the typical pattern would be to call this method with a {@code desired} of zero.
*
* @param desired the desired number of new resources
* @return the actual number of new resources that MUST be created, can be 0 and can be more than {@code desired}
*/
int getPermits(int desired);

/**
* Returns the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
*
* @return the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
*/
int permitGranted();

/**
* Return the minimum number of permits this strategy tries to maintain granted
* (reflecting a minimal size for the pool), or {@code 0} for scale-to-zero.
*
* @return the minimum number of permits this strategy tries to maintain, or {@code 0}
*/
int permitMinimum();

/**
* Returns the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
*
* @return the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
*/
int permitMaximum();

/**
* Update the strategy to indicate that N resources were discarded, potentially leaving space
* for N new ones to be allocated. Users MUST ensure that this method isn't called with a value greater than the
* number of held permits it has.
* <p>
* Some strategy MIGHT throw an {@link IllegalArgumentException} if it can be determined the number of returned permits
* is not consistent with the strategy's limits and delivered permits.
*/
void returnPermits(int returned);
}

/**
* Build a {@link ConnectionProvider} to cache and reuse a fixed maximum number of
* {@link Connection}. Further connections will be pending acquisition depending on
Expand Down Expand Up @@ -387,6 +445,7 @@ class ConnectionPoolSpec<SPEC extends ConnectionPoolSpec<SPEC>> implements Suppl
boolean metricsEnabled;
String leasingStrategy = DEFAULT_POOL_LEASING_STRATEGY;
Supplier<? extends ConnectionProvider.MeterRegistrar> registrar;
AllocationStrategy allocationStrategy;

/**
* Returns {@link ConnectionPoolSpec} new instance with default properties.
Expand All @@ -410,6 +469,7 @@ private ConnectionPoolSpec() {
this.metricsEnabled = copy.metricsEnabled;
this.leasingStrategy = copy.leasingStrategy;
this.registrar = copy.registrar;
this.allocationStrategy = copy.allocationStrategy;
}

/**
Expand Down Expand Up @@ -439,6 +499,7 @@ public final SPEC maxConnections(int maxConnections) {
throw new IllegalArgumentException("Max Connections value must be strictly positive");
}
this.maxConnections = maxConnections;
this.allocationStrategy = null;
return get();
}

Expand Down Expand Up @@ -580,6 +641,21 @@ public final SPEC evictInBackground(Duration evictionInterval) {
return get();
}

/**
* Limits in how many resources can be allocated and managed by the pool are driven by the
* provided {@link AllocationStrategy}. This is a customization escape hatch that replaces the last
* configured strategy, but most cases should be covered by the {@link #maxConnections()} pre-made strategies.
*
* @param allocationStrategy the {@link AllocationStrategy} to use
* @return {@literal this}
* @see #maxConnections()
* @since 1.0.20
*/
public final SPEC allocationStrategy(AllocationStrategy allocationStrategy) {
this.allocationStrategy = Objects.requireNonNull(allocationStrategy, "allocationStrategy");
return get();
}

@Override
@SuppressWarnings("unchecked")
public SPEC get() {
Expand Down
Expand Up @@ -30,7 +30,6 @@
import reactor.netty.ReactorNetty;
import reactor.netty.transport.TransportConfig;
import reactor.netty.internal.util.MapUtils;
import reactor.pool.AllocationStrategy;
import reactor.pool.InstrumentedPool;
import reactor.pool.Pool;
import reactor.pool.PoolBuilder;
Expand Down Expand Up @@ -370,6 +369,7 @@ protected static final class PoolFactory<T extends Connection> {
final Supplier<? extends MeterRegistrar> registrar;
final Clock clock;
final Duration disposeTimeout;
final AllocationStrategy allocationStrategy;

PoolFactory(ConnectionPoolSpec<?> conf, Duration disposeTimeout) {
this(conf, disposeTimeout, null);
Expand All @@ -389,6 +389,7 @@ protected static final class PoolFactory<T extends Connection> {
this.registrar = conf.registrar;
this.clock = clock;
this.disposeTimeout = disposeTimeout;
this.allocationStrategy = conf.allocationStrategy;
}

public InstrumentedPool<T> newPool(
Expand Down Expand Up @@ -438,7 +439,12 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE));
}
else {
poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
if (allocationStrategy == null) {
poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
}
else {
poolBuilder = poolBuilder.allocationStrategy(new DelegatingAllocationStrategy(allocationStrategy));
}
}

if (clock != null) {
Expand All @@ -455,6 +461,11 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
return poolBuilder;
}

@Nullable
public AllocationStrategy allocationStrategy() {
return allocationStrategy;
}

public long maxIdleTime() {
return this.maxIdleTime;
}
Expand All @@ -476,6 +487,45 @@ public String toString() {
", pendingAcquireTimeout=" + pendingAcquireTimeout +
'}';
}

static final class DelegatingAllocationStrategy implements reactor.pool.AllocationStrategy {

final AllocationStrategy delegate;

DelegatingAllocationStrategy(AllocationStrategy delegate) {
this.delegate = delegate;
}

@Override
public int estimatePermitCount() {
return delegate.estimatePermitCount();
}

@Override
public int getPermits(int desired) {
return delegate.getPermits(desired);
}

@Override
public int permitGranted() {
return delegate.permitGranted();
}

@Override
public int permitMinimum() {
return delegate.permitMinimum();
}

@Override
public int permitMaximum() {
return delegate.permitMaximum();
}

@Override
public void returnPermits(int returned) {
delegate.returnPermits(returned);
}
}
}

static final class PoolKey {
Expand Down

0 comments on commit bf36afc

Please sign in to comment.