Skip to content

Commit

Permalink
Merge #2257 #2175 #2274 #2294 #2295 into netty5
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Jun 14, 2022
2 parents 7cfe7e2 + 21b300a commit e7a1551
Show file tree
Hide file tree
Showing 21 changed files with 1,937 additions and 355 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Expand Up @@ -28,7 +28,7 @@ buildscript {
}

plugins {
id "com.diffplug.spotless" version "6.7.0"
id "com.diffplug.spotless" version "6.7.2"
id 'org.asciidoctor.jvm.convert' version '3.3.2' apply false
id 'org.asciidoctor.jvm.pdf' version '3.3.2' apply false
id 'com.google.osdetector' version '1.7.0'
Expand Down Expand Up @@ -116,8 +116,8 @@ ext {
assertJVersion = '3.23.1'
awaitilityVersion = '4.2.0'
hoverflyJavaVersion = '0.14.2'
tomcatVersion = '9.0.63'
boringSslVersion = '2.0.52.Final'
tomcatVersion = '9.0.64'
boringSslVersion = '2.0.53.Final'
junitVersion = '5.8.2'
junitPlatformLauncherVersion = '1.8.2'
mockitoVersion = '4.6.1'
Expand Down
2 changes: 1 addition & 1 deletion codequality/checkstyle.xml
Expand Up @@ -76,7 +76,7 @@
<property name="max" value="4"/>
</module>
<module name="NestedIfDepth">
<property name="max" value="4"/>
<property name="max" value="5"/>
</module>
<module name="NestedTryDepth">
<property name="max" value="4"/>
Expand Down
Expand Up @@ -93,6 +93,7 @@ public interface NettyPipeline {
String CompressionHandler = LEFT + "compressionHandler";
String ConnectMetricsHandler = LEFT + "connectMetricsHandler";
String H2CUpgradeHandler = LEFT + "h2cUpgradeHandler";
String H2Flush = LEFT + "h2Flush";
String H2MultiplexHandler = LEFT + "h2MultiplexHandler";
String H2OrHttp11Codec = LEFT + "h2OrHttp11Codec";
String H2ToHttp11Codec = LEFT + "h2ToHttp11Codec";
Expand Down
Expand Up @@ -18,6 +18,7 @@
import io.netty5.resolver.AddressResolverGroup;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.ReactorNetty;
Expand All @@ -32,6 +33,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -253,6 +255,68 @@ default String name() {
return null;
}

interface AllocationStrategy<A extends AllocationStrategy<A>> {

/**
* Returns a deep copy of this instance.
*
* @return a deep copy of this instance
*/
A copy();

/**
* Best-effort peek at the state of the strategy which indicates roughly how many more connections can currently be
* allocated. Should be paired with {@link #getPermits(int)} for an atomic permission.
*
* @return an ESTIMATED count of how many more connections can currently be allocated
*/
int estimatePermitCount();

/**
* Try to get the permission to allocate a {@code desired} positive number of new connections. Returns the permissible
* number of connections which MUST be created (otherwise the internal live counter of the strategy might be off).
* This permissible number might be zero, and it can also be a greater number than {@code desired}.
* Once a connection is discarded from the pool, it must update the strategy using {@link #returnPermits(int)}
* (which can happen in batches or with value {@literal 1}).
*
* @param desired the desired number of new connections
* @return the actual number of new connections that MUST be created, can be 0 and can be more than {@code desired}
*/
int getPermits(int desired);

/**
* Returns the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
*
* @return the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
*/
int permitGranted();

/**
* Return the minimum number of permits this strategy tries to maintain granted
* (reflecting a minimal size for the pool), or {@code 0} for scale-to-zero.
*
* @return the minimum number of permits this strategy tries to maintain, or {@code 0}
*/
int permitMinimum();

/**
* Returns the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
*
* @return the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
*/
int permitMaximum();

/**
* Update the strategy to indicate that N connections were discarded, potentially leaving space
* for N new ones to be allocated. Users MUST ensure that this method isn't called with a value greater than the
* number of held permits it has.
* <p>
* Some strategy MIGHT throw an {@link IllegalArgumentException} if it can be determined the number of returned permits
* is not consistent with the strategy's limits and delivered permits.
*/
void returnPermits(int returned);
}

/**
* Build a {@link ConnectionProvider} to cache and reuse a fixed maximum number of
* {@link Connection}. Further connections will be pending acquisition depending on
Expand Down Expand Up @@ -387,6 +451,8 @@ class ConnectionPoolSpec<SPEC extends ConnectionPoolSpec<SPEC>> implements Suppl
boolean metricsEnabled;
String leasingStrategy = DEFAULT_POOL_LEASING_STRATEGY;
Supplier<? extends ConnectionProvider.MeterRegistrar> registrar;
BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer;
AllocationStrategy<?> allocationStrategy;

/**
* Returns {@link ConnectionPoolSpec} new instance with default properties.
Expand All @@ -410,6 +476,8 @@ private ConnectionPoolSpec() {
this.metricsEnabled = copy.metricsEnabled;
this.leasingStrategy = copy.leasingStrategy;
this.registrar = copy.registrar;
this.pendingAcquireTimer = copy.pendingAcquireTimer;
this.allocationStrategy = copy.allocationStrategy;
}

/**
Expand All @@ -428,17 +496,21 @@ public final SPEC pendingAcquireTimeout(Duration pendingAcquireTimeout) {

/**
* Set the options to use for configuring {@link ConnectionProvider} maximum connections per connection pool.
* This is a pre-made allocation strategy where only max connections is specified.
* Custom allocation strategies can be provided via {@link #allocationStrategy(AllocationStrategy)}.
* Default to {@link #DEFAULT_POOL_MAX_CONNECTIONS}.
*
* @param maxConnections the maximum number of connections (per connection pool) before start pending
* @return {@literal this}
* @see #allocationStrategy(AllocationStrategy)
* @throws IllegalArgumentException if maxConnections is negative
*/
public final SPEC maxConnections(int maxConnections) {
if (maxConnections <= 0) {
throw new IllegalArgumentException("Max Connections value must be strictly positive");
}
this.maxConnections = maxConnections;
this.allocationStrategy = null;
return get();
}

Expand Down Expand Up @@ -580,6 +652,59 @@ public final SPEC evictInBackground(Duration evictionInterval) {
return get();
}

/**
* Set the option to use for configuring {@link ConnectionProvider} pending acquire timer.
* The pending acquire timer must be specified as a function which is used to schedule a pending acquire timeout
* when there is no idle connection and no new connection can be created currently.
* The function takes as argument a {@link Duration} which is the one configured by {@link #pendingAcquireTimeout(Duration)}.
* <p>
* Use this function if you want to specify your own implementation for scheduling pending acquire timers.
*
* <p> Default to {@link Schedulers#parallel()}.
*
* <p>Examples using Netty HashedWheelTimer implementation:</p>
* <pre>
* {@code
* final static HashedWheelTimer wheel = new HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024);
*
* HttpClient client = HttpClient.create(
* ConnectionProvider.builder("myprovider")
* .pendingAcquireTimeout(Duration.ofMillis(10000))
* .pendingAcquireTimer((r, d) -> {
* Timeout t = wheel.newTimeout(timeout -> r.run(), d.toMillis(), TimeUnit.MILLISECONDS);
* return () -> t.cancel();
* })
* .build());
* }
* </pre>
*
* @param pendingAcquireTimer the function to apply when scheduling pending acquire timers
* @return {@literal this}
* @throws NullPointerException if pendingAcquireTimer is null
* @since 1.0.20
* @see #pendingAcquireTimeout(Duration)
*/
public final SPEC pendingAcquireTimer(BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer) {
this.pendingAcquireTimer = Objects.requireNonNull(pendingAcquireTimer, "pendingAcquireTimer");
return get();
}

/**
* Limits in how many connections can be allocated and managed by the pool are driven by the
* provided {@link AllocationStrategy}. This is a customization escape hatch that replaces the last
* configured strategy, but most cases should be covered by the {@link #maxConnections()}
* pre-made allocation strategy.
*
* @param allocationStrategy the {@link AllocationStrategy} to use
* @return {@literal this}
* @see #maxConnections()
* @since 1.0.20
*/
public final SPEC allocationStrategy(AllocationStrategy<?> allocationStrategy) {
this.allocationStrategy = Objects.requireNonNull(allocationStrategy, "allocationStrategy");
return get();
}

@Override
@SuppressWarnings("unchecked")
public SPEC get() {
Expand Down
Expand Up @@ -32,7 +32,6 @@
import reactor.netty.internal.util.Metrics;
import reactor.netty.transport.TransportConfig;
import reactor.netty.internal.util.MapUtils;
import reactor.pool.AllocationStrategy;
import reactor.pool.InstrumentedPool;
import reactor.pool.Pool;
import reactor.pool.PoolBuilder;
Expand All @@ -59,6 +58,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -91,6 +91,7 @@ public abstract class PooledConnectionProvider<T extends Connection> implements
final Duration poolInactivity;
final Duration disposeTimeout;
final Map<SocketAddress, Integer> maxConnections = new HashMap<>();
Mono<Void> onDispose;

protected PooledConnectionProvider(Builder builder) {
this(builder, null);
Expand All @@ -108,6 +109,7 @@ protected PooledConnectionProvider(Builder builder) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
maxConnections.put(entry.getKey(), entry.getValue().maxConnections);
}
this.onDispose = Mono.empty();
scheduleInactivePoolsDisposal();
}

Expand Down Expand Up @@ -197,10 +199,10 @@ public final Mono<Void> disposeLater() {
})
.collect(Collectors.toList());
if (pools.isEmpty()) {
return Mono.empty();
return onDispose;
}
channelPools.clear();
return Mono.when(pools);
return onDispose.and(Mono.when(pools));
});
}

