Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Add setAttemptDirectPath(boolean) for InstantiatingGrpcChannelProvider.Builder #1015

Merged
merged 5 commits into from May 28, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -69,7 +69,6 @@
*/
@InternalExtensionOnly
public final class InstantiatingGrpcChannelProvider implements TransportChannelProvider {
static final String DIRECT_PATH_ENV_VAR = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH";
static final long DIRECT_PATH_KEEP_ALIVE_TIME_SECONDS = 3600;
static final long DIRECT_PATH_KEEP_ALIVE_TIMEOUT_SECONDS = 20;
// reduce the thundering herd problem of too many channels trying to (re)connect at the same time
Expand All @@ -79,7 +78,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
private final ExecutorProvider executorProvider;
private final HeaderProvider headerProvider;
private final String endpoint;
private final EnvironmentProvider envProvider;
private final boolean attemptDirectPath;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
@Nullable private final GrpcInterceptorProvider interceptorProvider;
@Nullable private final Integer maxInboundMessageSize;
@Nullable private final Integer maxInboundMetadataSize;
Expand All @@ -98,7 +97,6 @@ private InstantiatingGrpcChannelProvider(Builder builder) {
this.executorProvider = builder.executorProvider;
this.headerProvider = builder.headerProvider;
this.endpoint = builder.endpoint;
this.envProvider = builder.envProvider;
this.interceptorProvider = builder.interceptorProvider;
this.maxInboundMessageSize = builder.maxInboundMessageSize;
this.maxInboundMetadataSize = builder.maxInboundMetadataSize;
Expand All @@ -109,6 +107,7 @@ private InstantiatingGrpcChannelProvider(Builder builder) {
this.channelConfigurator = builder.channelConfigurator;
this.credentials = builder.credentials;
this.channelPrimer = builder.channelPrimer;
this.attemptDirectPath = builder.attemptDirectPath;
}

@Override
Expand Down Expand Up @@ -211,17 +210,6 @@ public ManagedChannel createSingleChannel() throws IOException {
return GrpcTransportChannel.create(outerChannel);
}

// The environment variable is used during the rollout phase for directpath.
// This checker function will be removed once directpath is stable.
private boolean isDirectPathEnabled(String serviceAddress) {
String whiteList = envProvider.getenv(DIRECT_PATH_ENV_VAR);
if (whiteList == null) return false;
for (String service : whiteList.split(",")) {
if (!service.isEmpty() && serviceAddress.contains(service)) return true;
}
return false;
}

private ManagedChannel createSingleChannel() throws IOException {
ScheduledExecutorService executor = executorProvider.getExecutor();
GrpcHeaderInterceptor headerInterceptor =
Expand All @@ -240,7 +228,7 @@ private ManagedChannel createSingleChannel() throws IOException {

// TODO(weiranf): Add a new API in ComputeEngineCredentials to check whether it's using default
// service account.
if (isDirectPathEnabled(serviceAddress) && credentials instanceof ComputeEngineCredentials) {
if (attemptDirectPath && credentials instanceof ComputeEngineCredentials) {
builder = ComputeEngineChannelBuilder.forAddress(serviceAddress, port);
// Set default keepAliveTime and keepAliveTimeout when directpath environment is enabled.
// Will be overridden by user defined values if any.
Expand Down Expand Up @@ -352,7 +340,7 @@ public static final class Builder {
private ExecutorProvider executorProvider;
private HeaderProvider headerProvider;
private String endpoint;
private EnvironmentProvider envProvider;
private boolean attemptDirectPath;
@Nullable private GrpcInterceptorProvider interceptorProvider;
@Nullable private Integer maxInboundMessageSize;
@Nullable private Integer maxInboundMetadataSize;
Expand All @@ -366,15 +354,13 @@ public static final class Builder {

private Builder() {
processorCount = Runtime.getRuntime().availableProcessors();
envProvider = DirectPathEnvironmentProvider.getInstance();
}

private Builder(InstantiatingGrpcChannelProvider provider) {
this.processorCount = provider.processorCount;
this.executorProvider = provider.executorProvider;
this.headerProvider = provider.headerProvider;
this.endpoint = provider.endpoint;
this.envProvider = provider.envProvider;
this.interceptorProvider = provider.interceptorProvider;
this.maxInboundMessageSize = provider.maxInboundMessageSize;
this.maxInboundMetadataSize = provider.maxInboundMetadataSize;
Expand All @@ -385,6 +371,7 @@ private Builder(InstantiatingGrpcChannelProvider provider) {
this.channelConfigurator = provider.channelConfigurator;
this.credentials = provider.credentials;
this.channelPrimer = provider.channelPrimer;
this.attemptDirectPath = provider.attemptDirectPath;
}

/** Sets the number of available CPUs, used internally for testing. */
Expand All @@ -393,12 +380,6 @@ Builder setProcessorCount(int processorCount) {
return this;
}

/** Sets the environment variable provider used for testing. */
Builder setEnvironmentProvider(EnvironmentProvider envProvider) {
this.envProvider = envProvider;
return this;
}

/**
* Sets the ExecutorProvider for this TransportChannelProvider.
*
Expand Down Expand Up @@ -565,6 +546,12 @@ public Builder setChannelPrimer(ChannelPrimer channelPrimer) {
return this;
}

/** Whether attempt DirectPath. */
public Builder setAttemptDirectPath(boolean attemptDirectPath) {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
this.attemptDirectPath = attemptDirectPath;
return this;
}

public InstantiatingGrpcChannelProvider build() {
return new InstantiatingGrpcChannelProvider(this);
}
Expand Down Expand Up @@ -597,32 +584,4 @@ private static void validateEndpoint(String endpoint) {
}
Integer.parseInt(endpoint.substring(colon + 1));
}

/**
* EnvironmentProvider currently provides DirectPath environment variable, and is only used during
* initial rollout for DirectPath. This interface will be removed once the DirectPath environment
* is not used.
*/
interface EnvironmentProvider {
@Nullable
String getenv(String env);
}

static class DirectPathEnvironmentProvider implements EnvironmentProvider {
private static DirectPathEnvironmentProvider provider;

private DirectPathEnvironmentProvider() {}

public static DirectPathEnvironmentProvider getInstance() {
if (provider == null) {
provider = new DirectPathEnvironmentProvider();
}
return provider;
}

@Override
public String getenv(String env) {
return System.getenv(env);
}
}
}
Expand Up @@ -29,15 +29,13 @@
*/
package com.google.api.gax.grpc;

import static com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.DIRECT_PATH_ENV_VAR;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import com.google.api.core.ApiFunction;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.EnvironmentProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.oauth2.CloudShellCredentials;
Expand Down Expand Up @@ -215,9 +213,6 @@ public void testChannelConfigurator() throws IOException {

@Test
public void testWithGCECredentials() throws IOException {
EnvironmentProvider mockEnvProvider = Mockito.mock(EnvironmentProvider.class);
Mockito.when(mockEnvProvider.getenv(DIRECT_PATH_ENV_VAR)).thenReturn("localhost");

ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

Expand All @@ -231,7 +226,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {

TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setEnvironmentProvider(mockEnvProvider)
.setAttemptDirectPath(true)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
Expand All @@ -247,9 +242,6 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {

@Test
public void testWithNonGCECredentials() throws IOException {
EnvironmentProvider mockEnvProvider = Mockito.mock(EnvironmentProvider.class);
Mockito.when(mockEnvProvider.getenv(DIRECT_PATH_ENV_VAR)).thenReturn("localhost");

ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

Expand All @@ -264,7 +256,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {

TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setEnvironmentProvider(mockEnvProvider)
.setAttemptDirectPath(true)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
Expand All @@ -280,24 +272,21 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {

@Test
public void testWithDirectPathDisabled() throws IOException {
EnvironmentProvider mockEnvProvider = Mockito.mock(EnvironmentProvider.class);
Mockito.when(mockEnvProvider.getenv(DIRECT_PATH_ENV_VAR)).thenReturn("otherhost");

ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator =
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
// Clients without DirectPath environment variable will not attempt DirectPath
// Clients without setting attemptDirectPath flag will not attempt DirectPath
assertThat(channelBuilder instanceof ComputeEngineChannelBuilder).isFalse();
return channelBuilder;
}
};

TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setEnvironmentProvider(mockEnvProvider)
.setAttemptDirectPath(false)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
Expand All @@ -312,25 +301,21 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
}

@Test
public void testWithNoDirectPathEnvironment() throws IOException {
EnvironmentProvider mockEnvProvider = Mockito.mock(EnvironmentProvider.class);
Mockito.when(mockEnvProvider.getenv(DIRECT_PATH_ENV_VAR)).thenReturn(null);

public void testWithNoDirectPathFlagSet() throws IOException {
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator =
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
// Clients without DirectPath environment variable will not attempt DirectPath
// Clients without setting attemptDirectPath flag will not attempt DirectPath
assertThat(channelBuilder instanceof ComputeEngineChannelBuilder).isFalse();
return channelBuilder;
}
};

TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setEnvironmentProvider(mockEnvProvider)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
Expand Down