Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow DirectPath by default + update integration tests #3031

Merged
merged 10 commits into from
Jun 30, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ public interface ChannelConfigurator {
@InternalApi("For internal usage only")
public static final String BIGTABLE_DATA_HOST_DEFAULT = "bigtable.googleapis.com";

// Temporary DirectPath config
private static final String DIRECT_PATH_ENV_VAR = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH";
private static final String BIGTABLE_DIRECTPATH_DATA_HOST_DEFAULT =
"directpath-bigtable.googleapis.com";

/** For internal use only - public for technical reasons. */
@InternalApi("For internal usage only")
public static final String BIGTABLE_BATCH_DATA_HOST_DEFAULT = "batch-bigtable.googleapis.com";
Expand Down Expand Up @@ -104,27 +99,6 @@ public static BigtableOptions getDefaultOptions() {
return builder().build();
}

/**
* Checks if DirectPath is enabled via an environment variable.
*
* <p>For internal use only - public for technical reasons.
*/
@InternalApi("For internal use only")
public static boolean isDirectPathEnabled() {
String whiteList = MoreObjects.firstNonNull(System.getenv(DIRECT_PATH_ENV_VAR), "").trim();

if (whiteList.isEmpty()) {
return false;
}

for (String entry : whiteList.split(",")) {
if (BIGTABLE_DATA_HOST_DEFAULT.contains(entry)) {
return true;
}
}
return false;
}

/** Create a new instance of the {@link Builder}. */
public static Builder builder() {
return new Builder();
Expand All @@ -141,12 +115,10 @@ public Builder() {
options.appProfileId = BIGTABLE_APP_PROFILE_DEFAULT;

// Optional configuration for hosts - useful for the Bigtable team, more than anything else.
options.dataHost =
isDirectPathEnabled()
? BIGTABLE_DIRECTPATH_DATA_HOST_DEFAULT
: BIGTABLE_DATA_HOST_DEFAULT;
options.dataHost = BIGTABLE_DATA_HOST_DEFAULT;
options.adminHost = BIGTABLE_ADMIN_HOST_DEFAULT;
options.port = BIGTABLE_PORT_DEFAULT;
options.directPathAllowed = true;

options.dataChannelCount = BIGTABLE_DATA_CHANNEL_COUNT_DEFAULT;
options.usePlaintextNegotiation = false;
Expand Down Expand Up @@ -212,6 +184,17 @@ public Builder setDataChannelCount(int dataChannelCount) {
return this;
}

public Builder disableDirectPath() {
options.directPathAllowed = false;
return this;
}

/** For internal use only - public for technical reasons. */
@InternalApi("Visible for test only")
public ChannelConfigurator getChannelConfigurator() {
return this.options.channelConfigurator;
}

/** For internal use only - public for technical reasons. */
@InternalApi("Visible for test only")
public Builder setChannelConfigurator(ChannelConfigurator configurator) {
Expand Down Expand Up @@ -372,6 +355,7 @@ public BigtableOptions build() {
private String adminHost;
private String dataHost;
private int port;
private boolean directPathAllowed;
private String projectId;
private String instanceId;
private String appProfileId = BIGTABLE_APP_PROFILE_DEFAULT;
Expand Down Expand Up @@ -448,6 +432,11 @@ public int getPort() {
return port;
}

@BetaApi("surface for DirectPath is still unstable and may change in the future")
public boolean isDirectPathAllowed() {
return directPathAllowed;
}

/**
* Get the credential this object was constructed with. May be null.
*
Expand Down Expand Up @@ -562,6 +551,7 @@ public boolean equals(Object obj) {
&& (dataChannelCount == other.dataChannelCount)
&& (usePlaintextNegotiation == other.usePlaintextNegotiation)
&& (useCachedDataPool == other.useCachedDataPool)
&& (directPathAllowed == other.directPathAllowed)
&& Objects.equals(adminHost, other.adminHost)
&& Objects.equals(dataHost, other.dataHost)
&& Objects.equals(projectId, other.projectId)
Expand All @@ -585,6 +575,7 @@ public String toString() {
.omitNullValues()
.add("dataHost", dataHost)
.add("adminHost", adminHost)
.add("directPathEnabled", directPathAllowed)
.add("port", port)
.add("projectId", projectId)
.add("instanceId", instanceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public BigtableClusterUtilities(final BigtableOptions options)

ClientInterceptor[] interceptors =
BigtableSession.createAdminApiInterceptors(options).toArray(new ClientInterceptor[0]);
channel = BigtableSession.createNettyChannel(options.getAdminHost(), options, interceptors);
channel =
BigtableSession.createNettyChannel(options.getAdminHost(), options, false, interceptors);
client = new BigtableInstanceGrpcClient(channel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.google.cloud.bigtable.grpc.io.WatchdogInterceptor;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics.MetricLevel;
import com.google.cloud.bigtable.util.DirectPathUtil;
import com.google.cloud.bigtable.util.ReferenceCountedHashMap;
import com.google.cloud.bigtable.util.ReferenceCountedHashMap.Callable;
import com.google.cloud.bigtable.util.ThreadUtil;
Expand Down Expand Up @@ -134,6 +135,9 @@ public void call(ClientContext context) {
// Use this conservative values for timeout (10s)
static final long CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = 10;

static final long DIRECT_PATH_KEEP_ALIVE_TIME_SECONDS = 3600;
static final long DIRECT_PATH_KEEP_ALIVE_TIMEOUT_SECONDS = 20;

@VisibleForTesting
static final String PROJECT_ID_EMPTY_OR_NULL = "ProjectId must not be empty or null.";

Expand Down Expand Up @@ -210,7 +214,6 @@ private static synchronized ResourceLimiter initializeResourceLimiter(BigtableOp

private final BigtableOptions options;
private final List<ManagedChannel> managedChannels;
@Deprecated private final List<ClientInterceptor> dataChannelInterceptors;

private final BigtableDataClient dataClient;
private final RequestContext dataRequestContext;
Expand Down Expand Up @@ -311,31 +314,34 @@ public BigtableSession(BigtableOptions opts) throws IOException {
this.dataClient = null;
this.throttlingDataClient = null;
this.dataRequestContext = null;
this.dataChannelInterceptors = null;
} else {
boolean useDirectPath =
options.isDirectPathAllowed()
&& DirectPathUtil.shouldAttemptDirectPath(
options.getDataHost(), options.getPort(), options.getCredentialOptions());

// Get a raw data channel pool - depending on the settings, this channel can either be
// cached/shared or it can specific to this session. If it's specific to this session,
// it will be added to managedChannels and cleaned up when this session is closed.
ManagedChannel rawDataChannelPool;
if (options.useCachedChannel()) {
synchronized (BigtableSession.class) {
String key = String.format("%s:%d", options.getDataHost(), options.getPort());
String key =
String.format("%s:%s:%d", useDirectPath, options.getDataHost(), options.getPort());
rawDataChannelPool = cachedDataChannelPools.get(key);
if (rawDataChannelPool == null) {
rawDataChannelPool = createRawDataChannelPool(options);
rawDataChannelPool = createRawDataChannelPool(options, useDirectPath);
cachedDataChannelPools.put(key, rawDataChannelPool);
}
}
} else {
rawDataChannelPool = createRawDataChannelPool(options);
rawDataChannelPool = createRawDataChannelPool(options, useDirectPath);
managedChannels.add(rawDataChannelPool);
}

// TODO: stop saving the data channel interceptors as instance variables, this is here only to
// support deprecated methods
dataChannelInterceptors = createDataApiInterceptors(options);
Channel dataChannel =
ClientInterceptors.intercept(rawDataChannelPool, dataChannelInterceptors);
ClientInterceptors.intercept(
rawDataChannelPool, createDataApiInterceptors(options, useDirectPath));

this.dataRequestContext =
RequestContext.create(
Expand Down Expand Up @@ -366,7 +372,7 @@ public BigtableSession(BigtableOptions opts) throws IOException {
new BigtableDataGrpcClient(asyncDataChannel, sharedPools.getRetryExecutor(), options);
throttlingDataClient.setDeadlineGeneratorFactory(callOptionsFactory);

ManagedChannel rawAdminChannel = createNettyChannel(options.getAdminHost(), options);
ManagedChannel rawAdminChannel = createNettyChannel(options.getAdminHost(), options, false);
managedChannels.add(rawAdminChannel);

Channel adminChannel =
Expand Down Expand Up @@ -414,8 +420,8 @@ static List<ClientInterceptor> createAdminApiInterceptors(BigtableOptions option
return interceptors.build();
}

private List<ClientInterceptor> createDataApiInterceptors(BigtableOptions options)
throws IOException {
private List<ClientInterceptor> createDataApiInterceptors(
BigtableOptions options, boolean useDirectPath) throws IOException {
ImmutableList.Builder<ClientInterceptor> interceptors = ImmutableList.builder();

// TODO: instanceName should never be null
Expand All @@ -428,7 +434,7 @@ private List<ClientInterceptor> createDataApiInterceptors(BigtableOptions option

interceptors.add(setupWatchdog());

if (!BigtableOptions.isDirectPathEnabled()) {
if (!useDirectPath) {
ClientInterceptor authInterceptor = createAuthInterceptor(options);
if (authInterceptor != null) {
interceptors.add(authInterceptor);
Expand Down Expand Up @@ -486,36 +492,33 @@ private static ClientInterceptor createAuthInterceptor(BigtableOptions options)

// <editor-fold desc="Channel management">

private static ChannelPool createRawDataChannelPool(final BigtableOptions options)
throws IOException {
private static ChannelPool createRawDataChannelPool(
final BigtableOptions options, final boolean useDirectPath) throws IOException {
ChannelPool.ChannelFactory channelFactory =
new ChannelPool.ChannelFactory() {
@Override
public ManagedChannel create() throws IOException {
return createNettyChannel(options.getDataHost(), options);
return createNettyChannel(options.getDataHost(), options, useDirectPath);
}
};
return new ChannelPool(channelFactory, options.getChannelCount());
}
/** For internal use only - public for technical reasons. */
@InternalApi("For internal usage only")
public static ManagedChannel createNettyChannel(
String host, BigtableOptions options, ClientInterceptor... interceptors) throws SSLException {

// DirectPath is only supported for data currently
boolean isDirectPath = BigtableOptions.isDirectPathEnabled() && !host.contains("admin");
String host,
BigtableOptions options,
boolean useDirectPath,
ClientInterceptor... interceptors)
throws SSLException {

LOG.info("Creating new channel for %s", host);
if (LOG.getLog().isTraceEnabled()) {
LOG.trace(Throwables.getStackTraceAsString(new Throwable()));
}

ManagedChannelBuilder<?> builder;
if (isDirectPath) {
LOG.warn(
"Connecting to Bigtable using DirectPath."
+ " This is currently an experimental feature and should not be used in production.");

if (useDirectPath) {
builder = ComputeEngineChannelBuilder.forAddress(host, options.getPort());
// When channel pooling is enabled, force the pick_first grpclb strategy.
// This is necessary to avoid the multiplicative effect of creating channel pool with
Expand All @@ -535,24 +538,29 @@ public static ManagedChannel createNettyChannel(
ImmutableMap<String, Object> loadBalancingConfig =
ImmutableMap.<String, Object>of("loadBalancingConfig", ImmutableList.of(grpcLbPolicy));

builder.defaultServiceConfig(loadBalancingConfig);
builder
.defaultServiceConfig(loadBalancingConfig)
.keepAliveTime(DIRECT_PATH_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS)
.keepAliveTimeout(DIRECT_PATH_KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} else {
builder = ManagedChannelBuilder.forAddress(host, options.getPort());

if (options.usePlaintextNegotiation()) {
builder.usePlaintext();
}
builder
.keepAliveTime(CHANNEL_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS)
.keepAliveTimeout(CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

if (options.getChannelConfigurator() != null) {
builder = options.getChannelConfigurator().configureChannel(builder, host);
}

return builder
.disableServiceConfigLookUp()
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
.idleTimeout(Long.MAX_VALUE, TimeUnit.SECONDS)
.maxInboundMessageSize(MAX_MESSAGE_SIZE)
.keepAliveTime(CHANNEL_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS)
.keepAliveTimeout(CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
// Default behavior Do not use keepalive without any outstanding rpc calls as it can add a
// bunch of load.
.userAgent(BigtableVersionInfo.CORE_USER_AGENT + "," + options.getUserAgent())
Expand Down Expand Up @@ -705,7 +713,7 @@ public BigtableInstanceClient getInstanceAdminClient() throws IOException {
public static BigtableInstanceClient createInstanceClient(BigtableOptions options)
throws IOException, GeneralSecurityException {

ManagedChannel rawAdminChannel = createNettyChannel(options.getAdminHost(), options);
ManagedChannel rawAdminChannel = createNettyChannel(options.getAdminHost(), options, false);
kolea2 marked this conversation as resolved.
Show resolved Hide resolved
Channel adminChannel =
ClientInterceptors.intercept(rawAdminChannel, createAdminApiInterceptors(options));
return new BigtableInstanceGrpcClient(adminChannel);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.util;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.CredentialOptions;
import com.google.cloud.bigtable.config.CredentialOptions.CredentialType;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

@InternalApi("For internal use only")
public class DirectPathUtil {
private static final String GCE_PRODUCTION_NAME_PRIOR_2016 = "Google";
private static final String GCE_PRODUCTION_NAME_AFTER_2016 = "Google Compute Engine";

private static final Supplier<Boolean> isOnComputeEngine =
Suppliers.memoize(
new Supplier<Boolean>() {
@Override
public Boolean get() {
String osName = System.getProperty("os.name");
if ("Linux".equals(osName)) {
try {
String result =
Files.asCharSource(
new File("/sys/class/dmi/id/product_name"), StandardCharsets.UTF_8)
.readFirstLine();

return result != null
&& (result.contains(GCE_PRODUCTION_NAME_PRIOR_2016)
|| result.contains(GCE_PRODUCTION_NAME_AFTER_2016));
} catch (IOException e) {
return false;
}
}
return false;
}
});

private DirectPathUtil() {}

public static boolean shouldAttemptDirectPath(
String endpoint, int port, CredentialOptions creds) {
return BigtableOptions.BIGTABLE_DATA_HOST_DEFAULT.equals(endpoint)
&& BigtableOptions.BIGTABLE_PORT_DEFAULT == port
&& CredentialType.DefaultCredentials == creds.getCredentialType()
&& isOnComputeEngine.get();
}
}