Skip to content

Commit

Permalink
api, core: make scheduled executor service accessible for NameResolve…
Browse files Browse the repository at this point in the history
…r.Args (grpc#6455)

Added new API on NameResolver.Args to access ScheduledExecutorService, which is wrapped transport executor.
  • Loading branch information
voidzcy authored and ericgribkoff committed Dec 6, 2019
1 parent 24e731a commit 7f6909d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 12 deletions.
38 changes: 37 additions & 1 deletion api/src/main/java/io/grpc/NameResolver.java
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -412,6 +413,7 @@ public static final class Args {
private final ProxyDetector proxyDetector;
private final SynchronizationContext syncContext;
private final ServiceConfigParser serviceConfigParser;
@Nullable private final ScheduledExecutorService scheduledExecutorService;
@Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor;

Expand All @@ -420,12 +422,14 @@ private Args(
ProxyDetector proxyDetector,
SynchronizationContext syncContext,
ServiceConfigParser serviceConfigParser,
@Nullable ScheduledExecutorService scheduledExecutorService,
@Nullable ChannelLogger channelLogger,
@Nullable Executor executor) {
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
this.syncContext = checkNotNull(syncContext, "syncContext not set");
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set");
this.scheduledExecutorService = scheduledExecutorService;
this.channelLogger = channelLogger;
this.executor = executor;
}
Expand Down Expand Up @@ -460,6 +464,25 @@ public SynchronizationContext getSynchronizationContext() {
return syncContext;
}

/**
* Returns a {@link ScheduledExecutorService} for scheduling delayed tasks.
*
* <p>This service is a shared resource and is only meant for quick tasks. DO NOT block or run
* time-consuming tasks.
*
* <p>The returned service doesn't support {@link ScheduledExecutorService#shutdown shutdown()}
* and {@link ScheduledExecutorService#shutdownNow shutdownNow()}. They will throw if called.
*
* @since 1.26.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
public ScheduledExecutorService getScheduledExecutorService() {
if (scheduledExecutorService == null) {
throw new IllegalStateException("ScheduledExecutorService not set in Builder");
}
return scheduledExecutorService;
}

/**
* Returns the {@link ServiceConfigParser}.
*
Expand Down Expand Up @@ -501,6 +524,7 @@ public String toString() {
.add("proxyDetector", proxyDetector)
.add("syncContext", syncContext)
.add("serviceConfigParser", serviceConfigParser)
.add("scheduledExecutorService", scheduledExecutorService)
.add("channelLogger", channelLogger)
.add("executor", executor)
.toString();
Expand All @@ -517,6 +541,7 @@ public Builder toBuilder() {
builder.setProxyDetector(proxyDetector);
builder.setSynchronizationContext(syncContext);
builder.setServiceConfigParser(serviceConfigParser);
builder.setScheduledExecutorService(scheduledExecutorService);
builder.setChannelLogger(channelLogger);
builder.setOffloadExecutor(executor);
return builder;
Expand All @@ -541,6 +566,7 @@ public static final class Builder {
private ProxyDetector proxyDetector;
private SynchronizationContext syncContext;
private ServiceConfigParser serviceConfigParser;
private ScheduledExecutorService scheduledExecutorService;
private ChannelLogger channelLogger;
private Executor executor;

Expand Down Expand Up @@ -577,6 +603,16 @@ public Builder setSynchronizationContext(SynchronizationContext syncContext) {
return this;
}

/**
* See {@link Args#getScheduledExecutorService}.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
public Builder setScheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
return this;
}

/**
* See {@link Args#getServiceConfigParser}. This is a required field.
*
Expand Down Expand Up @@ -618,7 +654,7 @@ public Args build() {
return
new Args(
defaultPort, proxyDetector, syncContext, serviceConfigParser,
channelLogger, executor);
scheduledExecutorService, channelLogger, executor);
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions api/src/test/java/io/grpc/NameResolverTest.java
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -45,6 +46,8 @@ public class NameResolverTest {
private final SynchronizationContext syncContext =
new SynchronizationContext(mock(UncaughtExceptionHandler.class));
private final ServiceConfigParser parser = mock(ServiceConfigParser.class);
private final ScheduledExecutorService scheduledExecutorService =
mock(ScheduledExecutorService.class);
private final ChannelLogger channelLogger = mock(ChannelLogger.class);
private final Executor executor = Executors.newSingleThreadExecutor();
private URI uri;
Expand All @@ -62,6 +65,7 @@ public void args() {
assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser);
assertThat(args.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor);

Expand All @@ -70,6 +74,7 @@ public void args() {
assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser);
assertThat(args2.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor);

Expand Down Expand Up @@ -254,6 +259,7 @@ private NameResolver.Args createArgs() {
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(parser)
.setScheduledExecutorService(scheduledExecutorService)
.setChannelLogger(channelLogger)
.setOffloadExecutor(executor)
.build();
Expand Down
22 changes: 11 additions & 11 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Expand Up @@ -137,7 +137,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final NameResolver.Args nameResolverArgs;
private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
private final ClientTransportFactory transportFactory;
private final ScheduledExecutorForBalancer scheduledExecutorForBalancer;
private final RestrictedScheduledExecutor scheduledExecutor;
private final Executor executor;
private final ObjectPool<? extends Executor> executorPool;
private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
Expand Down Expand Up @@ -562,6 +562,12 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
this.target = checkNotNull(builder.target, "target");
this.logId = InternalLogId.allocate("Channel", target);
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.executor = checkNotNull(executorPool.getObject(), "executor");
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.scheduledExecutor =
new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
maxTraceEvents = builder.maxTraceEvents;
channelTracer = new ChannelTracer(
logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
Expand All @@ -581,6 +587,7 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
.setDefaultPort(builder.getDefaultPort())
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
.setScheduledExecutorService(scheduledExecutor)
.setServiceConfigParser(
new ScParser(
retryEnabled,
Expand All @@ -598,18 +605,11 @@ public void execute(Runnable command) {
})
.build();
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
this.executor = checkNotNull(executorPool.getObject(), "executor");
this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
this.delayedTransport.start(delayedTransportListener);
this.backoffPolicyProvider = backoffPolicyProvider;
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.scheduledExecutorForBalancer =
new ScheduledExecutorForBalancer(transportFactory.getScheduledExecutorService());

serviceConfigInterceptor = new ServiceConfigInterceptor(
retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
this.defaultServiceConfig = builder.defaultServiceConfig;
Expand Down Expand Up @@ -1269,7 +1269,7 @@ public SynchronizationContext getSynchronizationContext() {

@Override
public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorForBalancer;
return scheduledExecutor;
}

@Override
Expand Down Expand Up @@ -1736,10 +1736,10 @@ synchronized void release() {
}
}

private static final class ScheduledExecutorForBalancer implements ScheduledExecutorService {
private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
final ScheduledExecutorService delegate;

private ScheduledExecutorForBalancer(ScheduledExecutorService delegate) {
private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
this.delegate = checkNotNull(delegate, "delegate");
}

Expand Down

0 comments on commit 7f6909d

Please sign in to comment.