Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Add setAttemptDirectPath(boolean) for InstantiatingGrpcChannelProvider.Builder #1015

Merged
merged 5 commits into from May 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -89,6 +89,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
@Nullable private final Integer poolSize;
@Nullable private final Credentials credentials;
@Nullable private final ChannelPrimer channelPrimer;
@Nullable private final Boolean attemptDirectPath;

@Nullable
private final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator;
Expand All @@ -109,6 +110,7 @@ private InstantiatingGrpcChannelProvider(Builder builder) {
this.channelConfigurator = builder.channelConfigurator;
this.credentials = builder.credentials;
this.channelPrimer = builder.channelPrimer;
this.attemptDirectPath = builder.attemptDirectPath;
}

@Override
Expand Down Expand Up @@ -211,9 +213,13 @@ public ManagedChannel createSingleChannel() throws IOException {
return GrpcTransportChannel.create(outerChannel);
}

// The environment variable is used during the rollout phase for directpath.
// This checker function will be removed once directpath is stable.
// TODO(weiranf): Use attemptDirectPath as the only indicator once setAttemptDirectPath is adapted
// and the env var is removed from client environment.
private boolean isDirectPathEnabled(String serviceAddress) {
if (attemptDirectPath != null) {
return attemptDirectPath;
}
// Only check DIRECT_PATH_ENV_VAR when attemptDirectPath is not set.
String whiteList = envProvider.getenv(DIRECT_PATH_ENV_VAR);
if (whiteList == null) return false;
for (String service : whiteList.split(",")) {
Expand All @@ -238,8 +244,7 @@ private ManagedChannel createSingleChannel() throws IOException {

ManagedChannelBuilder builder;

// TODO(weiranf): Add a new API in ComputeEngineCredentials to check whether it's using default
// service account.
// TODO(weiranf): Add API in ComputeEngineCredentials to check default service account.
if (isDirectPathEnabled(serviceAddress) && credentials instanceof ComputeEngineCredentials) {
builder = ComputeEngineChannelBuilder.forAddress(serviceAddress, port);
// Set default keepAliveTime and keepAliveTimeout when directpath environment is enabled.
Expand Down Expand Up @@ -363,6 +368,7 @@ public static final class Builder {
@Nullable private ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator;
@Nullable private Credentials credentials;
@Nullable private ChannelPrimer channelPrimer;
@Nullable private Boolean attemptDirectPath;

private Builder() {
processorCount = Runtime.getRuntime().availableProcessors();
Expand All @@ -385,6 +391,7 @@ private Builder(InstantiatingGrpcChannelProvider provider) {
this.channelConfigurator = provider.channelConfigurator;
this.credentials = provider.credentials;
this.channelPrimer = provider.channelPrimer;
this.attemptDirectPath = provider.attemptDirectPath;
}

/** Sets the number of available CPUs, used internally for testing. */
Expand All @@ -393,12 +400,6 @@ Builder setProcessorCount(int processorCount) {
return this;
}

/** Sets the environment variable provider used for testing. */
Builder setEnvironmentProvider(EnvironmentProvider envProvider) {
this.envProvider = envProvider;
return this;
}

/**
* Sets the ExecutorProvider for this TransportChannelProvider.
*
Expand Down Expand Up @@ -565,6 +566,13 @@ public Builder setChannelPrimer(ChannelPrimer channelPrimer) {
return this;
}

/** Whether attempt DirectPath. */
@InternalApi("For internal use by google-cloud-java clients only")
public Builder setAttemptDirectPath(boolean attemptDirectPath) {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
this.attemptDirectPath = attemptDirectPath;
return this;
}

public InstantiatingGrpcChannelProvider build() {
return new InstantiatingGrpcChannelProvider(this);
}
Expand Down
Expand Up @@ -29,15 +29,13 @@
*/
package com.google.api.gax.grpc;

import static com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.DIRECT_PATH_ENV_VAR;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import com.google.api.core.ApiFunction;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.EnvironmentProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.oauth2.CloudShellCredentials;
Expand Down Expand Up @@ -215,9 +213,6 @@ public void testChannelConfigurator() throws IOException {

@Test
public void testWithGCECredentials() throws IOException {
EnvironmentProvider mockEnvProvider = Mockito.mock(EnvironmentProvider.class);
Mockito.when(mockEnvProvider.getenv(DIRECT_PATH_ENV_VAR)).thenReturn("localhost");

ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

Expand All @@ -231,7 +226,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {

TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setEnvironmentProvider(mockEnvProvider)
.setAttemptDirectPath(true)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
Expand All @@ -247,9 +242,6 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {

@Test
public void testWithNonGCECredentials() throws IOException {
EnvironmentProvider mockEnvProvider = Mockito.mock(EnvironmentProvider.class);
Mockito.when(mockEnvProvider.getenv(DIRECT_PATH_ENV_VAR)).thenReturn("localhost");

ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

Expand All @@ -264,7 +256,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {

TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setEnvironmentProvider(mockEnvProvider)
.setAttemptDirectPath(true)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
Expand All @@ -280,24 +272,21 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {

@Test
public void testWithDirectPathDisabled() throws IOException {
EnvironmentProvider mockEnvProvider = Mockito.mock(EnvironmentProvider.class);
Mockito.when(mockEnvProvider.getenv(DIRECT_PATH_ENV_VAR)).thenReturn("otherhost");

ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator =
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
// Clients without DirectPath environment variable will not attempt DirectPath
// Clients without setting attemptDirectPath flag will not attempt DirectPath
assertThat(channelBuilder instanceof ComputeEngineChannelBuilder).isFalse();
return channelBuilder;
}
};

TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setEnvironmentProvider(mockEnvProvider)
.setAttemptDirectPath(false)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
Expand All @@ -312,25 +301,21 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
}

@Test
public void testWithNoDirectPathEnvironment() throws IOException {
EnvironmentProvider mockEnvProvider = Mockito.mock(EnvironmentProvider.class);
Mockito.when(mockEnvProvider.getenv(DIRECT_PATH_ENV_VAR)).thenReturn(null);

public void testWithNoDirectPathFlagSet() throws IOException {
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator =
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
// Clients without DirectPath environment variable will not attempt DirectPath
// Clients without setting attemptDirectPath flag will not attempt DirectPath
assertThat(channelBuilder instanceof ComputeEngineChannelBuilder).isFalse();
return channelBuilder;
}
};

TransportChannelProvider provider =
InstantiatingGrpcChannelProvider.newBuilder()
.setEnvironmentProvider(mockEnvProvider)
.setChannelConfigurator(channelConfigurator)
.build()
.withExecutor(executor)
Expand Down