Expand Down Expand Up @@ -250,6 +252,10 @@ public String name() {
return name;
}

public void onDispose(Mono<Void> disposeMono) {
onDispose = onDispose.and(disposeMono);
}

protected abstract CoreSubscriber<PooledRef<T>> createDisposableAcquire(
TransportConfig config,
ConnectionObserver connectionObserver,
Expand Down Expand Up @@ -372,6 +378,8 @@ protected static final class PoolFactory<T extends Connection> {
final Supplier<? extends MeterRegistrar> registrar;
final Clock clock;
final Duration disposeTimeout;
final BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer;
final AllocationStrategy<?> allocationStrategy;

PoolFactory(ConnectionPoolSpec<?> conf, Duration disposeTimeout) {
this(conf, disposeTimeout, null);
Expand All @@ -391,11 +399,13 @@ protected static final class PoolFactory<T extends Connection> {
this.registrar = conf.registrar;
this.clock = clock;
this.disposeTimeout = disposeTimeout;
this.pendingAcquireTimer = conf.pendingAcquireTimer;
this.allocationStrategy = conf.allocationStrategy;
}

public InstrumentedPool<T> newPool(
Publisher<T> allocator,
@Nullable AllocationStrategy allocationStrategy,
@Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> evictionPredicate) {
if (disposeTimeout != null) {
Expand All @@ -407,7 +417,7 @@ public InstrumentedPool<T> newPool(

public InstrumentedPool<T> newPool(
Publisher<T> allocator,
@Nullable AllocationStrategy allocationStrategy,
@Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> evictionPredicate,
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
Expand Down Expand Up @@ -440,7 +450,16 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE));
}
else {
poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
if (allocationStrategy == null) {
poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
}
else {
poolBuilder = poolBuilder.allocationStrategy(new DelegatingAllocationStrategy(allocationStrategy.copy()));
}
}

if (pendingAcquireTimer != null) {
poolBuilder = poolBuilder.pendingAcquireTimer(pendingAcquireTimer);
}

if (clock != null) {
Expand All @@ -457,6 +476,15 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
return poolBuilder;
}

@Nullable
public AllocationStrategy<?> allocationStrategy() {
return allocationStrategy;
}

public long maxIdleTime() {
return this.maxIdleTime;
}

public long maxLifeTime() {
return maxLifeTime;
}
Expand All @@ -474,6 +502,45 @@ public String toString() {
", pendingAcquireTimeout=" + pendingAcquireTimeout +
'}';
}

static final class DelegatingAllocationStrategy implements reactor.pool.AllocationStrategy {

final AllocationStrategy<?> delegate;

DelegatingAllocationStrategy(AllocationStrategy<?> delegate) {
this.delegate = delegate;
}

@Override
public int estimatePermitCount() {
return delegate.estimatePermitCount();
}

@Override
public int getPermits(int desired) {
return delegate.getPermits(desired);
}

@Override
public int permitGranted() {
return delegate.permitGranted();
}

@Override
public int permitMinimum() {
return delegate.permitMinimum();
}

@Override
public int permitMaximum() {
return delegate.permitMaximum();
}

@Override
public void returnPermits(int returned) {
delegate.returnPermits(returned);
}
}
}

static final class PoolKey {
Expand Down

0 comments on commit e7a1551

Please sign in to comment.