Skip to content

Commit

Permalink
api,core: Adds an Executor field to NameResolver.Args.
Browse files Browse the repository at this point in the history
Adds an Executor to NameResolver.Args, which is optionally set on ManagedChannelBuilder. This allows NameResolver implementations to avoid creating their own thread pools if the application already manages its own pools.

Addresses #3703.
  • Loading branch information
groakley authored and ejona86 committed Oct 14, 2019
1 parent 5888631 commit adcfb3e
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 37 deletions.
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/ForwardingChannelBuilder.java
Expand Up @@ -71,6 +71,12 @@ public T executor(Executor executor) {
return thisT();
}

@Override
public T blockingExecutor(Executor executor) {
delegate().blockingExecutor(executor);
return thisT();
}

@Override
public T intercept(List<ClientInterceptor> interceptors) {
delegate().intercept(interceptors);
Expand Down
17 changes: 17 additions & 0 deletions api/src/main/java/io/grpc/ManagedChannelBuilder.java
Expand Up @@ -103,6 +103,23 @@ public static ManagedChannelBuilder<?> forTarget(String target) {
*/
public abstract T executor(Executor executor);

/**
* Provides a custom executor that will be used for operations that block.
*
* <p>It's an optional parameter. If the user has not provided an executor when the channel is
* built, the builder will use a static cached thread pool.
*
* <p>The channel won't take ownership of the given executor. It's caller's responsibility to shut
* down the executor when it's desired.
*
* @return this
* @throws UnsupportedOperationException if unsupported
* @since 1.25.0
*/
public T blockingExecutor(Executor executor) {
throw new UnsupportedOperationException();
}

/**
* Adds interceptors that will be called before the channel performs its real work. This is
* functionally equivalent to using {@link ClientInterceptors#intercept(Channel, List)}, but while
Expand Down
39 changes: 35 additions & 4 deletions api/src/main/java/io/grpc/NameResolver.java
Expand Up @@ -29,6 +29,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -411,13 +412,19 @@ public static final class Args {
private final ProxyDetector proxyDetector;
private final SynchronizationContext syncContext;
private final ServiceConfigParser serviceConfigParser;

Args(Integer defaultPort, ProxyDetector proxyDetector,
SynchronizationContext syncContext, ServiceConfigParser serviceConfigParser) {
@Nullable private final Executor executor;

Args(
Integer defaultPort,
ProxyDetector proxyDetector,
SynchronizationContext syncContext,
ServiceConfigParser serviceConfigParser,
@Nullable Executor executor) {
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
this.syncContext = checkNotNull(syncContext, "syncContext not set");
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set");
this.executor = executor;
}

/**
Expand Down Expand Up @@ -459,13 +466,25 @@ public ServiceConfigParser getServiceConfigParser() {
return serviceConfigParser;
}

/**
* Returns the Executor on which this resolver should execute long-running or I/O bound work.
* Null if no Executor was set.
*
* @since 1.25.0
*/
@Nullable
public Executor getBlockingExecutor() {
return executor;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("defaultPort", defaultPort)
.add("proxyDetector", proxyDetector)
.add("syncContext", syncContext)
.add("serviceConfigParser", serviceConfigParser)
.add("executor", executor)
.toString();
}

Expand All @@ -480,6 +499,7 @@ public Builder toBuilder() {
builder.setProxyDetector(proxyDetector);
builder.setSynchronizationContext(syncContext);
builder.setServiceConfigParser(serviceConfigParser);
builder.setBlockingExecutor(executor);
return builder;
}

Expand All @@ -502,6 +522,7 @@ public static final class Builder {
private ProxyDetector proxyDetector;
private SynchronizationContext syncContext;
private ServiceConfigParser serviceConfigParser;
private Executor executor;

Builder() {
}
Expand Down Expand Up @@ -546,13 +567,23 @@ public Builder setServiceConfigParser(ServiceConfigParser parser) {
return this;
}

/**
* See {@link Args#getBlockingExecutor}. This is an optional field.
*
* @since 1.25.0
*/
public Builder setBlockingExecutor(Executor executor) {
this.executor = executor;
return this;
}

/**
* Builds an {@link Args}.
*
* @since 1.21.0
*/
public Args build() {
return new Args(defaultPort, proxyDetector, syncContext, serviceConfigParser);
return new Args(defaultPort, proxyDetector, syncContext, serviceConfigParser, executor);
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions api/src/test/java/io/grpc/NameResolverTest.java
Expand Up @@ -29,6 +29,8 @@
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -43,6 +45,7 @@ public class NameResolverTest {
private final SynchronizationContext syncContext =
new SynchronizationContext(mock(UncaughtExceptionHandler.class));
private final ServiceConfigParser parser = mock(ServiceConfigParser.class);
private final Executor executor = Executors.newSingleThreadExecutor();
private URI uri;
private final NameResolver nameResolver = mock(NameResolver.class);

Expand All @@ -58,12 +61,14 @@ public void args() {
assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser);
assertThat(args.getBlockingExecutor()).isSameInstanceAs(executor);

NameResolver.Args args2 = args.toBuilder().build();
assertThat(args2.getDefaultPort()).isEqualTo(defaultPort);
assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser);
assertThat(args2.getBlockingExecutor()).isSameInstanceAs(executor);

assertThat(args2).isNotSameInstanceAs(args);
assertThat(args2).isNotEqualTo(args);
Expand Down Expand Up @@ -246,6 +251,7 @@ private NameResolver.Args createArgs() {
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(parser)
.setBlockingExecutor(executor)
.build();
}
}
Expand Up @@ -95,6 +95,8 @@ public static ManagedChannelBuilder<?> forTarget(String target) {

ObjectPool<? extends Executor> executorPool = DEFAULT_EXECUTOR_POOL;

ObjectPool<? extends Executor> blockingExecutorPool = DEFAULT_EXECUTOR_POOL;

private final List<ClientInterceptor> interceptors = new ArrayList<>();
final NameResolverRegistry nameResolverRegistry = NameResolverRegistry.getDefaultRegistry();

Expand Down Expand Up @@ -217,6 +219,16 @@ public final T executor(Executor executor) {
return thisT();
}

@Override
public final T blockingExecutor(Executor executor) {
if (executor != null) {
this.blockingExecutorPool = new FixedObjectPool<>(executor);
} else {
this.blockingExecutorPool = DEFAULT_EXECUTOR_POOL;
}
return thisT();
}

@Override
public final T intercept(List<ClientInterceptor> interceptors) {
this.interceptors.addAll(interceptors);
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/java/io/grpc/internal/DnsNameResolver.java
Expand Up @@ -138,6 +138,8 @@ final class DnsNameResolver extends NameResolver {
private final String authority;
private final String host;
private final int port;

/** Executor that will be used if an Executor is not provide via {@link NameResolver.Args}. */
private final Resource<Executor> executorResource;
private final long cacheTtlNanos;
private final SynchronizationContext syncContext;
Expand All @@ -147,6 +149,10 @@ final class DnsNameResolver extends NameResolver {
private ResolutionResults cachedResolutionResults;
private boolean shutdown;
private Executor executor;

/** True if using an executor resource that should be released after use. */
private final boolean usingExecutorResource;

private boolean resolving;

// The field must be accessed from syncContext, although the methods on an Listener2 can be called
Expand Down Expand Up @@ -176,6 +182,8 @@ final class DnsNameResolver extends NameResolver {
this.stopwatch = Preconditions.checkNotNull(stopwatch, "stopwatch");
this.syncContext =
Preconditions.checkNotNull(args.getSynchronizationContext(), "syncContext");
this.executor = args.getBlockingExecutor();
this.usingExecutorResource = executor == null;
}

@Override
Expand All @@ -186,7 +194,9 @@ public String getServiceAuthority() {
@Override
public void start(Listener2 listener) {
Preconditions.checkState(this.listener == null, "already started");
executor = SharedResourceHolder.get(executorResource);
if (usingExecutorResource) {
executor = SharedResourceHolder.get(executorResource);
}
this.listener = Preconditions.checkNotNull(listener, "listener");
resolve();
}
Expand Down Expand Up @@ -361,7 +371,7 @@ public void shutdown() {
return;
}
shutdown = true;
if (executor != null) {
if (executor != null && usingExecutorResource) {
executor = SharedResourceHolder.release(executorResource, executor);
}
}
Expand Down
34 changes: 25 additions & 9 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Expand Up @@ -142,6 +142,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final ObjectPool<? extends Executor> executorPool;
private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
private final ExecutorHolder balancerRpcExecutorHolder;
private final ExecutorHolder blockingExecutorHolder;
private final TimeProvider timeProvider;
private final int maxTraceEvents;

Expand Down Expand Up @@ -565,16 +566,30 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
this.blockingExecutorHolder =
new ExecutorHolder(
checkNotNull(builder.blockingExecutorPool, "blockingExecutorPool"));
this.nameResolverRegistry = builder.nameResolverRegistry;
this.nameResolverArgs = NameResolver.Args.newBuilder()
.setDefaultPort(builder.getDefaultPort())
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(
new ScParser(
retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts,
loadBalancerFactory))
.build();
this.nameResolverArgs =
NameResolver.Args.newBuilder()
.setDefaultPort(builder.getDefaultPort())
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(
new ScParser(
retryEnabled,
builder.maxRetryAttempts,
builder.maxHedgedAttempts,
loadBalancerFactory))
.setBlockingExecutor(
// Avoid creating the blockingExecutor until it is first used
new Executor() {
@Override
public void execute(Runnable command) {
blockingExecutorHolder.getExecutor().execute(command);
}
})
.build();
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
maxTraceEvents = builder.maxTraceEvents;
Expand Down Expand Up @@ -885,6 +900,7 @@ private void maybeTerminateChannel() {
terminatedLatch.countDown();
executorPool.returnObject(executor);
balancerRpcExecutorHolder.release();
blockingExecutorHolder.release();
// Release the transport factory so that it can deallocate any resources.
transportFactory.close();
}
Expand Down
Expand Up @@ -99,6 +99,21 @@ public void directExecutor() {
assertEquals(MoreExecutors.directExecutor(), builder.executorPool.getObject());
}

@Test
public void blockingExecutor_normal() {
Executor executor = mock(Executor.class);
assertEquals(builder, builder.blockingExecutor(executor));
assertEquals(executor, builder.blockingExecutorPool.getObject());
}

@Test
public void blockingExecutor_null() {
ObjectPool<? extends Executor> defaultValue = builder.blockingExecutorPool;
builder.blockingExecutor(mock(Executor.class));
assertEquals(builder, builder.blockingExecutor(null));
assertEquals(defaultValue, builder.blockingExecutorPool);
}

@Test
public void nameResolverFactory_default() {
assertNotNull(builder.getNameResolverFactory());
Expand Down

0 comments on commit adcfb3e

Please sign in to comment.