Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api, core: make scheduled executor service accessible for NameResolver.Args #6455

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 37 additions & 1 deletion api/src/main/java/io/grpc/NameResolver.java
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -412,6 +413,7 @@ public static final class Args {
private final ProxyDetector proxyDetector;
private final SynchronizationContext syncContext;
private final ServiceConfigParser serviceConfigParser;
@Nullable private final ScheduledExecutorService scheduledExecutorService;
@Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor;

Expand All @@ -420,12 +422,14 @@ private Args(
ProxyDetector proxyDetector,
SynchronizationContext syncContext,
ServiceConfigParser serviceConfigParser,
@Nullable ScheduledExecutorService scheduledExecutorService,
@Nullable ChannelLogger channelLogger,
@Nullable Executor executor) {
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
this.syncContext = checkNotNull(syncContext, "syncContext not set");
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set");
this.scheduledExecutorService = scheduledExecutorService;
this.channelLogger = channelLogger;
this.executor = executor;
}
Expand Down Expand Up @@ -460,6 +464,25 @@ public SynchronizationContext getSynchronizationContext() {
return syncContext;
}

/**
* Returns a {@link ScheduledExecutorService} for scheduling delayed tasks.
*
* <p>This service is a shared resource and is only meant for quick tasks. DO NOT block or run
* time-consuming tasks.
*
* <p>The returned service doesn't support {@link ScheduledExecutorService#shutdown shutdown()}
* and {@link ScheduledExecutorService#shutdownNow shutdownNow()}. They will throw if called.
*
* @since 1.26.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
public ScheduledExecutorService getScheduledExecutorService() {
if (scheduledExecutorService == null) {
throw new IllegalStateException("ScheduledExecutorService not set in Builder");
}
return scheduledExecutorService;
}

/**
* Returns the {@link ServiceConfigParser}.
*
Expand Down Expand Up @@ -501,6 +524,7 @@ public String toString() {
.add("proxyDetector", proxyDetector)
.add("syncContext", syncContext)
.add("serviceConfigParser", serviceConfigParser)
.add("scheduledExecutorService", scheduledExecutorService)
.add("channelLogger", channelLogger)
.add("executor", executor)
.toString();
Expand All @@ -517,6 +541,7 @@ public Builder toBuilder() {
builder.setProxyDetector(proxyDetector);
builder.setSynchronizationContext(syncContext);
builder.setServiceConfigParser(serviceConfigParser);
builder.setScheduledExecutorService(scheduledExecutorService);
builder.setChannelLogger(channelLogger);
builder.setOffloadExecutor(executor);
return builder;
Expand All @@ -541,6 +566,7 @@ public static final class Builder {
private ProxyDetector proxyDetector;
private SynchronizationContext syncContext;
private ServiceConfigParser serviceConfigParser;
private ScheduledExecutorService scheduledExecutorService;
private ChannelLogger channelLogger;
private Executor executor;

Expand Down Expand Up @@ -577,6 +603,16 @@ public Builder setSynchronizationContext(SynchronizationContext syncContext) {
return this;
}

/**
* See {@link Args#getScheduledExecutorService}.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
public Builder setScheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
return this;
}

/**
* See {@link Args#getServiceConfigParser}. This is a required field.
*
Expand Down Expand Up @@ -618,7 +654,7 @@ public Args build() {
return
new Args(
defaultPort, proxyDetector, syncContext, serviceConfigParser,
channelLogger, executor);
scheduledExecutorService, channelLogger, executor);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions api/src/test/java/io/grpc/NameResolverRegistryTest.java
Expand Up @@ -26,6 +26,7 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -39,6 +40,7 @@ public class NameResolverRegistryTest {
.setProxyDetector(mock(ProxyDetector.class))
.setSynchronizationContext(new SynchronizationContext(mock(UncaughtExceptionHandler.class)))
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setScheduledExecutorService(mock(ScheduledExecutorService.class))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not leave unset?

.setChannelLogger(mock(ChannelLogger.class))
.build();

Expand Down
6 changes: 6 additions & 0 deletions api/src/test/java/io/grpc/NameResolverTest.java
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -45,6 +46,8 @@ public class NameResolverTest {
private final SynchronizationContext syncContext =
new SynchronizationContext(mock(UncaughtExceptionHandler.class));
private final ServiceConfigParser parser = mock(ServiceConfigParser.class);
private final ScheduledExecutorService scheduledExecutorService =
mock(ScheduledExecutorService.class);
private final ChannelLogger channelLogger = mock(ChannelLogger.class);
private final Executor executor = Executors.newSingleThreadExecutor();
private URI uri;
Expand All @@ -62,6 +65,7 @@ public void args() {
assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser);
assertThat(args.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor);

Expand All @@ -70,6 +74,7 @@ public void args() {
assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser);
assertThat(args2.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor);

Expand Down Expand Up @@ -254,6 +259,7 @@ private NameResolver.Args createArgs() {
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(parser)
.setScheduledExecutorService(scheduledExecutorService)
.setChannelLogger(channelLogger)
.setOffloadExecutor(executor)
.build();
Expand Down
22 changes: 11 additions & 11 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Expand Up @@ -137,7 +137,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final NameResolver.Args nameResolverArgs;
private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
private final ClientTransportFactory transportFactory;
private final ScheduledExecutorForBalancer scheduledExecutorForBalancer;
private final RestrictedScheduledExecutor scheduledExecutor;
private final Executor executor;
private final ObjectPool<? extends Executor> executorPool;
private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
Expand Down Expand Up @@ -562,6 +562,12 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
this.target = checkNotNull(builder.target, "target");
this.logId = InternalLogId.allocate("Channel", target);
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.executor = checkNotNull(executorPool.getObject(), "executor");
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.scheduledExecutor =
new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
maxTraceEvents = builder.maxTraceEvents;
channelTracer = new ChannelTracer(
logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
Expand All @@ -581,6 +587,7 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
.setDefaultPort(builder.getDefaultPort())
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
.setScheduledExecutorService(scheduledExecutor)
.setServiceConfigParser(
new ScParser(
retryEnabled,
Expand All @@ -598,18 +605,11 @@ public void execute(Runnable command) {
})
.build();
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
this.executor = checkNotNull(executorPool.getObject(), "executor");
this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
this.delayedTransport.start(delayedTransportListener);
this.backoffPolicyProvider = backoffPolicyProvider;
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.scheduledExecutorForBalancer =
new ScheduledExecutorForBalancer(transportFactory.getScheduledExecutorService());

serviceConfigInterceptor = new ServiceConfigInterceptor(
retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
this.defaultServiceConfig = builder.defaultServiceConfig;
Expand Down Expand Up @@ -1269,7 +1269,7 @@ public SynchronizationContext getSynchronizationContext() {

@Override
public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorForBalancer;
return scheduledExecutor;
}

@Override
Expand Down Expand Up @@ -1736,10 +1736,10 @@ synchronized void release() {
}
}

private static final class ScheduledExecutorForBalancer implements ScheduledExecutorService {
private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
final ScheduledExecutorService delegate;

private ScheduledExecutorForBalancer(ScheduledExecutorService delegate) {
private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
this.delegate = checkNotNull(delegate, "delegate");
}

Expand Down
Expand Up @@ -29,6 +29,7 @@
import io.grpc.NameResolver.ServiceConfigParser;
import io.grpc.SynchronizationContext;
import java.net.URI;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -48,6 +49,7 @@ public void uncaughtException(Thread t, Throwable e) {
.setProxyDetector(GrpcUtil.DEFAULT_PROXY_DETECTOR)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setScheduledExecutorService(mock(ScheduledExecutorService.class))
.setChannelLogger(mock(ChannelLogger.class))
.build();

Expand Down
3 changes: 3 additions & 0 deletions core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
Expand Up @@ -70,6 +70,7 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
Expand Down Expand Up @@ -116,6 +117,7 @@ public void uncaughtException(Thread t, Throwable e) {
.setProxyDetector(GrpcUtil.DEFAULT_PROXY_DETECTOR)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setScheduledExecutorService(mock(ScheduledExecutorService.class))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one too. Is it necessary to set it? Can we leave it unset everywhere it is unused?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. It doesn't matter much. I set it just for completeness.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm mainly requesting it because it makes the tests more bug-prone. As it is now if the code starts using the scheduled executor service it will throw and then it is clear a FakeClock needs to be injected. With the mock it just silently doesn't do anything.

.setChannelLogger(mock(ChannelLogger.class))
.build();

Expand Down Expand Up @@ -177,6 +179,7 @@ private DnsNameResolver newResolver(
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setScheduledExecutorService(mock(ScheduledExecutorService.class))
.setChannelLogger(mock(ChannelLogger.class))
.build();
return newResolver(name, stopwatch, isAndroid, args);
Expand Down
Expand Up @@ -29,6 +29,7 @@
import io.grpc.SynchronizationContext;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.URI;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -41,6 +42,7 @@ public class ManagedChannelImplGetNameResolverTest {
.setProxyDetector(mock(ProxyDetector.class))
.setSynchronizationContext(new SynchronizationContext(mock(UncaughtExceptionHandler.class)))
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setScheduledExecutorService(mock(ScheduledExecutorService.class))
.setChannelLogger(mock(ChannelLogger.class))
.build();

Expand Down
Expand Up @@ -30,6 +30,7 @@
import io.grpc.SynchronizationContext;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.URI;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -42,6 +43,7 @@ public class OverrideAuthorityNameResolverTest {
.setProxyDetector(mock(ProxyDetector.class))
.setSynchronizationContext(new SynchronizationContext(mock(UncaughtExceptionHandler.class)))
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setScheduledExecutorService(mock(ScheduledExecutorService.class))
.setChannelLogger(mock(ChannelLogger.class))
.build();

Expand Down
Expand Up @@ -28,6 +28,7 @@
import io.grpc.SynchronizationContext;
import io.grpc.internal.GrpcUtil;
import java.net.URI;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -47,6 +48,7 @@ public void uncaughtException(Thread t, Throwable e) {
.setProxyDetector(GrpcUtil.NOOP_PROXY_DETECTOR)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setScheduledExecutorService(mock(ScheduledExecutorService.class))
.setChannelLogger(mock(ChannelLogger.class))
.build();

Expand Down
2 changes: 2 additions & 0 deletions xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
Expand Up @@ -39,6 +39,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -70,6 +71,7 @@ public void uncaughtException(Thread t, Throwable e) {
.setProxyDetector(GrpcUtil.NOOP_PROXY_DETECTOR)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setScheduledExecutorService(mock(ScheduledExecutorService.class))
.setChannelLogger(mock(ChannelLogger.class))
.build();

Expand Down