diff --git a/api/src/main/java/io/grpc/ForwardingChannelBuilder.java b/api/src/main/java/io/grpc/ForwardingChannelBuilder.java index db63180f60d..4ef944c7ab9 100644 --- a/api/src/main/java/io/grpc/ForwardingChannelBuilder.java +++ b/api/src/main/java/io/grpc/ForwardingChannelBuilder.java @@ -71,6 +71,12 @@ public T executor(Executor executor) { return thisT(); } + @Override + public T blockingExecutor(Executor executor) { + delegate().blockingExecutor(executor); + return thisT(); + } + @Override public T intercept(List interceptors) { delegate().intercept(interceptors); diff --git a/api/src/main/java/io/grpc/ManagedChannelBuilder.java b/api/src/main/java/io/grpc/ManagedChannelBuilder.java index bd07f045b62..e2706e2ec51 100644 --- a/api/src/main/java/io/grpc/ManagedChannelBuilder.java +++ b/api/src/main/java/io/grpc/ManagedChannelBuilder.java @@ -103,6 +103,23 @@ public static ManagedChannelBuilder forTarget(String target) { */ public abstract T executor(Executor executor); + /** + * Provides a custom executor that will be used for operations that block. + * + *

It's an optional parameter. If the user has not provided an executor when the channel is + * built, the builder will use a static cached thread pool. + * + *

The channel won't take ownership of the given executor. It's caller's responsibility to shut + * down the executor when it's desired. + * + * @return this + * @throws UnsupportedOperationException if unsupported + * @since 1.25.0 + */ + public T blockingExecutor(Executor executor) { + throw new UnsupportedOperationException(); + } + /** * Adds interceptors that will be called before the channel performs its real work. This is * functionally equivalent to using {@link ClientInterceptors#intercept(Channel, List)}, but while diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java index c96f330c61d..d8c01b6a7c3 100644 --- a/api/src/main/java/io/grpc/NameResolver.java +++ b/api/src/main/java/io/grpc/NameResolver.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -411,13 +412,19 @@ public static final class Args { private final ProxyDetector proxyDetector; private final SynchronizationContext syncContext; private final ServiceConfigParser serviceConfigParser; - - Args(Integer defaultPort, ProxyDetector proxyDetector, - SynchronizationContext syncContext, ServiceConfigParser serviceConfigParser) { + @Nullable private final Executor executor; + + Args( + Integer defaultPort, + ProxyDetector proxyDetector, + SynchronizationContext syncContext, + ServiceConfigParser serviceConfigParser, + @Nullable Executor executor) { this.defaultPort = checkNotNull(defaultPort, "defaultPort not set"); this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set"); this.syncContext = checkNotNull(syncContext, "syncContext not set"); this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set"); + this.executor = executor; } /** @@ -459,6 +466,17 @@ public ServiceConfigParser getServiceConfigParser() { return serviceConfigParser; } + /** + * Returns the Executor on which this resolver should execute long-running or I/O bound work. + * Null if no Executor was set. + * + * @since 1.25.0 + */ + @Nullable + public Executor getBlockingExecutor() { + return executor; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -466,6 +484,7 @@ public String toString() { .add("proxyDetector", proxyDetector) .add("syncContext", syncContext) .add("serviceConfigParser", serviceConfigParser) + .add("executor", executor) .toString(); } @@ -480,6 +499,7 @@ public Builder toBuilder() { builder.setProxyDetector(proxyDetector); builder.setSynchronizationContext(syncContext); builder.setServiceConfigParser(serviceConfigParser); + builder.setBlockingExecutor(executor); return builder; } @@ -502,6 +522,7 @@ public static final class Builder { private ProxyDetector proxyDetector; private SynchronizationContext syncContext; private ServiceConfigParser serviceConfigParser; + private Executor executor; Builder() { } @@ -546,13 +567,23 @@ public Builder setServiceConfigParser(ServiceConfigParser parser) { return this; } + /** + * See {@link Args#getBlockingExecutor}. This is an optional field. + * + * @since 1.25.0 + */ + public Builder setBlockingExecutor(Executor executor) { + this.executor = executor; + return this; + } + /** * Builds an {@link Args}. * * @since 1.21.0 */ public Args build() { - return new Args(defaultPort, proxyDetector, syncContext, serviceConfigParser); + return new Args(defaultPort, proxyDetector, syncContext, serviceConfigParser, executor); } } } diff --git a/api/src/test/java/io/grpc/NameResolverTest.java b/api/src/test/java/io/grpc/NameResolverTest.java index b641fcac682..09e1e1c20f6 100644 --- a/api/src/test/java/io/grpc/NameResolverTest.java +++ b/api/src/test/java/io/grpc/NameResolverTest.java @@ -29,6 +29,8 @@ import java.net.URI; import java.util.Collections; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; @@ -43,6 +45,7 @@ public class NameResolverTest { private final SynchronizationContext syncContext = new SynchronizationContext(mock(UncaughtExceptionHandler.class)); private final ServiceConfigParser parser = mock(ServiceConfigParser.class); + private final Executor executor = Executors.newSingleThreadExecutor(); private URI uri; private final NameResolver nameResolver = mock(NameResolver.class); @@ -58,12 +61,14 @@ public void args() { assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector); assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext); assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser); + assertThat(args.getBlockingExecutor()).isSameInstanceAs(executor); NameResolver.Args args2 = args.toBuilder().build(); assertThat(args2.getDefaultPort()).isEqualTo(defaultPort); assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector); assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext); assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser); + assertThat(args2.getBlockingExecutor()).isSameInstanceAs(executor); assertThat(args2).isNotSameInstanceAs(args); assertThat(args2).isNotEqualTo(args); @@ -246,6 +251,7 @@ private NameResolver.Args createArgs() { .setProxyDetector(proxyDetector) .setSynchronizationContext(syncContext) .setServiceConfigParser(parser) + .setBlockingExecutor(executor) .build(); } } diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 14417b6b26f..3698eda6fd1 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -95,6 +95,8 @@ public static ManagedChannelBuilder forTarget(String target) { ObjectPool executorPool = DEFAULT_EXECUTOR_POOL; + ObjectPool blockingExecutorPool = DEFAULT_EXECUTOR_POOL; + private final List interceptors = new ArrayList<>(); final NameResolverRegistry nameResolverRegistry = NameResolverRegistry.getDefaultRegistry(); @@ -217,6 +219,16 @@ public final T executor(Executor executor) { return thisT(); } + @Override + public final T blockingExecutor(Executor executor) { + if (executor != null) { + this.blockingExecutorPool = new FixedObjectPool<>(executor); + } else { + this.blockingExecutorPool = DEFAULT_EXECUTOR_POOL; + } + return thisT(); + } + @Override public final T intercept(List interceptors) { this.interceptors.addAll(interceptors); diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java index 0da09299afa..8e07dbdb8ec 100644 --- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java +++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java @@ -138,6 +138,8 @@ final class DnsNameResolver extends NameResolver { private final String authority; private final String host; private final int port; + + /** Executor that will be used if an Executor is not provide via {@link NameResolver.Args}. */ private final Resource executorResource; private final long cacheTtlNanos; private final SynchronizationContext syncContext; @@ -147,6 +149,10 @@ final class DnsNameResolver extends NameResolver { private ResolutionResults cachedResolutionResults; private boolean shutdown; private Executor executor; + + /** True if using an executor resource that should be released after use. */ + private final boolean usingExecutorResource; + private boolean resolving; // The field must be accessed from syncContext, although the methods on an Listener2 can be called @@ -176,6 +182,8 @@ final class DnsNameResolver extends NameResolver { this.stopwatch = Preconditions.checkNotNull(stopwatch, "stopwatch"); this.syncContext = Preconditions.checkNotNull(args.getSynchronizationContext(), "syncContext"); + this.executor = args.getBlockingExecutor(); + this.usingExecutorResource = executor == null; } @Override @@ -186,7 +194,9 @@ public String getServiceAuthority() { @Override public void start(Listener2 listener) { Preconditions.checkState(this.listener == null, "already started"); - executor = SharedResourceHolder.get(executorResource); + if (usingExecutorResource) { + executor = SharedResourceHolder.get(executorResource); + } this.listener = Preconditions.checkNotNull(listener, "listener"); resolve(); } @@ -361,7 +371,7 @@ public void shutdown() { return; } shutdown = true; - if (executor != null) { + if (executor != null && usingExecutorResource) { executor = SharedResourceHolder.release(executorResource, executor); } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 7e6ca957057..fed1d660bc9 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -142,6 +142,7 @@ final class ManagedChannelImpl extends ManagedChannel implements private final ObjectPool executorPool; private final ObjectPool balancerRpcExecutorPool; private final ExecutorHolder balancerRpcExecutorHolder; + private final ExecutorHolder blockingExecutorHolder; private final TimeProvider timeProvider; private final int maxTraceEvents; @@ -565,16 +566,30 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR; this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry; this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy); + this.blockingExecutorHolder = + new ExecutorHolder( + checkNotNull(builder.blockingExecutorPool, "blockingExecutorPool")); this.nameResolverRegistry = builder.nameResolverRegistry; - this.nameResolverArgs = NameResolver.Args.newBuilder() - .setDefaultPort(builder.getDefaultPort()) - .setProxyDetector(proxyDetector) - .setSynchronizationContext(syncContext) - .setServiceConfigParser( - new ScParser( - retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts, - loadBalancerFactory)) - .build(); + this.nameResolverArgs = + NameResolver.Args.newBuilder() + .setDefaultPort(builder.getDefaultPort()) + .setProxyDetector(proxyDetector) + .setSynchronizationContext(syncContext) + .setServiceConfigParser( + new ScParser( + retryEnabled, + builder.maxRetryAttempts, + builder.maxHedgedAttempts, + loadBalancerFactory)) + .setBlockingExecutor( + // Avoid creating the blockingExecutor until it is first used + new Executor() { + @Override + public void execute(Runnable command) { + blockingExecutorHolder.getExecutor().execute(command); + } + }) + .build(); this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs); this.timeProvider = checkNotNull(timeProvider, "timeProvider"); maxTraceEvents = builder.maxTraceEvents; @@ -885,6 +900,7 @@ private void maybeTerminateChannel() { terminatedLatch.countDown(); executorPool.returnObject(executor); balancerRpcExecutorHolder.release(); + blockingExecutorHolder.release(); // Release the transport factory so that it can deallocate any resources. transportFactory.close(); } diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index 4b4adcc7e55..fee1685f8a2 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -99,6 +99,21 @@ public void directExecutor() { assertEquals(MoreExecutors.directExecutor(), builder.executorPool.getObject()); } + @Test + public void blockingExecutor_normal() { + Executor executor = mock(Executor.class); + assertEquals(builder, builder.blockingExecutor(executor)); + assertEquals(executor, builder.blockingExecutorPool.getObject()); + } + + @Test + public void blockingExecutor_null() { + ObjectPool defaultValue = builder.blockingExecutorPool; + builder.blockingExecutor(mock(Executor.class)); + assertEquals(builder, builder.blockingExecutor(null)); + assertEquals(defaultValue, builder.blockingExecutorPool); + } + @Test public void nameResolverFactory_default() { assertNotNull(builder.getNameResolverFactory()); diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java index 361ee69de3c..7214db6edac 100644 --- a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java +++ b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java @@ -70,6 +70,7 @@ import java.util.Random; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; @@ -120,17 +121,20 @@ public void uncaughtException(Thread t, Throwable e) { private final FakeClock fakeClock = new FakeClock(); private final FakeClock fakeExecutor = new FakeClock(); - private final Resource fakeExecutorResource = - new Resource() { - @Override - public Executor create() { - return fakeExecutor.getScheduledExecutorService(); - } + private final FakeExecutorResource fakeExecutorResource = new FakeExecutorResource(); - @Override - public void close(Executor instance) { - } - }; + private final class FakeExecutorResource implements Resource { + private final AtomicInteger createCount = new AtomicInteger(); + + @Override + public Executor create() { + createCount.incrementAndGet(); + return fakeExecutor.getScheduledExecutorService(); + } + + @Override + public void close(Executor instance) {} + } @Mock private NameResolver.Listener2 mockListener; @@ -165,18 +169,20 @@ private DnsNameResolver newResolver( final ProxyDetector proxyDetector, Stopwatch stopwatch, boolean isAndroid) { - DnsNameResolver dnsResolver = new DnsNameResolver( - null, - name, + NameResolver.Args args = NameResolver.Args.newBuilder() .setDefaultPort(defaultPort) .setProxyDetector(proxyDetector) .setSynchronizationContext(syncContext) .setServiceConfigParser(mock(ServiceConfigParser.class)) - .build(), - fakeExecutorResource, - stopwatch, - isAndroid); + .build(); + return newResolver(name, stopwatch, isAndroid, args); + } + + private DnsNameResolver newResolver( + String name, Stopwatch stopwatch, boolean isAndroid, NameResolver.Args args) { + DnsNameResolver dnsResolver = + new DnsNameResolver(null, name, args, fakeExecutorResource, stopwatch, isAndroid); // By default, using the mocked ResourceResolver to avoid I/O dnsResolver.setResourceResolver(new JndiResourceResolver(recordFetcher)); return dnsResolver; @@ -293,6 +299,65 @@ private void resolveNeverCache(boolean isAndroid) throws Exception { verify(mockResolver, times(2)).resolveAddress(anyString()); } + @Test + public void testExecutor_default() throws Exception { + final List answer = createAddressList(2); + + DnsNameResolver resolver = newResolver("foo.googleapis.com", 81); + AddressResolver mockResolver = mock(AddressResolver.class); + when(mockResolver.resolveAddress(anyString())).thenReturn(answer); + resolver.setAddressResolver(mockResolver); + + resolver.start(mockListener); + assertEquals(1, fakeExecutor.runDueTasks()); + verify(mockListener).onResult(resultCaptor.capture()); + assertAnswerMatches(answer, 81, resultCaptor.getValue()); + assertEquals(0, fakeClock.numPendingTasks()); + + resolver.shutdown(); + + assertThat(fakeExecutorResource.createCount.get()).isEqualTo(1); + } + + @Test + public void testExecutor_custom() throws Exception { + final List answer = createAddressList(2); + final AtomicInteger executions = new AtomicInteger(); + + NameResolver.Args args = + NameResolver.Args.newBuilder() + .setDefaultPort(81) + .setProxyDetector(GrpcUtil.NOOP_PROXY_DETECTOR) + .setSynchronizationContext(syncContext) + .setServiceConfigParser(mock(ServiceConfigParser.class)) + .setBlockingExecutor( + new Executor() { + @Override + public void execute(Runnable command) { + executions.incrementAndGet(); + command.run(); + } + }) + .build(); + + DnsNameResolver resolver = + newResolver("foo.googleapis.com", Stopwatch.createUnstarted(), false, args); + AddressResolver mockResolver = mock(AddressResolver.class); + when(mockResolver.resolveAddress(anyString())).thenReturn(answer); + resolver.setAddressResolver(mockResolver); + + resolver.start(mockListener); + assertEquals(0, fakeExecutor.runDueTasks()); + verify(mockListener).onResult(resultCaptor.capture()); + assertAnswerMatches(answer, 81, resultCaptor.getValue()); + assertEquals(0, fakeClock.numPendingTasks()); + + resolver.shutdown(); + + assertThat(fakeExecutorResource.createCount.get()).isEqualTo(0); + assertThat(executions.get()).isEqualTo(1); + } + @Test public void resolveAll_failsOnEmptyResult() { DnsNameResolver nr = newResolver("dns:///addr.fake:1234", 443); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 3fb9dfea0bc..b2eb58dc108 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -265,6 +265,8 @@ public String getPolicyName() { private ObjectPool balancerRpcExecutorPool; @Mock private CallCredentials creds; + @Mock + private Executor blockingExecutor; private ChannelBuilder channelBuilder; private boolean requestConnection = true; private BlockingQueue transports; @@ -319,11 +321,14 @@ public void setUp() throws Exception { when(balancerRpcExecutorPool.getObject()) .thenReturn(balancerRpcExecutor.getScheduledExecutorService()); - channelBuilder = new ChannelBuilder() - .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build()) - .defaultLoadBalancingPolicy(MOCK_POLICY_NAME) - .userAgent(USER_AGENT) - .idleTimeout(AbstractManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS); + channelBuilder = + new ChannelBuilder() + .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build()) + .defaultLoadBalancingPolicy(MOCK_POLICY_NAME) + .userAgent(USER_AGENT) + .idleTimeout( + AbstractManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS) + .blockingExecutor(blockingExecutor); channelBuilder.executorPool = executorPool; channelBuilder.binlog = null; channelBuilder.channelz = channelz; @@ -3582,6 +3587,15 @@ public String getDefaultScheme() { assertThat(args).isNotNull(); assertThat(args.getDefaultPort()).isEqualTo(DEFAULT_PORT); assertThat(args.getProxyDetector()).isSameInstanceAs(neverProxy); + + verify(blockingExecutor, never()).execute(any(Runnable.class)); + args.getBlockingExecutor() + .execute( + new Runnable() { + @Override + public void run() {} + }); + verify(blockingExecutor, times(1)).execute(any(Runnable.class)); } @Test