From adcfb3e623d5d946f7b7c5218e0dfeebe99b66de Mon Sep 17 00:00:00 2001 From: Grant Oakley Date: Mon, 14 Oct 2019 08:46:26 -0700 Subject: [PATCH] api,core: Adds an Executor field to NameResolver.Args. Adds an Executor to NameResolver.Args, which is optionally set on ManagedChannelBuilder. This allows NameResolver implementations to avoid creating their own thread pools if the application already manages its own pools. Addresses #3703. --- .../io/grpc/ForwardingChannelBuilder.java | 6 ++ .../java/io/grpc/ManagedChannelBuilder.java | 17 ++++ api/src/main/java/io/grpc/NameResolver.java | 39 +++++++- .../test/java/io/grpc/NameResolverTest.java | 6 ++ .../AbstractManagedChannelImplBuilder.java | 12 +++ .../io/grpc/internal/DnsNameResolver.java | 14 ++- .../io/grpc/internal/ManagedChannelImpl.java | 34 +++++-- ...AbstractManagedChannelImplBuilderTest.java | 15 +++ .../io/grpc/internal/DnsNameResolverTest.java | 99 +++++++++++++++---- .../grpc/internal/ManagedChannelImplTest.java | 24 ++++- 10 files changed, 229 insertions(+), 37 deletions(-) diff --git a/api/src/main/java/io/grpc/ForwardingChannelBuilder.java b/api/src/main/java/io/grpc/ForwardingChannelBuilder.java index db63180f60d..4ef944c7ab9 100644 --- a/api/src/main/java/io/grpc/ForwardingChannelBuilder.java +++ b/api/src/main/java/io/grpc/ForwardingChannelBuilder.java @@ -71,6 +71,12 @@ public T executor(Executor executor) { return thisT(); } + @Override + public T blockingExecutor(Executor executor) { + delegate().blockingExecutor(executor); + return thisT(); + } + @Override public T intercept(List interceptors) { delegate().intercept(interceptors); diff --git a/api/src/main/java/io/grpc/ManagedChannelBuilder.java b/api/src/main/java/io/grpc/ManagedChannelBuilder.java index bd07f045b62..e2706e2ec51 100644 --- a/api/src/main/java/io/grpc/ManagedChannelBuilder.java +++ b/api/src/main/java/io/grpc/ManagedChannelBuilder.java @@ -103,6 +103,23 @@ public static ManagedChannelBuilder forTarget(String target) { */ public abstract T executor(Executor executor); + /** + * Provides a custom executor that will be used for operations that block. + * + *

It's an optional parameter. If the user has not provided an executor when the channel is + * built, the builder will use a static cached thread pool. + * + *

