Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

[gax-java] chore: generalize ExecutorService for transport layers #1098

Merged
merged 8 commits into from Jun 5, 2020
Expand Up @@ -34,7 +34,6 @@
import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannel;
Expand All @@ -50,6 +49,7 @@
import io.grpc.alts.ComputeEngineChannelBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
Expand All @@ -76,7 +76,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
static final int MAX_POOL_SIZE = 1000;

private final int processorCount;
private final ExecutorProvider executorProvider;
private final Executor executor;
private final HeaderProvider headerProvider;
private final String endpoint;
private final EnvironmentProvider envProvider;
Expand All @@ -96,7 +96,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP

private InstantiatingGrpcChannelProvider(Builder builder) {
this.processorCount = builder.processorCount;
this.executorProvider = builder.executorProvider;
this.executor = builder.executor;
this.headerProvider = builder.headerProvider;
this.endpoint = builder.endpoint;
this.envProvider = builder.envProvider;
Expand All @@ -115,12 +115,18 @@ private InstantiatingGrpcChannelProvider(Builder builder) {

@Override
public boolean needsExecutor() {
return executorProvider == null;
return executor == null;
}

@Deprecated
@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return toBuilder().setExecutorProvider(FixedExecutorProvider.create(executor)).build();
return withExecutor((Executor) executor);
}

@Override
public TransportChannelProvider withExecutor(Executor executor) {
return toBuilder().setExecutor(executor).build();
}

@Override
Expand Down Expand Up @@ -229,7 +235,6 @@ private boolean isDirectPathEnabled(String serviceAddress) {
}

private ManagedChannel createSingleChannel() throws IOException {
ScheduledExecutorService executor = executorProvider.getExecutor();
GrpcHeaderInterceptor headerInterceptor =
new GrpcHeaderInterceptor(headerProvider.getHeaders());
GrpcMetadataHandlerInterceptor metadataHandlerInterceptor =
Expand Down Expand Up @@ -354,7 +359,7 @@ public static Builder newBuilder() {

public static final class Builder {
private int processorCount;
private ExecutorProvider executorProvider;
private Executor executor;
private HeaderProvider headerProvider;
private String endpoint;
private EnvironmentProvider envProvider;
Expand All @@ -377,7 +382,7 @@ private Builder() {

private Builder(InstantiatingGrpcChannelProvider provider) {
this.processorCount = provider.processorCount;
this.executorProvider = provider.executorProvider;
this.executor = provider.executor;
this.headerProvider = provider.headerProvider;
this.endpoint = provider.endpoint;
this.envProvider = provider.envProvider;
Expand All @@ -401,18 +406,24 @@ Builder setProcessorCount(int processorCount) {
}

/**
* Sets the ExecutorProvider for this TransportChannelProvider.
* Sets the Executor for this TransportChannelProvider.
*
* <p>This is optional; if it is not provided, needsExecutor() will return true, meaning that an
* Executor must be provided when getChannel is called on the constructed
* TransportChannelProvider instance. Note: GrpcTransportProvider will automatically provide its
* own Executor in this circumstance when it calls getChannel.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
public Builder setExecutor(Executor executor) {
miraleung marked this conversation as resolved.
Show resolved Hide resolved
this.executor = executor;
return this;
}

/** @deprecated. Please use {@link #setExecutor(Executor)}. */
miraleung marked this conversation as resolved.
Show resolved Hide resolved
@Deprecated
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
return setExecutor((Executor) executorProvider.getExecutor());
}

/**
* Sets the HeaderProvider for this TransportChannelProvider.
*
Expand Down
Expand Up @@ -34,7 +34,6 @@
import static org.junit.Assert.fail;

import com.google.api.core.ApiFunction;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
Expand All @@ -45,6 +44,7 @@
import io.grpc.alts.ComputeEngineChannelBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Test;
Expand Down Expand Up @@ -129,7 +129,7 @@ public void testWithPoolSize() throws IOException {
TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("localhost:8080");
assertThat(provider.acceptsPoolSize()).isTrue();
Expand Down Expand Up @@ -167,7 +167,7 @@ private void testWithInterceptors(int numChannels) throws Exception {
.setEndpoint("localhost:8080")
.setPoolSize(numChannels)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(Executor.class))
.setInterceptorProvider(interceptorProvider)
.build();

Expand Down Expand Up @@ -199,7 +199,7 @@ public void testChannelConfigurator() throws IOException {
InstantiatingGrpcChannelProvider.newBuilder()
.setEndpoint("localhost:8080")
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(Executor.class))
.setChannelConfigurator(channelConfigurator)
.setPoolSize(numChannels)
.build()
Expand Down Expand Up @@ -229,7 +229,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
.setAttemptDirectPath(true)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("localhost:8080");

Expand Down Expand Up @@ -259,7 +259,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
.setAttemptDirectPath(true)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("localhost:8080");

Expand Down Expand Up @@ -289,7 +289,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
.setAttemptDirectPath(false)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("localhost:8080");

Expand Down Expand Up @@ -318,7 +318,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
InstantiatingGrpcChannelProvider.newBuilder()
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("localhost:8080");

Expand All @@ -337,7 +337,7 @@ public void testWithIPv6Address() throws IOException {
TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("[::1]:8080");
assertThat(provider.needsEndpoint()).isFalse();
Expand All @@ -359,7 +359,7 @@ public void testWithPrimeChannel() throws IOException {
.setEndpoint("localhost:8080")
.setPoolSize(poolSize)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(Executor.class))
.setChannelPrimer(mockChannelPrimer)
.build();

Expand Down
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -78,8 +79,14 @@ public boolean needsExecutor() {
return false;
}

@Deprecated
@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return withExecutor((Executor) executor);
miraleung marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public TransportChannelProvider withExecutor(Executor executor) {
throw new UnsupportedOperationException("LocalChannelProvider doesn't need an executor");
}

Expand Down
Expand Up @@ -33,7 +33,6 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannel;
Expand All @@ -43,6 +42,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

/**
Expand All @@ -60,38 +60,44 @@
@BetaApi
@InternalExtensionOnly
public final class InstantiatingHttpJsonChannelProvider implements TransportChannelProvider {
private final ExecutorProvider executorProvider;
private final Executor executor;
private final HeaderProvider headerProvider;
private final String endpoint;
private final HttpTransport httpTransport;

private InstantiatingHttpJsonChannelProvider(
ExecutorProvider executorProvider, HeaderProvider headerProvider, String endpoint) {
this.executorProvider = executorProvider;
Executor executor, HeaderProvider headerProvider, String endpoint) {
this.executor = executor;
this.headerProvider = headerProvider;
this.endpoint = endpoint;
this.httpTransport = null;
}

private InstantiatingHttpJsonChannelProvider(
ExecutorProvider executorProvider,
Executor executor,
HeaderProvider headerProvider,
String endpoint,
HttpTransport httpTransport) {
this.executorProvider = executorProvider;
this.executor = executor;
this.headerProvider = headerProvider;
this.endpoint = endpoint;
this.httpTransport = httpTransport;
}

@Override
public boolean needsExecutor() {
return executorProvider == null;
return executor == null;
}

@Deprecated
@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return toBuilder().setExecutorProvider(FixedExecutorProvider.create(executor)).build();
return withExecutor((Executor) executor);
}

@Override
public TransportChannelProvider withExecutor(Executor executor) {
return toBuilder().setExecutor(executor).build();
}

@Override
Expand Down Expand Up @@ -155,7 +161,6 @@ public TransportChannelProvider withCredentials(Credentials credentials) {
}

private TransportChannel createChannel() throws IOException {
ScheduledExecutorService executor = executorProvider.getExecutor();
Map<String, String> headers = headerProvider.getHeaders();

List<HttpJsonHeaderEnhancer> headerEnhancers = Lists.newArrayList();
Expand Down Expand Up @@ -193,33 +198,39 @@ public static Builder newBuilder() {
}

public static final class Builder {
private ExecutorProvider executorProvider;
private Executor executor;
private HeaderProvider headerProvider;
private String endpoint;
private HttpTransport httpTransport;

private Builder() {}

private Builder(InstantiatingHttpJsonChannelProvider provider) {
this.executorProvider = provider.executorProvider;
this.executor = provider.executor;
this.headerProvider = provider.headerProvider;
this.endpoint = provider.endpoint;
this.httpTransport = provider.httpTransport;
}

/**
* Sets the ExecutorProvider for this TransportChannelProvider.
* Sets the Executor for this TransportChannelProvider.
*
* <p>This is optional; if it is not provided, needsExecutor() will return true, meaning that an
* Executor must be provided when getChannel is called on the constructed
* TransportChannelProvider instance. Note: InstantiatingHttpJsonChannelProvider will
* automatically provide its own Executor in this circumstance when it calls getChannel.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
public Builder setExecutor(Executor executor) {
this.executor = executor;
return this;
}

/** @deprecated. Please use {@link #setExecutor(Executor)}. */
@Deprecated
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
return setExecutor((Executor) executorProvider.getExecutor());
}

/**
* Sets the HeaderProvider for this TransportChannelProvider.
*
Expand Down Expand Up @@ -250,7 +261,7 @@ public String getEndpoint() {

public InstantiatingHttpJsonChannelProvider build() {
return new InstantiatingHttpJsonChannelProvider(
executorProvider, headerProvider, endpoint, httpTransport);
executor, headerProvider, endpoint, httpTransport);
}
}
}
Expand Up @@ -35,6 +35,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Test;
Expand All @@ -57,6 +58,11 @@ public void basicTest() throws IOException {
assertThat(provider.needsEndpoint()).isFalse();

assertThat(provider.needsExecutor()).isTrue();
provider = provider.withExecutor((Executor) executor);
assertThat(provider.needsExecutor()).isFalse();

// Added for code coverage. Remove when withExecutor(ScheduledExecutorService) is removed.
assertThat(provider.needsExecutor()).isFalse();
provider = provider.withExecutor(executor);
assertThat(provider.needsExecutor()).isFalse();

Expand Down
3 changes: 2 additions & 1 deletion gax/src/main/java/com/google/api/gax/rpc/ClientContext.java
Expand Up @@ -45,6 +45,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -136,7 +137,7 @@ public static ClientContext create(StubSettings settings) throws IOException {

TransportChannelProvider transportChannelProvider = settings.getTransportChannelProvider();
if (transportChannelProvider.needsExecutor()) {
transportChannelProvider = transportChannelProvider.withExecutor(executor);
transportChannelProvider = transportChannelProvider.withExecutor((Executor) executor);
}
Map<String, String> headers =
ImmutableMap.<String, String>builder()
Expand Down