From 7f6909d4ae6f621e8ef66b907243181004b8cf97 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Thu, 21 Nov 2019 16:14:13 -0800 Subject: [PATCH] api, core: make scheduled executor service accessible for NameResolver.Args (#6455) Added new API on NameResolver.Args to access ScheduledExecutorService, which is wrapped transport executor. --- api/src/main/java/io/grpc/NameResolver.java | 38 ++++++++++++++++++- .../test/java/io/grpc/NameResolverTest.java | 6 +++ .../io/grpc/internal/ManagedChannelImpl.java | 22 +++++------ 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java index fd7302658c5f..2a66ef6770aa 100644 --- a/api/src/main/java/io/grpc/NameResolver.java +++ b/api/src/main/java/io/grpc/NameResolver.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -412,6 +413,7 @@ public static final class Args { private final ProxyDetector proxyDetector; private final SynchronizationContext syncContext; private final ServiceConfigParser serviceConfigParser; + @Nullable private final ScheduledExecutorService scheduledExecutorService; @Nullable private final ChannelLogger channelLogger; @Nullable private final Executor executor; @@ -420,12 +422,14 @@ private Args( ProxyDetector proxyDetector, SynchronizationContext syncContext, ServiceConfigParser serviceConfigParser, + @Nullable ScheduledExecutorService scheduledExecutorService, @Nullable ChannelLogger channelLogger, @Nullable Executor executor) { this.defaultPort = checkNotNull(defaultPort, "defaultPort not set"); this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set"); this.syncContext = checkNotNull(syncContext, "syncContext not set"); this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set"); + this.scheduledExecutorService = scheduledExecutorService; this.channelLogger = channelLogger; this.executor = executor; } @@ -460,6 +464,25 @@ public SynchronizationContext getSynchronizationContext() { return syncContext; } + /** + * Returns a {@link ScheduledExecutorService} for scheduling delayed tasks. + * + *

This service is a shared resource and is only meant for quick tasks. DO NOT block or run + * time-consuming tasks. + * + *

The returned service doesn't support {@link ScheduledExecutorService#shutdown shutdown()} + * and {@link ScheduledExecutorService#shutdownNow shutdownNow()}. They will throw if called. + * + * @since 1.26.0 + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454") + public ScheduledExecutorService getScheduledExecutorService() { + if (scheduledExecutorService == null) { + throw new IllegalStateException("ScheduledExecutorService not set in Builder"); + } + return scheduledExecutorService; + } + /** * Returns the {@link ServiceConfigParser}. * @@ -501,6 +524,7 @@ public String toString() { .add("proxyDetector", proxyDetector) .add("syncContext", syncContext) .add("serviceConfigParser", serviceConfigParser) + .add("scheduledExecutorService", scheduledExecutorService) .add("channelLogger", channelLogger) .add("executor", executor) .toString(); @@ -517,6 +541,7 @@ public Builder toBuilder() { builder.setProxyDetector(proxyDetector); builder.setSynchronizationContext(syncContext); builder.setServiceConfigParser(serviceConfigParser); + builder.setScheduledExecutorService(scheduledExecutorService); builder.setChannelLogger(channelLogger); builder.setOffloadExecutor(executor); return builder; @@ -541,6 +566,7 @@ public static final class Builder { private ProxyDetector proxyDetector; private SynchronizationContext syncContext; private ServiceConfigParser serviceConfigParser; + private ScheduledExecutorService scheduledExecutorService; private ChannelLogger channelLogger; private Executor executor; @@ -577,6 +603,16 @@ public Builder setSynchronizationContext(SynchronizationContext syncContext) { return this; } + /** + * See {@link Args#getScheduledExecutorService}. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454") + public Builder setScheduledExecutorService( + ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = checkNotNull(scheduledExecutorService); + return this; + } + /** * See {@link Args#getServiceConfigParser}. This is a required field. * @@ -618,7 +654,7 @@ public Args build() { return new Args( defaultPort, proxyDetector, syncContext, serviceConfigParser, - channelLogger, executor); + scheduledExecutorService, channelLogger, executor); } } } diff --git a/api/src/test/java/io/grpc/NameResolverTest.java b/api/src/test/java/io/grpc/NameResolverTest.java index 828047443a4e..0924002fd289 100644 --- a/api/src/test/java/io/grpc/NameResolverTest.java +++ b/api/src/test/java/io/grpc/NameResolverTest.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; @@ -45,6 +46,8 @@ public class NameResolverTest { private final SynchronizationContext syncContext = new SynchronizationContext(mock(UncaughtExceptionHandler.class)); private final ServiceConfigParser parser = mock(ServiceConfigParser.class); + private final ScheduledExecutorService scheduledExecutorService = + mock(ScheduledExecutorService.class); private final ChannelLogger channelLogger = mock(ChannelLogger.class); private final Executor executor = Executors.newSingleThreadExecutor(); private URI uri; @@ -62,6 +65,7 @@ public void args() { assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector); assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext); assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser); + assertThat(args.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService); assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger); assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor); @@ -70,6 +74,7 @@ public void args() { assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector); assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext); assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser); + assertThat(args2.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService); assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger); assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor); @@ -254,6 +259,7 @@ private NameResolver.Args createArgs() { .setProxyDetector(proxyDetector) .setSynchronizationContext(syncContext) .setServiceConfigParser(parser) + .setScheduledExecutorService(scheduledExecutorService) .setChannelLogger(channelLogger) .setOffloadExecutor(executor) .build(); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index de3a472501c6..b21faa322d33 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -137,7 +137,7 @@ final class ManagedChannelImpl extends ManagedChannel implements private final NameResolver.Args nameResolverArgs; private final AutoConfiguredLoadBalancerFactory loadBalancerFactory; private final ClientTransportFactory transportFactory; - private final ScheduledExecutorForBalancer scheduledExecutorForBalancer; + private final RestrictedScheduledExecutor scheduledExecutor; private final Executor executor; private final ObjectPool executorPool; private final ObjectPool balancerRpcExecutorPool; @@ -562,6 +562,12 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new this.target = checkNotNull(builder.target, "target"); this.logId = InternalLogId.allocate("Channel", target); this.timeProvider = checkNotNull(timeProvider, "timeProvider"); + this.executorPool = checkNotNull(builder.executorPool, "executorPool"); + this.executor = checkNotNull(executorPool.getObject(), "executor"); + this.transportFactory = + new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor); + this.scheduledExecutor = + new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService()); maxTraceEvents = builder.maxTraceEvents; channelTracer = new ChannelTracer( logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(), @@ -581,6 +587,7 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new .setDefaultPort(builder.getDefaultPort()) .setProxyDetector(proxyDetector) .setSynchronizationContext(syncContext) + .setScheduledExecutorService(scheduledExecutor) .setServiceConfigParser( new ScParser( retryEnabled, @@ -598,18 +605,11 @@ public void execute(Runnable command) { }) .build(); this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs); - this.executorPool = checkNotNull(builder.executorPool, "executorPool"); this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool"); this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool); - this.executor = checkNotNull(executorPool.getObject(), "executor"); this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext); this.delayedTransport.start(delayedTransportListener); this.backoffPolicyProvider = backoffPolicyProvider; - this.transportFactory = - new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor); - this.scheduledExecutorForBalancer = - new ScheduledExecutorForBalancer(transportFactory.getScheduledExecutorService()); - serviceConfigInterceptor = new ServiceConfigInterceptor( retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts); this.defaultServiceConfig = builder.defaultServiceConfig; @@ -1269,7 +1269,7 @@ public SynchronizationContext getSynchronizationContext() { @Override public ScheduledExecutorService getScheduledExecutorService() { - return scheduledExecutorForBalancer; + return scheduledExecutor; } @Override @@ -1736,10 +1736,10 @@ synchronized void release() { } } - private static final class ScheduledExecutorForBalancer implements ScheduledExecutorService { + private static final class RestrictedScheduledExecutor implements ScheduledExecutorService { final ScheduledExecutorService delegate; - private ScheduledExecutorForBalancer(ScheduledExecutorService delegate) { + private RestrictedScheduledExecutor(ScheduledExecutorService delegate) { this.delegate = checkNotNull(delegate, "delegate"); }