The channel won't take ownership of the given executor. It's caller's responsibility to shut + * down the executor when it's desired. + * + * @return this + * @throws UnsupportedOperationException if unsupported + * @since 1.25.0 + */ + public T blockingExecutor(Executor executor) { + throw new UnsupportedOperationException(); + } + /** * Adds interceptors that will be called before the channel performs its real work. This is * functionally equivalent to using {@link ClientInterceptors#intercept(Channel, List)}, but while diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java index c96f330c61d..d8c01b6a7c3 100644 --- a/api/src/main/java/io/grpc/NameResolver.java +++ b/api/src/main/java/io/grpc/NameResolver.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -411,13 +412,19 @@ public static final class Args { private final ProxyDetector proxyDetector; private final SynchronizationContext syncContext; private final ServiceConfigParser serviceConfigParser; - - Args(Integer defaultPort, ProxyDetector proxyDetector, - SynchronizationContext syncContext, ServiceConfigParser serviceConfigParser) { + @Nullable private final Executor executor; + + Args( + Integer defaultPort, + ProxyDetector proxyDetector, + SynchronizationContext syncContext, + ServiceConfigParser serviceConfigParser, + @Nullable Executor executor) { this.defaultPort = checkNotNull(defaultPort, "defaultPort not set"); this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set"); this.syncContext = checkNotNull(syncContext, "syncContext not set"); this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set"); + this.executor = executor; } /** @@ -459,6 +466,17 @@ public ServiceConfigParser getServiceConfigParser() { return serviceConfigParser; } + /** + * Returns the Executor on which this resolver should execute long-running or I/O bound work. + * Null if no Executor was set. + * + * @since 1.25.0 + */ + @Nullable + public Executor getBlockingExecutor() { + return executor; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -466,6 +484,7 @@ public String toString() { .add("proxyDetector", proxyDetector) .add("syncContext", syncContext) .add("serviceConfigParser", serviceConfigParser) + .add("executor", executor) .toString(); } @@ -480,6 +499,7 @@ public Builder toBuilder() { builder.setProxyDetector(proxyDetector); builder.setSynchronizationContext(syncContext); builder.setServiceConfigParser(serviceConfigParser); + builder.setBlockingExecutor(executor); return builder; } @@ -502,6 +522,7 @@ public static final class Builder { private ProxyDetector proxyDetector; private SynchronizationContext syncContext; private ServiceConfigParser serviceConfigParser; + private Executor executor; Builder() { } @@ -546,13 +567,23 @@ public Builder setServiceConfigParser(ServiceConfigParser parser) { return this; } + /** + * See {@link Args#getBlockingExecutor}. This is an optional field. + * + * @since 1.25.0 + */ + public Builder setBlockingExecutor(Executor executor) { + this.executor = executor; + return this; + } + /** * Builds an {@link Args}. * * @since 1.21.0 */ public Args build() { - return new Args(defaultPort, proxyDetector, syncContext, serviceConfigParser); + return new Args(defaultPort, proxyDetector, syncContext, serviceConfigParser, executor); } } } diff --git a/api/src/test/java/io/grpc/NameResolverTest.java b/api/src/test/java/io/grpc/NameResolverTest.java index b641fcac682..09e1e1c20f6 100644 --- a/api/src/test/java/io/grpc/NameResolverTest.java +++ b/api/src/test/java/io/grpc/NameResolverTest.java @@ -29,6 +29,8 @@ import java.net.URI; import java.util.Collections; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; @@ -43,6 +45,7 @@ public class NameResolverTest { private final SynchronizationContext syncContext = new SynchronizationContext(mock(UncaughtExceptionHandler.class)); private final ServiceConfigParser parser = mock(ServiceConfigParser.class); + private final Executor executor = Executors.newSingleThreadExecutor(); private URI uri; private final NameResolver nameResolver = mock(NameResolver.class); @@ -58,12 +61,14 @@ public void args() { assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector); assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext); assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser); + assertThat(args.getBlockingExecutor()).isSameInstanceAs(executor); NameResolver.Args args2 = args.toBuilder().build(); assertThat(args2.getDefaultPort()).isEqualTo(defaultPort); assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector); assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext); assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser); + assertThat(args2.getBlockingExecutor()).isSameInstanceAs(executor); assertThat(args2).isNotSameInstanceAs(args); assertThat(args2).isNotEqualTo(args); @@ -246,6 +251,7 @@ private NameResolver.Args createArgs() { .setProxyDetector(proxyDetector) .setSynchronizationContext(syncContext) .setServiceConfigParser(parser) + .setBlockingExecutor(executor) .build(); } } diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 14417b6b26f..3698eda6fd1 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -95,6 +95,8 @@ public static ManagedChannelBuilder forTarget(String target) { ObjectPool executorPool = DEFAULT_EXECUTOR_POOL; + ObjectPool blockingExecutorPool = DEFAULT_EXECUTOR_POOL; + private final List interceptors = new ArrayList<>(); final NameResolverRegistry nameResolverRegistry = NameResolverRegistry.getDefaultRegistry(); @@ -217,6 +219,16 @@ public final T executor(Executor executor) { return thisT(); } + @Override + public final T blockingExecutor(Executor executor) { + if (executor != null) { + this.blockingExecutorPool = new FixedObjectPool<>(executor); + } else { + this.blockingExecutorPool = DEFAULT_EXECUTOR_POOL; + } + return thisT(); + } + @Override public final T intercept(List interceptors) { this.interceptors.addAll(interceptors); diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java index 0da09299afa..8e07dbdb8ec 100644 --- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java +++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java @@ -138,6 +138,8 @@ final class DnsNameResolver extends NameResolver { private final String authority; private final String host; private final int port; + + /** Executor that will be used if an Executor is not provide via {@link NameResolver.Args}. */ private final Resource executorResource; private final long cacheTtlNanos; private final SynchronizationContext syncContext; @@ -147,6 +149,10 @@ final class DnsNameResolver extends NameResolver { private ResolutionResults cachedResolutionResults; private boolean shutdown; private Executor executor; + + /** True if using an executor resource that should be released after use. */ + private final boolean usingExecutorResource; + private boolean resolving; // The field must be accessed from syncContext, although the methods on an Listener2 can be called @@ -176,6 +182,8 @@ final class DnsNameResolver extends NameResolver { this.stopwatch = Preconditions.checkNotNull(stopwatch, "stopwatch"); this.syncContext = Preconditions.checkNotNull(args.getSynchronizationContext(), "syncContext"); + this.executor = args.getBlockingExecutor(); + this.usingExecutorResource = executor == null; } @Override @@ -186,7 +194,9 @@ public String getServiceAuthority() { @Override public void start(Listener2 listener) { Preconditions.checkState(this.listener == null, "already started"); - executor = SharedResourceHolder.get(executorResource); + if (usingExecutorResource) { + executor = SharedResourceHolder.get(executorResource); + } this.listener = Preconditions.checkNotNull(listener, "listener"); resolve(); } @@ -361,7 +371,7 @@ public void shutdown() { return; } shutdown = true; - if (executor != null) { + if (executor != null && usingExecutorResource) { executor = SharedResourceHolder.release(executorResource, executor); } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 7e6ca957057..fed1d660bc9 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -142,6 +142,7 @@ final class ManagedChannelImpl extends ManagedChannel implements private final ObjectPool executorPool; private final ObjectPool balancerRpcExecutorPool; private final ExecutorHolder balancerRpcExecutorHolder; + private final ExecutorHolder blockingExecutorHolder; private final TimeProvider timeProvider; private final int maxTraceEvents; @@ -565,16 +566,30 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR; this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry; this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy); + this.blockingExecutorHolder = + new ExecutorHolder( + checkNotNull(builder.blockingExecutorPool, "blockingExecutorPool")); this.nameResolverRegistry = builder.nameResolverRegistry; - this.nameResolverArgs = NameResolver.Args.newBuilder() - .setDefaultPort(builder.getDefaultPort()) - .setProxyDetector(proxyDetector) - .setSynchronizationContext(syncContext) - .setServiceConfigParser( - new ScParser( - retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts, - loadBalancerFactory)) - .build(); + this.nameResolverArgs = + NameResolver.Args.newBuilder() + .setDefaultPort(builder.getDefaultPort()) + .setProxyDetector(proxyDetector) + .setSynchronizationContext(syncContext) + .setServiceConfigParser( + new ScParser( + retryEnabled, + builder.maxRetryAttempts, + builder.maxHedgedAttempts, + loadBalancerFactory)) + .setBlockingExecutor( + // Avoid creating the blockingExecutor until it is first used + new Executor() { + @Override + public void execute(Runnable command) { + blockingExecutorHolder.getExecutor().execute(command); + } + }) + .build(); this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs); this.timeProvider = checkNotNull(timeProvider, "timeProvider"); maxTraceEvents = builder.maxTraceEvents; @@ -885,6 +900,7 @@ private void maybeTerminateChannel() { terminatedLatch.countDown(); executorPool.returnObject(executor); balancerRpcExecutorHolder.release(); + blockingExecutorHolder.release(); // Release the transport factory so that it can deallocate any resources. transportFactory.close(); } diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index 4b4adcc7e55..fee1685f8a2 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -99,6 +99,21 @@ public void directExecutor() { assertEquals(MoreExecutors.directExecutor(), builder.executorPool.getObject()); } + @Test + public void blockingExecutor_normal() { + Executor executor = mock(Executor.class); + assertEquals(builder, builder.blockingExecutor(executor)); + assertEquals(executor, builder.blockingExecutorPool.getObject()); + } + + @Test + public void blockingExecutor_null() { + ObjectPool defaultValue = builder.blockingExecutorPool; + builder.blockingExecutor(mock(Executor.class)); + assertEquals(builder, builder.blockingExecutor(null)); + assertEquals(defaultValue, builder.blockingExecutorPool); + } + @Test public void nameResolverFactory_default() { assertNotNull(builder.getNameResolverFactory()); diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java index 361ee69de3c..7214db6edac 100644 --- a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java +++ b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java @@ -70,6 +70,7 @@ import java.util.Random; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; @@ -120,17 +121,20 @@ public void uncaughtException(Thread t, Throwable e) { private final FakeClock fakeClock = new FakeClock(); private final FakeClock fakeExecutor = new FakeClock(); - private final Resource fakeExecutorResource = - new Resource() { - @Override - public Executor create() { - return fakeExecutor.getScheduledExecutorService(); - } + private final FakeExecutorResource fakeExecutorResource = new FakeExecutorResource(); - @Override - public void close(Executor instance) { - } - }; + private final class FakeExecutorResource implements Resource { + private final AtomicInteger createCount = new AtomicInteger(); + + @Override + public Executor create() { + createCount.incrementAndGet(); + return fakeExecutor.getScheduledExecutorService(); + } + + @Override + public void close(Executor instance) {} + } @Mock private NameResolver.Listener2 mockListener; @@ -165,18 +169,20 @@ private DnsNameResolver newResolver( final ProxyDetector proxyDetector, Stopwatch stopwatch, boolean isAndroid) { - DnsNameResolver dnsResolver = new DnsNameResolver( - null, - name, + NameResolver.Args args = NameResolver.Args.newBuilder() .setDefaultPort(defaultPort) .setProxyDetector(proxyDetector) .setSynchronizationContext(syncContext) .setServiceConfigParser(mock(ServiceConfigParser.class)) - .build(), - fakeExecutorResource, - stopwatch, - isAndroid); + .build(); + return newResolver(name, stopwatch, isAndroid, args); + } + + private DnsNameResolver newResolver( + String name, Stopwatch stopwatch, boolean isAndroid, NameResolver.Args args) { + DnsNameResolver dnsResolver = + new DnsNameResolver(null, name, args, fakeExecutorResource, stopwatch, isAndroid); // By default, using the mocked ResourceResolver to avoid I/O dnsResolver.setResourceResolver(new JndiResourceResolver(recordFetcher)); return dnsResolver; @@ -293,6 +299,65 @@ private void resolveNeverCache(boolean isAndroid) throws Exception { verify(mockResolver, times(2)).resolveAddress(anyString()); } + @Test + public void testExecutor_default() throws Exception { + final List answer = createAddressList(2); + + DnsNameResolver resolver = newResolver("foo.googleapis.com", 81); + AddressResolver mockResolver = mock(AddressResolver.class); + when(mockResolver.resolveAddress(anyString())).thenReturn(answer); + resolver.setAddressResolver(mockResolver); + + resolver.start(mockListener); + assertEquals(1, fakeExecutor.runDueTasks()); + verify(mockListener).onResult(resultCaptor.capture()); + assertAnswerMatches(answer, 81, resultCaptor.getValue()); + assertEquals(0, fakeClock.numPendingTasks()); + + resolver.shutdown(); + + assertThat(fakeExecutorResource.createCount.get()).isEqualTo(1); + } + + @Test + public void testExecutor_custom() throws Exception { + final List answer = createAddressList(2); + final AtomicInteger executions = new AtomicInteger(); + + NameResolver.Args args = + NameResolver.Args.newBuilder() + .setDefaultPort(81) + .setProxyDetector(GrpcUtil.NOOP_PROXY_DETECTOR) + .setSynchronizationContext(syncContext) + .setServiceConfigParser(mock(ServiceConfigParser.class)) + .setBlockingExecutor( + new Executor() { + @Override + public void execute(Runnable command) { + executions.incrementAndGet(); + command.run(); + } + }) + .build(); + + DnsNameResolver resolver = + newResolver("foo.googleapis.com", Stopwatch.createUnstarted(), false, args); + AddressResolver mockResolver = mock(AddressResolver.class); + when(mockResolver.resolveAddress(anyString())).thenReturn(answer); + resolver.setAddressResolver(mockResolver); + + resolver.start(mockListener); + assertEquals(0, fakeExecutor.runDueTasks()); + verify(mockListener).onResult(resultCaptor.capture()); + assertAnswerMatches(answer, 81, resultCaptor.getValue()); + assertEquals(0, fakeClock.numPendingTasks()); + + resolver.shutdown(); + + assertThat(fakeExecutorResource.createCount.get()).isEqualTo(0); + assertThat(executions.get()).isEqualTo(1); + } + @Test public void resolveAll_failsOnEmptyResult() { DnsNameResolver nr = newResolver("dns:///addr.fake:1234", 443); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 3fb9dfea0bc..b2eb58dc108 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -265,6 +265,8 @@ public String getPolicyName() { private ObjectPool balancerRpcExecutorPool; @Mock private CallCredentials creds; + @Mock + private Executor blockingExecutor; private ChannelBuilder channelBuilder; private boolean requestConnection = true; private BlockingQueue transports; @@ -319,11 +321,14 @@ public void setUp() throws Exception { when(balancerRpcExecutorPool.getObject()) .thenReturn(balancerRpcExecutor.getScheduledExecutorService()); - channelBuilder = new ChannelBuilder() - .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build()) - .defaultLoadBalancingPolicy(MOCK_POLICY_NAME) - .userAgent(USER_AGENT) - .idleTimeout(AbstractManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS); + channelBuilder = + new ChannelBuilder() + .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build()) + .defaultLoadBalancingPolicy(MOCK_POLICY_NAME) + .userAgent(USER_AGENT) + .idleTimeout( + AbstractManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS) + .blockingExecutor(blockingExecutor); channelBuilder.executorPool = executorPool; channelBuilder.binlog = null; channelBuilder.channelz = channelz; @@ -3582,6 +3587,15 @@ public String getDefaultScheme() { assertThat(args).isNotNull(); assertThat(args.getDefaultPort()).isEqualTo(DEFAULT_PORT); assertThat(args.getProxyDetector()).isSameInstanceAs(neverProxy); + + verify(blockingExecutor, never()).execute(any(Runnable.class)); + args.getBlockingExecutor() + .execute( + new Runnable() { + @Override + public void run() {} + }); + verify(blockingExecutor, times(1)).execute(any(Runnable.class)); } @Test