Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

[gax-java] chore: generalize ExecutorService for transport layers #1098

Merged
merged 8 commits into from Jun 5, 2020
Expand Up @@ -33,8 +33,6 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannel;
Expand All @@ -50,6 +48,7 @@
import io.grpc.alts.ComputeEngineChannelBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
Expand All @@ -76,7 +75,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
static final int MAX_POOL_SIZE = 1000;

private final int processorCount;
private final ExecutorProvider executorProvider;
private final ExecutorService executor;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
private final HeaderProvider headerProvider;
private final String endpoint;
private final EnvironmentProvider envProvider;
Expand All @@ -96,7 +95,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP

private InstantiatingGrpcChannelProvider(Builder builder) {
this.processorCount = builder.processorCount;
this.executorProvider = builder.executorProvider;
this.executor = builder.executor;
this.headerProvider = builder.headerProvider;
this.endpoint = builder.endpoint;
this.envProvider = builder.envProvider;
Expand All @@ -115,12 +114,18 @@ private InstantiatingGrpcChannelProvider(Builder builder) {

@Override
public boolean needsExecutor() {
return executorProvider == null;
return executor == null;
}

@Deprecated
@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return toBuilder().setExecutorProvider(FixedExecutorProvider.create(executor)).build();
return toBuilder().setExecutor(executor).build();
}

@Override
public TransportChannelProvider withExecutor(ExecutorService executor) {
return toBuilder().setExecutor(executor).build();
}

@Override
Expand Down Expand Up @@ -229,7 +234,6 @@ private boolean isDirectPathEnabled(String serviceAddress) {
}

private ManagedChannel createSingleChannel() throws IOException {
ScheduledExecutorService executor = executorProvider.getExecutor();
GrpcHeaderInterceptor headerInterceptor =
new GrpcHeaderInterceptor(headerProvider.getHeaders());
GrpcMetadataHandlerInterceptor metadataHandlerInterceptor =
Expand Down Expand Up @@ -354,7 +358,7 @@ public static Builder newBuilder() {

public static final class Builder {
private int processorCount;
private ExecutorProvider executorProvider;
private ExecutorService executor;
private HeaderProvider headerProvider;
private String endpoint;
private EnvironmentProvider envProvider;
Expand All @@ -377,7 +381,7 @@ private Builder() {

private Builder(InstantiatingGrpcChannelProvider provider) {
this.processorCount = provider.processorCount;
this.executorProvider = provider.executorProvider;
this.executor = provider.executor;
this.headerProvider = provider.headerProvider;
this.endpoint = provider.endpoint;
this.envProvider = provider.envProvider;
Expand All @@ -401,15 +405,15 @@ Builder setProcessorCount(int processorCount) {
}

/**
* Sets the ExecutorProvider for this TransportChannelProvider.
* Sets the ExecutorService for this TransportChannelProvider.
*
* <p>This is optional; if it is not provided, needsExecutor() will return true, meaning that an
* Executor must be provided when getChannel is called on the constructed
* TransportChannelProvider instance. Note: GrpcTransportProvider will automatically provide its
* own Executor in this circumstance when it calls getChannel.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
public Builder setExecutor(ExecutorService executor) {
this.executor = executor;
return this;
}

Expand Down
Expand Up @@ -34,7 +34,6 @@
import static org.junit.Assert.fail;

import com.google.api.core.ApiFunction;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
Expand All @@ -45,6 +44,7 @@
import io.grpc.alts.ComputeEngineChannelBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Test;
Expand Down Expand Up @@ -167,7 +167,7 @@ private void testWithInterceptors(int numChannels) throws Exception {
.setEndpoint("localhost:8080")
.setPoolSize(numChannels)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(ExecutorService.class))
.setInterceptorProvider(interceptorProvider)
.build();

Expand Down Expand Up @@ -199,7 +199,7 @@ public void testChannelConfigurator() throws IOException {
InstantiatingGrpcChannelProvider.newBuilder()
.setEndpoint("localhost:8080")
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(ExecutorService.class))
.setChannelConfigurator(channelConfigurator)
.setPoolSize(numChannels)
.build()
Expand Down Expand Up @@ -359,7 +359,7 @@ public void testWithPrimeChannel() throws IOException {
.setEndpoint("localhost:8080")
.setPoolSize(poolSize)
.setHeaderProvider(Mockito.mock(HeaderProvider.class))
.setExecutorProvider(Mockito.mock(ExecutorProvider.class))
.setExecutor(Mockito.mock(ExecutorService.class))
.setChannelPrimer(mockChannelPrimer)
.build();

Expand Down
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -83,6 +84,11 @@ public TransportChannelProvider withExecutor(ScheduledExecutorService executor)
throw new UnsupportedOperationException("LocalChannelProvider doesn't need an executor");
}

@Override
public TransportChannelProvider withExecutor(ExecutorService executor) {
throw new UnsupportedOperationException("LocalChannelProvider doesn't need an executor");
}

@Override
public boolean needsHeaders() {
return headerProvider == null;
Expand Down
Expand Up @@ -32,8 +32,6 @@
import com.google.api.client.http.HttpTransport;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannel;
Expand All @@ -43,6 +41,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
Expand All @@ -60,38 +59,44 @@
@BetaApi
@InternalExtensionOnly
public final class InstantiatingHttpJsonChannelProvider implements TransportChannelProvider {
private final ExecutorProvider executorProvider;
private final ExecutorService executor;
private final HeaderProvider headerProvider;
private final String endpoint;
private final HttpTransport httpTransport;

private InstantiatingHttpJsonChannelProvider(
ExecutorProvider executorProvider, HeaderProvider headerProvider, String endpoint) {
this.executorProvider = executorProvider;
ExecutorService executor, HeaderProvider headerProvider, String endpoint) {
this.executor = executor;
this.headerProvider = headerProvider;
this.endpoint = endpoint;
this.httpTransport = null;
}

private InstantiatingHttpJsonChannelProvider(
ExecutorProvider executorProvider,
ExecutorService executor,
HeaderProvider headerProvider,
String endpoint,
HttpTransport httpTransport) {
this.executorProvider = executorProvider;
this.executor = executor;
this.headerProvider = headerProvider;
this.endpoint = endpoint;
this.httpTransport = httpTransport;
}

@Override
public boolean needsExecutor() {
return executorProvider == null;
return executor == null;
}

@Deprecated
@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return toBuilder().setExecutorProvider(FixedExecutorProvider.create(executor)).build();
return toBuilder().setExecutor(executor).build();
}

@Override
public TransportChannelProvider withExecutor(ExecutorService executor) {
return toBuilder().setExecutor(executor).build();
}

@Override
Expand Down Expand Up @@ -155,7 +160,6 @@ public TransportChannelProvider withCredentials(Credentials credentials) {
}

private TransportChannel createChannel() throws IOException {
ScheduledExecutorService executor = executorProvider.getExecutor();
Map<String, String> headers = headerProvider.getHeaders();

List<HttpJsonHeaderEnhancer> headerEnhancers = Lists.newArrayList();
Expand Down Expand Up @@ -193,30 +197,30 @@ public static Builder newBuilder() {
}

public static final class Builder {
private ExecutorProvider executorProvider;
private ExecutorService executor;
private HeaderProvider headerProvider;
private String endpoint;
private HttpTransport httpTransport;

private Builder() {}

private Builder(InstantiatingHttpJsonChannelProvider provider) {
this.executorProvider = provider.executorProvider;
this.executor = provider.executor;
this.headerProvider = provider.headerProvider;
this.endpoint = provider.endpoint;
this.httpTransport = provider.httpTransport;
}

/**
* Sets the ExecutorProvider for this TransportChannelProvider.
* Sets the ExecutorService for this TransportChannelProvider.
*
* <p>This is optional; if it is not provided, needsExecutor() will return true, meaning that an
* Executor must be provided when getChannel is called on the constructed
* TransportChannelProvider instance. Note: InstantiatingHttpJsonChannelProvider will
* automatically provide its own Executor in this circumstance when it calls getChannel.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
public Builder setExecutor(ExecutorService executor) {
this.executor = executor;
return this;
}

Expand Down Expand Up @@ -250,7 +254,7 @@ public String getEndpoint() {

public InstantiatingHttpJsonChannelProvider build() {
return new InstantiatingHttpJsonChannelProvider(
executorProvider, headerProvider, endpoint, httpTransport);
executor, headerProvider, endpoint, httpTransport);
}
}
}
Expand Up @@ -35,6 +35,7 @@
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/** An instance of TransportChannelProvider that always provides the same TransportChannel. */
Expand Down Expand Up @@ -63,6 +64,12 @@ public FixedTransportChannelProvider withExecutor(ScheduledExecutorService execu
"FixedTransportChannelProvider doesn't need an executor");
}

@Override
public FixedTransportChannelProvider withExecutor(ExecutorService executor) {
throw new UnsupportedOperationException(
"FixedTransportChannelProvider doesn't need an executor");
}

@Override
@BetaApi("The surface for customizing headers is not stable yet and may change in the future.")
public boolean needsHeaders() {
Expand Down
Expand Up @@ -34,6 +34,7 @@
import com.google.auth.Credentials;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
Expand Down Expand Up @@ -70,6 +71,10 @@ public interface TransportChannelProvider {
*
* <p>This method should only be called if {@link #needsExecutor()} returns true.
*/
TransportChannelProvider withExecutor(ExecutorService executor);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

/** @deprecated Please use {@link #withExecutor(ExecutorService)}. */
@Deprecated
TransportChannelProvider withExecutor(ScheduledExecutorService executor);

/** True if the TransportProvider has no headers provided. */
Expand Down Expand Up @@ -114,8 +119,8 @@ public interface TransportChannelProvider {
* Provides a Transport, which could either be a new instance for every call, or the same
* instance, depending on the implementation.
*
* <p>If {@link #needsExecutor()} is true, then {@link #withExecutor(ScheduledExecutorService)}
* needs to be called first to provide an executor.
* <p>If {@link #needsExecutor()} is true, then {@link #withExecutor(ExecutorService)} needs to be
* called first to provide an executor.
*
* <p>If {@link #needsHeaders()} is true, then {@link #withHeaders(Map)} needs to be called first
* to provide headers.
Expand Down
10 changes: 8 additions & 2 deletions gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java
Expand Up @@ -44,6 +44,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Test;
Expand Down Expand Up @@ -88,15 +89,15 @@ public ScheduledExecutorService getExecutor() {
}

private static class FakeTransportProvider implements TransportChannelProvider {
final ScheduledExecutorService executor;
final ExecutorService executor;
final FakeTransportChannel transport;
final boolean shouldAutoClose;
final Map<String, String> headers;
final Credentials credentials;

FakeTransportProvider(
FakeTransportChannel transport,
ScheduledExecutorService executor,
ExecutorService executor,
boolean shouldAutoClose,
Map<String, String> headers,
Credentials credentials) {
Expand All @@ -120,6 +121,11 @@ public boolean needsExecutor() {

@Override
public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
return withExecutor((ExecutorService) executor);
}

@Override
public TransportChannelProvider withExecutor(ExecutorService executor) {
return new FakeTransportProvider(
this.transport, executor, this.shouldAutoClose, this.headers, this.credentials);
}
Expand Down