Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

[gax-java] chore: generalize ExecutorService for transport layers #1098

Merged
merged 8 commits into from Jun 5, 2020
Expand Up @@ -33,8 +33,6 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannel;
Expand All @@ -50,6 +48,7 @@
import io.grpc.alts.ComputeEngineChannelBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
Expand All @@ -76,7 +75,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
static final int MAX_POOL_SIZE = 1000;

private final int processorCount;
private final ExecutorProvider executorProvider;
private final Executor executor;
private final HeaderProvider headerProvider;
private final String endpoint;
private final EnvironmentProvider envProvider;
Expand All @@ -96,7 +95,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP

private InstantiatingGrpcChannelProvider(Builder builder) {
this.processorCount = builder.processorCount;
this.executorProvider = builder.executorProvider;
this.executor = builder.executor;
this.headerProvider = builder.headerProvider;
this.endpoint = builder.endpoint;
this.envProvider = builder.envProvider;
Expand All @@ -115,12 +114,18 @@ private InstantiatingGrpcChannelProvider(Builder builder) {

@Override
public boolean needsExecutor() {
return executorProvider == null;
return executor == null;
}

@Deprecated
@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return toBuilder().setExecutorProvider(FixedExecutorProvider.create(executor)).build();
return withExecutor((Executor) executor);
}

@Override
public TransportChannelProvider withExecutor(Executor executor) {
return toBuilder().setExecutor(executor).build();
}

@Override
Expand Down Expand Up @@ -229,7 +234,6 @@ private boolean isDirectPathEnabled(String serviceAddress) {
}

private ManagedChannel createSingleChannel() throws IOException {
ScheduledExecutorService executor = executorProvider.getExecutor();
GrpcHeaderInterceptor headerInterceptor =
new GrpcHeaderInterceptor(headerProvider.getHeaders());
GrpcMetadataHandlerInterceptor metadataHandlerInterceptor =
Expand Down Expand Up @@ -354,7 +358,7 @@ public static Builder newBuilder() {

public static final class Builder {
private int processorCount;
private ExecutorProvider executorProvider;
private Executor executor;
private HeaderProvider headerProvider;
private String endpoint;
private EnvironmentProvider envProvider;
Expand All @@ -377,7 +381,7 @@ private Builder() {

private Builder(InstantiatingGrpcChannelProvider provider) {
this.processorCount = provider.processorCount;
this.executorProvider = provider.executorProvider;
this.executor = provider.executor;
this.headerProvider = provider.headerProvider;
this.endpoint = provider.endpoint;
this.envProvider = provider.envProvider;
Expand All @@ -401,18 +405,24 @@ Builder setProcessorCount(int processorCount) {
}

/**
* Sets the ExecutorProvider for this TransportChannelProvider.
* Sets the Executor for this TransportChannelProvider.
*
* <p>This is optional; if it is not provided, needsExecutor() will return true, meaning that an
* Executor must be provided when getChannel is called on the constructed
* TransportChannelProvider instance. Note: GrpcTransportProvider will automatically provide its
* own Executor in this circumstance when it calls getChannel.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
public Builder setExecutor(Executor executor) {
miraleung marked this conversation as resolved.
Show resolved Hide resolved
this.executor = executor;
return this;
}

/** @deprecated. Please use {@link #setExecutor(Executor)}. */
miraleung marked this conversation as resolved.
Show resolved Hide resolved
@Deprecated
public Builder setExecutor(ScheduledExecutorService executor) {
Copy link
Contributor

@vam-google vam-google Jun 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be called setExecutorProvider and accept ExecutorProvider as an argument (i.e. put back the old method and deprecate it)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return setExecutor((Executor) executor);
}

/**
* Sets the HeaderProvider for this TransportChannelProvider.
*
Expand Down
Expand Up @@ -34,7 +34,6 @@
import static org.junit.Assert.fail;

import com.google.api.core.ApiFunction;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
Expand All @@ -45,6 +44,7 @@
import io.grpc.alts.ComputeEngineChannelBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Test;
Expand Down Expand Up @@ -129,7 +129,7 @@ public void testWithPoolSize() throws IOException {
TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("localhost:8080");
assertThat(provider.acceptsPoolSize()).isTrue();
Expand Down Expand Up @@ -167,7 +167,7 @@ private void testWithInterceptors(int numChannels) throws Exception {
.setEndpoint("localhost:8080")
.setPoolSize(numChannels)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(Executor.class))
.setInterceptorProvider(interceptorProvider)
.build();

Expand Down Expand Up @@ -199,7 +199,7 @@ public void testChannelConfigurator() throws IOException {
InstantiatingGrpcChannelProvider.newBuilder()
.setEndpoint("localhost:8080")
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(Executor.class))
.setChannelConfigurator(channelConfigurator)
.setPoolSize(numChannels)
.build()
Expand Down Expand Up @@ -229,7 +229,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
.setAttemptDirectPath(true)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("localhost:8080");

Expand Down Expand Up @@ -259,7 +259,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
.setAttemptDirectPath(true)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("localhost:8080");

Expand Down Expand Up @@ -289,7 +289,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
.setAttemptDirectPath(false)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("localhost:8080");

Expand Down Expand Up @@ -318,7 +318,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
InstantiatingGrpcChannelProvider.newBuilder()
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("localhost:8080");

Expand All @@ -337,7 +337,7 @@ public void testWithIPv6Address() throws IOException {
TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.build()
.withExecutor(executor)
.withExecutor((Executor) executor)
.withHeaders(Collections.<String, String>emptyMap())
.withEndpoint("[::1]:8080");
assertThat(provider.needsEndpoint()).isFalse();
Expand All @@ -359,7 +359,7 @@ public void testWithPrimeChannel() throws IOException {
.setEndpoint("localhost:8080")
.setPoolSize(poolSize)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(Executor.class))
.setChannelPrimer(mockChannelPrimer)
.build();

Expand Down
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -78,8 +79,14 @@ public boolean needsExecutor() {
return false;
}

@Deprecated
@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return withExecutor((Executor) executor);
miraleung marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public TransportChannelProvider withExecutor(Executor executor) {
throw new UnsupportedOperationException("LocalChannelProvider doesn't need an executor");
}

Expand Down
Expand Up @@ -32,8 +32,6 @@
import com.google.api.client.http.HttpTransport;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannel;
Expand All @@ -43,6 +41,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

/**
Expand All @@ -60,38 +59,44 @@
@BetaApi
@InternalExtensionOnly
public final class InstantiatingHttpJsonChannelProvider implements TransportChannelProvider {
private final ExecutorProvider executorProvider;
private final Executor executor;
private final HeaderProvider headerProvider;
private final String endpoint;
private final HttpTransport httpTransport;

private InstantiatingHttpJsonChannelProvider(
ExecutorProvider executorProvider, HeaderProvider headerProvider, String endpoint) {
this.executorProvider = executorProvider;
Executor executor, HeaderProvider headerProvider, String endpoint) {
this.executor = executor;
this.headerProvider = headerProvider;
this.endpoint = endpoint;
this.httpTransport = null;
}

private InstantiatingHttpJsonChannelProvider(
ExecutorProvider executorProvider,
Executor executor,
HeaderProvider headerProvider,
String endpoint,
HttpTransport httpTransport) {
this.executorProvider = executorProvider;
this.executor = executor;
this.headerProvider = headerProvider;
this.endpoint = endpoint;
this.httpTransport = httpTransport;
}

@Override
public boolean needsExecutor() {
return executorProvider == null;
return executor == null;
}

@Deprecated
@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return toBuilder().setExecutorProvider(FixedExecutorProvider.create(executor)).build();
return withExecutor((Executor) executor);
}

@Override
public TransportChannelProvider withExecutor(Executor executor) {
return toBuilder().setExecutor(executor).build();
}

@Override
Expand Down Expand Up @@ -155,7 +160,6 @@ public TransportChannelProvider withCredentials(Credentials credentials) {
}

private TransportChannel createChannel() throws IOException {
ScheduledExecutorService executor = executorProvider.getExecutor();
Map<String, String> headers = headerProvider.getHeaders();

List<HttpJsonHeaderEnhancer> headerEnhancers = Lists.newArrayList();
Expand Down Expand Up @@ -193,33 +197,39 @@ public static Builder newBuilder() {
}

public static final class Builder {
private ExecutorProvider executorProvider;
private Executor executor;
private HeaderProvider headerProvider;
private String endpoint;
private HttpTransport httpTransport;

private Builder() {}

private Builder(InstantiatingHttpJsonChannelProvider provider) {
this.executorProvider = provider.executorProvider;
this.executor = provider.executor;
this.headerProvider = provider.headerProvider;
this.endpoint = provider.endpoint;
this.httpTransport = provider.httpTransport;
}

/**
* Sets the ExecutorProvider for this TransportChannelProvider.
* Sets the Executor for this TransportChannelProvider.
*
* <p>This is optional; if it is not provided, needsExecutor() will return true, meaning that an
* Executor must be provided when getChannel is called on the constructed
* TransportChannelProvider instance. Note: InstantiatingHttpJsonChannelProvider will
* automatically provide its own Executor in this circumstance when it calls getChannel.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
public Builder setExecutor(Executor executor) {
this.executor = executor;
return this;
}

/** @deprecated. Please use {@link #setExecutor(Executor)}. */
@Deprecated
public Builder setExecutor(ScheduledExecutorService executor) {
Copy link
Contributor

@vam-google vam-google Jun 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be called setExecutorProvider and accept ExecutorProvider as an argument (i.e. put back the old method and deprecate it).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return setExecutor((Executor) executor);
}

/**
* Sets the HeaderProvider for this TransportChannelProvider.
*
Expand Down Expand Up @@ -250,7 +260,7 @@ public String getEndpoint() {

public InstantiatingHttpJsonChannelProvider build() {
return new InstantiatingHttpJsonChannelProvider(
executorProvider, headerProvider, endpoint, httpTransport);
executor, headerProvider, endpoint, httpTransport);
}
}
}
Expand Up @@ -35,6 +35,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Test;
Expand All @@ -57,6 +58,11 @@ public void basicTest() throws IOException {
assertThat(provider.needsEndpoint()).isFalse();

assertThat(provider.needsExecutor()).isTrue();
provider = provider.withExecutor((Executor) executor);
assertThat(provider.needsExecutor()).isFalse();

// Added for code coverage. Remove when withExecutor(ScheduledExecutorService) is removed.
assertThat(provider.needsExecutor()).isFalse();
provider = provider.withExecutor(executor);
assertThat(provider.needsExecutor()).isFalse();

Expand Down
3 changes: 2 additions & 1 deletion gax/src/main/java/com/google/api/gax/rpc/ClientContext.java
Expand Up @@ -45,6 +45,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -136,7 +137,7 @@ public static ClientContext create(StubSettings settings) throws IOException {

TransportChannelProvider transportChannelProvider = settings.getTransportChannelProvider();
if (transportChannelProvider.needsExecutor()) {
transportChannelProvider = transportChannelProvider.withExecutor(executor);
transportChannelProvider = transportChannelProvider.withExecutor((Executor) executor);
}
Map<String, String> headers =
ImmutableMap.<String, String>builder()
Expand Down