Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

[gax-java] chore: generalize ExecutorService for transport layers #1098

Merged
merged 8 commits into from Jun 5, 2020
Expand Up @@ -33,8 +33,6 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannel;
Expand All @@ -50,7 +48,7 @@
import io.grpc.alts.ComputeEngineChannelBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
Expand All @@ -76,7 +74,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
static final int MAX_POOL_SIZE = 1000;

private final int processorCount;
private final ExecutorProvider executorProvider;
private final ExecutorService executor;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
private final HeaderProvider headerProvider;
private final String endpoint;
private final EnvironmentProvider envProvider;
Expand All @@ -96,7 +94,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP

private InstantiatingGrpcChannelProvider(Builder builder) {
this.processorCount = builder.processorCount;
this.executorProvider = builder.executorProvider;
this.executor = builder.executor;
this.headerProvider = builder.headerProvider;
this.endpoint = builder.endpoint;
this.envProvider = builder.envProvider;
Expand All @@ -115,12 +113,12 @@ private InstantiatingGrpcChannelProvider(Builder builder) {

@Override
public boolean needsExecutor() {
return executorProvider == null;
return executor == null;
}

@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return toBuilder().setExecutorProvider(FixedExecutorProvider.create(executor)).build();
public TransportChannelProvider withExecutor(ExecutorService executor) {
return toBuilder().setExecutor(executor).build();
}

@Override
Expand Down Expand Up @@ -229,7 +227,6 @@ private boolean isDirectPathEnabled(String serviceAddress) {
}

private ManagedChannel createSingleChannel() throws IOException {
ScheduledExecutorService executor = executorProvider.getExecutor();
GrpcHeaderInterceptor headerInterceptor =
new GrpcHeaderInterceptor(headerProvider.getHeaders());
GrpcMetadataHandlerInterceptor metadataHandlerInterceptor =
Expand Down Expand Up @@ -354,7 +351,7 @@ public static Builder newBuilder() {

public static final class Builder {
private int processorCount;
private ExecutorProvider executorProvider;
private ExecutorService executor;
private HeaderProvider headerProvider;
private String endpoint;
private EnvironmentProvider envProvider;
Expand All @@ -377,7 +374,7 @@ private Builder() {

private Builder(InstantiatingGrpcChannelProvider provider) {
this.processorCount = provider.processorCount;
this.executorProvider = provider.executorProvider;
this.executor = provider.executor;
this.headerProvider = provider.headerProvider;
this.endpoint = provider.endpoint;
this.envProvider = provider.envProvider;
Expand All @@ -401,15 +398,15 @@ Builder setProcessorCount(int processorCount) {
}

/**
* Sets the ExecutorProvider for this TransportChannelProvider.
* Sets the ExecutorService for this TransportChannelProvider.
*
* <p>This is optional; if it is not provided, needsExecutor() will return true, meaning that an
* Executor must be provided when getChannel is called on the constructed
* TransportChannelProvider instance. Note: GrpcTransportProvider will automatically provide its
* own Executor in this circumstance when it calls getChannel.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
public Builder setExecutor(ExecutorService executor) {
this.executor = executor;
return this;
}

Expand Down
Expand Up @@ -34,7 +34,6 @@
import static org.junit.Assert.fail;

import com.google.api.core.ApiFunction;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
Expand All @@ -45,6 +44,7 @@
import io.grpc.alts.ComputeEngineChannelBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Test;
Expand Down Expand Up @@ -167,7 +167,7 @@ private void testWithInterceptors(int numChannels) throws Exception {
.setEndpoint("localhost:8080")
.setPoolSize(numChannels)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(ExecutorService.class))
.setInterceptorProvider(interceptorProvider)
.build();

Expand Down Expand Up @@ -199,7 +199,7 @@ public void testChannelConfigurator() throws IOException {
InstantiatingGrpcChannelProvider.newBuilder()
.setEndpoint("localhost:8080")
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(ExecutorService.class))
.setChannelConfigurator(channelConfigurator)
.setPoolSize(numChannels)
.build()
Expand Down Expand Up @@ -359,7 +359,7 @@ public void testWithPrimeChannel() throws IOException {
.setEndpoint("localhost:8080")
.setPoolSize(poolSize)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(ExecutorService.class))
.setChannelPrimer(mockChannelPrimer)
.build();

Expand Down
Expand Up @@ -50,7 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;

/** LocalChannelProvider creates channels for in-memory gRPC services. */
Expand Down Expand Up @@ -79,7 +79,7 @@ public boolean needsExecutor() {
}

@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
public TransportChannelProvider withExecutor(ExecutorService executor) {
throw new UnsupportedOperationException("LocalChannelProvider doesn't need an executor");
}

Expand Down
Expand Up @@ -32,8 +32,6 @@
import com.google.api.client.http.HttpTransport;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannel;
Expand All @@ -43,7 +41,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;

/**
* InstantiatingHttpJsonChannelProvider is a TransportChannelProvider which constructs a {@link
Expand All @@ -60,38 +58,38 @@
@BetaApi
@InternalExtensionOnly
public final class InstantiatingHttpJsonChannelProvider implements TransportChannelProvider {
private final ExecutorProvider executorProvider;
private final ExecutorService executor;
private final HeaderProvider headerProvider;
private final String endpoint;
private final HttpTransport httpTransport;

private InstantiatingHttpJsonChannelProvider(
ExecutorProvider executorProvider, HeaderProvider headerProvider, String endpoint) {
this.executorProvider = executorProvider;
ExecutorService executor, HeaderProvider headerProvider, String endpoint) {
this.executor = executor;
this.headerProvider = headerProvider;
this.endpoint = endpoint;
this.httpTransport = null;
}

private InstantiatingHttpJsonChannelProvider(
ExecutorProvider executorProvider,
ExecutorService executor,
HeaderProvider headerProvider,
String endpoint,
HttpTransport httpTransport) {
this.executorProvider = executorProvider;
this.executor = executor;
this.headerProvider = headerProvider;
this.endpoint = endpoint;
this.httpTransport = httpTransport;
}

@Override
public boolean needsExecutor() {
return executorProvider == null;
return executor == null;
}

@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return toBuilder().setExecutorProvider(FixedExecutorProvider.create(executor)).build();
public TransportChannelProvider withExecutor(ExecutorService executor) {
return toBuilder().setExecutor(executor).build();
}

@Override
Expand Down Expand Up @@ -155,7 +153,6 @@ public TransportChannelProvider withCredentials(Credentials credentials) {
}

private TransportChannel createChannel() throws IOException {
ScheduledExecutorService executor = executorProvider.getExecutor();
Map<String, String> headers = headerProvider.getHeaders();

List<HttpJsonHeaderEnhancer> headerEnhancers = Lists.newArrayList();
Expand Down Expand Up @@ -193,30 +190,30 @@ public static Builder newBuilder() {
}

public static final class Builder {
private ExecutorProvider executorProvider;
private ExecutorService executor;
private HeaderProvider headerProvider;
private String endpoint;
private HttpTransport httpTransport;

private Builder() {}

private Builder(InstantiatingHttpJsonChannelProvider provider) {
this.executorProvider = provider.executorProvider;
this.executor = provider.executor;
this.headerProvider = provider.headerProvider;
this.endpoint = provider.endpoint;
this.httpTransport = provider.httpTransport;
}

/**
* Sets the ExecutorProvider for this TransportChannelProvider.
* Sets the ExecutorService for this TransportChannelProvider.
*
* <p>This is optional; if it is not provided, needsExecutor() will return true, meaning that an
* Executor must be provided when getChannel is called on the constructed
* TransportChannelProvider instance. Note: InstantiatingHttpJsonChannelProvider will
* automatically provide its own Executor in this circumstance when it calls getChannel.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
public Builder setExecutor(ExecutorService executor) {
this.executor = executor;
return this;
}

Expand Down Expand Up @@ -250,7 +247,7 @@ public String getEndpoint() {

public InstantiatingHttpJsonChannelProvider build() {
return new InstantiatingHttpJsonChannelProvider(
executorProvider, headerProvider, endpoint, httpTransport);
executor, headerProvider, endpoint, httpTransport);
}
}
}
Expand Up @@ -35,7 +35,7 @@
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;

/** An instance of TransportChannelProvider that always provides the same TransportChannel. */
@InternalExtensionOnly
Expand All @@ -58,7 +58,7 @@ public boolean needsExecutor() {
}

@Override
public FixedTransportChannelProvider withExecutor(ScheduledExecutorService executor) {
public FixedTransportChannelProvider withExecutor(ExecutorService executor) {
throw new UnsupportedOperationException(
"FixedTransportChannelProvider doesn't need an executor");
}
Expand Down
Expand Up @@ -34,7 +34,7 @@
import com.google.auth.Credentials;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;

/**
* Provides an interface to either build a TransportChannel or provide a fixed TransportChannel that
Expand Down Expand Up @@ -70,7 +70,7 @@ public interface TransportChannelProvider {
*
* <p>This method should only be called if {@link #needsExecutor()} returns true.
*/
TransportChannelProvider withExecutor(ScheduledExecutorService executor);
TransportChannelProvider withExecutor(ExecutorService executor);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

/** True if the TransportProvider has no headers provided. */
@BetaApi("The surface for customizing headers is not stable yet and may change in the future.")
Expand Down Expand Up @@ -114,8 +114,8 @@ public interface TransportChannelProvider {
* Provides a Transport, which could either be a new instance for every call, or the same
* instance, depending on the implementation.
*
* <p>If {@link #needsExecutor()} is true, then {@link #withExecutor(ScheduledExecutorService)}
* needs to be called first to provide an executor.
* <p>If {@link #needsExecutor()} is true, then {@link #withExecutor(ExecutorService)} needs to be
* called first to provide an executor.
*
* <p>If {@link #needsHeaders()} is true, then {@link #withHeaders(Map)} needs to be called first
* to provide headers.
Expand Down
Expand Up @@ -44,6 +44,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Test;
Expand Down Expand Up @@ -88,15 +89,15 @@ public ScheduledExecutorService getExecutor() {
}

private static class FakeTransportProvider implements TransportChannelProvider {
final ScheduledExecutorService executor;
final ExecutorService executor;
final FakeTransportChannel transport;
final boolean shouldAutoClose;
final Map<String, String> headers;
final Credentials credentials;

FakeTransportProvider(
FakeTransportChannel transport,
ScheduledExecutorService executor,
ExecutorService executor,
boolean shouldAutoClose,
Map<String, String> headers,
Credentials credentials) {
Expand All @@ -119,7 +120,7 @@ public boolean needsExecutor() {
}

@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
public TransportChannelProvider withExecutor(ExecutorService executor) {
return new FakeTransportProvider(
this.transport, executor, this.shouldAutoClose, this.headers, this.credentials);
}
Expand Down