Skip to content

Commit

Permalink
feat: allow DirectPath by default + update integration tests
Browse files Browse the repository at this point in the history
* DirectPath is now opt-out by setting google.bigtable.direct.path.allowed to false
* Integration tests can now verify that they are running over the desired mode by specifying "bigtable.connection-mode" system property
* Integration tests update their user agent to signal membership in allow/deny lists
* DirectPathFallbackIT now supports IPv4
  • Loading branch information
igorbernstein2 committed Jun 23, 2021
1 parent a25357f commit c935655
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ public interface ChannelConfigurator {
@InternalApi("For internal usage only")
public static final String BIGTABLE_DATA_HOST_DEFAULT = "bigtable.googleapis.com";

// Temporary DirectPath config
private static final String DIRECT_PATH_ENV_VAR = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH";
private static final String BIGTABLE_DIRECTPATH_DATA_HOST_DEFAULT =
"directpath-bigtable.googleapis.com";

/** For internal use only - public for technical reasons. */
@InternalApi("For internal usage only")
public static final String BIGTABLE_BATCH_DATA_HOST_DEFAULT = "batch-bigtable.googleapis.com";
Expand Down Expand Up @@ -104,27 +99,6 @@ public static BigtableOptions getDefaultOptions() {
return builder().build();
}

/**
* Checks if DirectPath is enabled via an environment variable.
*
* <p>For internal use only - public for technical reasons.
*/
@InternalApi("For internal use only")
public static boolean isDirectPathEnabled() {
String whiteList = MoreObjects.firstNonNull(System.getenv(DIRECT_PATH_ENV_VAR), "").trim();

if (whiteList.isEmpty()) {
return false;
}

for (String entry : whiteList.split(",")) {
if (BIGTABLE_DATA_HOST_DEFAULT.contains(entry)) {
return true;
}
}
return false;
}

/** Create a new instance of the {@link Builder}. */
public static Builder builder() {
return new Builder();
Expand All @@ -141,10 +115,7 @@ public Builder() {
options.appProfileId = BIGTABLE_APP_PROFILE_DEFAULT;

// Optional configuration for hosts - useful for the Bigtable team, more than anything else.
options.dataHost =
isDirectPathEnabled()
? BIGTABLE_DIRECTPATH_DATA_HOST_DEFAULT
: BIGTABLE_DATA_HOST_DEFAULT;
options.dataHost = BIGTABLE_DATA_HOST_DEFAULT;
options.adminHost = BIGTABLE_ADMIN_HOST_DEFAULT;
options.port = BIGTABLE_PORT_DEFAULT;

Expand Down Expand Up @@ -212,6 +183,17 @@ public Builder setDataChannelCount(int dataChannelCount) {
return this;
}

public Builder disableDirectPath() {
options.directPathAllowed = false;
return this;
}

/** For internal use only - public for technical reasons. */
@InternalApi("Visible for test only")
public ChannelConfigurator getChannelConfigurator() {
return this.options.channelConfigurator;
}

/** For internal use only - public for technical reasons. */
@InternalApi("Visible for test only")
public Builder setChannelConfigurator(ChannelConfigurator configurator) {
Expand Down Expand Up @@ -372,6 +354,7 @@ public BigtableOptions build() {
private String adminHost;
private String dataHost;
private int port;
private boolean directPathAllowed;
private String projectId;
private String instanceId;
private String appProfileId = BIGTABLE_APP_PROFILE_DEFAULT;
Expand Down Expand Up @@ -448,6 +431,11 @@ public int getPort() {
return port;
}

@BetaApi("surface for DirectPath is still unstable and may change in the future")
public boolean isDirectPathAllowed() {
return directPathAllowed;
}

/**
* Get the credential this object was constructed with. May be null.
*
Expand Down Expand Up @@ -562,6 +550,7 @@ public boolean equals(Object obj) {
&& (dataChannelCount == other.dataChannelCount)
&& (usePlaintextNegotiation == other.usePlaintextNegotiation)
&& (useCachedDataPool == other.useCachedDataPool)
&& (directPathAllowed == other.directPathAllowed)
&& Objects.equals(adminHost, other.adminHost)
&& Objects.equals(dataHost, other.dataHost)
&& Objects.equals(projectId, other.projectId)
Expand All @@ -585,6 +574,7 @@ public String toString() {
.omitNullValues()
.add("dataHost", dataHost)
.add("adminHost", adminHost)
.add("directPathEnabled", directPathAllowed)
.add("port", port)
.add("projectId", projectId)
.add("instanceId", instanceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public BigtableClusterUtilities(final BigtableOptions options)

ClientInterceptor[] interceptors =
BigtableSession.createAdminApiInterceptors(options).toArray(new ClientInterceptor[0]);
channel = BigtableSession.createNettyChannel(options.getAdminHost(), options, interceptors);
channel =
BigtableSession.createNettyChannel(options.getAdminHost(), options, false, interceptors);
client = new BigtableInstanceGrpcClient(channel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.google.cloud.bigtable.grpc.io.WatchdogInterceptor;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics.MetricLevel;
import com.google.cloud.bigtable.util.DirectPathUtil;
import com.google.cloud.bigtable.util.ReferenceCountedHashMap;
import com.google.cloud.bigtable.util.ReferenceCountedHashMap.Callable;
import com.google.cloud.bigtable.util.ThreadUtil;
Expand Down Expand Up @@ -134,6 +135,9 @@ public void call(ClientContext context) {
// Use this conservative values for timeout (10s)
static final long CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS = 10;

static final long DIRECT_PATH_KEEP_ALIVE_TIME_SECONDS = 3600;
static final long DIRECT_PATH_KEEP_ALIVE_TIMEOUT_SECONDS = 20;

@VisibleForTesting
static final String PROJECT_ID_EMPTY_OR_NULL = "ProjectId must not be empty or null.";

Expand Down Expand Up @@ -210,7 +214,6 @@ private static synchronized ResourceLimiter initializeResourceLimiter(BigtableOp

private final BigtableOptions options;
private final List<ManagedChannel> managedChannels;
@Deprecated private final List<ClientInterceptor> dataChannelInterceptors;

private final BigtableDataClient dataClient;
private final RequestContext dataRequestContext;
Expand Down Expand Up @@ -311,31 +314,33 @@ public BigtableSession(BigtableOptions opts) throws IOException {
this.dataClient = null;
this.throttlingDataClient = null;
this.dataRequestContext = null;
this.dataChannelInterceptors = null;
} else {
boolean useDirectPath =
options.isDirectPathAllowed()
&& DirectPathUtil.shouldAttemptDirectPath(
options.getDataHost(), options.getPort(), options.getCredentialOptions());

// Get a raw data channel pool - depending on the settings, this channel can either be
// cached/shared or it can specific to this session. If it's specific to this session,
// it will be added to managedChannels and cleaned up when this session is closed.
ManagedChannel rawDataChannelPool;
if (options.useCachedChannel()) {
synchronized (BigtableSession.class) {
String key = String.format("%s:%d", options.getDataHost(), options.getPort());
String key =
String.format("%s:%s:%d", useDirectPath, options.getDataHost(), options.getPort());
rawDataChannelPool = cachedDataChannelPools.get(key);
if (rawDataChannelPool == null) {
rawDataChannelPool = createRawDataChannelPool(options);
rawDataChannelPool = createRawDataChannelPool(options, useDirectPath);
cachedDataChannelPools.put(key, rawDataChannelPool);
}
}
} else {
rawDataChannelPool = createRawDataChannelPool(options);
rawDataChannelPool = createRawDataChannelPool(options, useDirectPath);
managedChannels.add(rawDataChannelPool);
}

// TODO: stop saving the data channel interceptors as instance variables, this is here only to
// support deprecated methods
dataChannelInterceptors = createDataApiInterceptors(options);
Channel dataChannel =
ClientInterceptors.intercept(rawDataChannelPool, dataChannelInterceptors);
ClientInterceptors.intercept(rawDataChannelPool, createDataApiInterceptors(options, useDirectPath));

this.dataRequestContext =
RequestContext.create(
Expand Down Expand Up @@ -366,7 +371,7 @@ public BigtableSession(BigtableOptions opts) throws IOException {
new BigtableDataGrpcClient(asyncDataChannel, sharedPools.getRetryExecutor(), options);
throttlingDataClient.setDeadlineGeneratorFactory(callOptionsFactory);

ManagedChannel rawAdminChannel = createNettyChannel(options.getAdminHost(), options);
ManagedChannel rawAdminChannel = createNettyChannel(options.getAdminHost(), options, false);
managedChannels.add(rawAdminChannel);

Channel adminChannel =
Expand Down Expand Up @@ -414,7 +419,7 @@ static List<ClientInterceptor> createAdminApiInterceptors(BigtableOptions option
return interceptors.build();
}

private List<ClientInterceptor> createDataApiInterceptors(BigtableOptions options)
private List<ClientInterceptor> createDataApiInterceptors(BigtableOptions options, boolean useDirectPath)
throws IOException {
ImmutableList.Builder<ClientInterceptor> interceptors = ImmutableList.builder();

Expand All @@ -428,7 +433,7 @@ private List<ClientInterceptor> createDataApiInterceptors(BigtableOptions option

interceptors.add(setupWatchdog());

if (!BigtableOptions.isDirectPathEnabled()) {
if (!useDirectPath) {
ClientInterceptor authInterceptor = createAuthInterceptor(options);
if (authInterceptor != null) {
interceptors.add(authInterceptor);
Expand Down Expand Up @@ -486,36 +491,33 @@ private static ClientInterceptor createAuthInterceptor(BigtableOptions options)

// <editor-fold desc="Channel management">

private static ChannelPool createRawDataChannelPool(final BigtableOptions options)
throws IOException {
private static ChannelPool createRawDataChannelPool(
final BigtableOptions options, final boolean useDirectPath) throws IOException {
ChannelPool.ChannelFactory channelFactory =
new ChannelPool.ChannelFactory() {
@Override
public ManagedChannel create() throws IOException {
return createNettyChannel(options.getDataHost(), options);
return createNettyChannel(options.getDataHost(), options, useDirectPath);
}
};
return new ChannelPool(channelFactory, options.getChannelCount());
}
/** For internal use only - public for technical reasons. */
@InternalApi("For internal usage only")
public static ManagedChannel createNettyChannel(
String host, BigtableOptions options, ClientInterceptor... interceptors) throws SSLException {

// DirectPath is only supported for data currently
boolean isDirectPath = BigtableOptions.isDirectPathEnabled() && !host.contains("admin");
String host,
BigtableOptions options,
boolean useDirectPath,
ClientInterceptor... interceptors)
throws SSLException {

LOG.info("Creating new channel for %s", host);
if (LOG.getLog().isTraceEnabled()) {
LOG.trace(Throwables.getStackTraceAsString(new Throwable()));
}

ManagedChannelBuilder<?> builder;
if (isDirectPath) {
LOG.warn(
"Connecting to Bigtable using DirectPath."
+ " This is currently an experimental feature and should not be used in production.");

if (useDirectPath) {
builder = ComputeEngineChannelBuilder.forAddress(host, options.getPort());
// When channel pooling is enabled, force the pick_first grpclb strategy.
// This is necessary to avoid the multiplicative effect of creating channel pool with
Expand All @@ -535,13 +537,19 @@ public static ManagedChannel createNettyChannel(
ImmutableMap<String, Object> loadBalancingConfig =
ImmutableMap.<String, Object>of("loadBalancingConfig", ImmutableList.of(grpcLbPolicy));

builder.defaultServiceConfig(loadBalancingConfig);
builder
.defaultServiceConfig(loadBalancingConfig)
.keepAliveTime(DIRECT_PATH_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS)
.keepAliveTimeout(DIRECT_PATH_KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} else {
builder = ManagedChannelBuilder.forAddress(host, options.getPort());

if (options.usePlaintextNegotiation()) {
builder.usePlaintext();
}
builder
.keepAliveTime(CHANNEL_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS)
.keepAliveTimeout(CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

if (options.getChannelConfigurator() != null) {
Expand All @@ -551,8 +559,6 @@ public static ManagedChannel createNettyChannel(
return builder
.idleTimeout(Long.MAX_VALUE, TimeUnit.SECONDS)
.maxInboundMessageSize(MAX_MESSAGE_SIZE)
.keepAliveTime(CHANNEL_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS)
.keepAliveTimeout(CHANNEL_KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
// Default behavior Do not use keepalive without any outstanding rpc calls as it can add a
// bunch of load.
.userAgent(BigtableVersionInfo.CORE_USER_AGENT + "," + options.getUserAgent())
Expand Down Expand Up @@ -705,7 +711,7 @@ public BigtableInstanceClient getInstanceAdminClient() throws IOException {
public static BigtableInstanceClient createInstanceClient(BigtableOptions options)
throws IOException, GeneralSecurityException {

ManagedChannel rawAdminChannel = createNettyChannel(options.getAdminHost(), options);
ManagedChannel rawAdminChannel = createNettyChannel(options.getAdminHost(), options, false);
Channel adminChannel =
ClientInterceptors.intercept(rawAdminChannel, createAdminApiInterceptors(options));
return new BigtableInstanceGrpcClient(adminChannel);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.util;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.CredentialOptions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.io.CharStreams;
import java.io.IOException;
import java.io.InputStreamReader;

@InternalApi("For internal use only")
public class DirectPathUtil {
private static final String GCE_PRODUCTION_NAME_PRIOR_2016 = "Google";
private static final String GCE_PRODUCTION_NAME_AFTER_2016 = "Google Compute Engine";

private static final Supplier<Boolean> isOnComputeEngine =
Suppliers.memoize(
new Supplier<Boolean>() {
@Override
public Boolean get() {
String osName = System.getProperty("os.name");
if ("Linux".equals(osName)) {
String cmd = "cat /sys/class/dmi/id/product_name";
try {
Process process = Runtime.getRuntime().exec(new String[] {"/bin/sh", "-c", cmd});
process.waitFor();
String result =
CharStreams.toString(
new InputStreamReader(process.getInputStream(), "UTF-8"));
return result.contains(GCE_PRODUCTION_NAME_PRIOR_2016)
|| result.contains(GCE_PRODUCTION_NAME_AFTER_2016);
} catch (IOException | InterruptedException e) {
return false;
}
}
return false;
}
});

private DirectPathUtil() {}

public static boolean shouldAttemptDirectPath(
String endpoint, int port, CredentialOptions creds) {
return BigtableOptions.BIGTABLE_DATA_HOST_DEFAULT.equals(endpoint)
&& BigtableOptions.BIGTABLE_PORT_DEFAULT == port
&& CredentialOptions.defaultCredentials() == creds
&& isOnComputeEngine.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.hbase.test_env;

import com.google.cloud.bigtable.hbase.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -81,6 +82,29 @@ protected void setup() throws IOException {
}
}

// Configure DirectPath settings:
// - when a required mode is specified, ensure that ip address match the mode
// - specify a user agent that will trigger deny/allow lists on the serverside
switch (getConnectionMode()) {
case DEFAULT:
// nothing special
break;
case REQUIRE_CFE:
configuration.set("google.bigtable.test.data.ip.regex", "^(?!2001:4860:8040|34\\.126)");
configuration.set("google.bigtable.custom.user.agent", "bigtable-directpath-disable");
break;
case REQUIRE_DIRECT_PATH:
configuration.set("google.bigtable.test.data.ip.regex", "^2001:4860:8040|34\\.126");
configuration.set("google.bigtable.custom.user.agent", "bigtable-directpath-enable");
break;
case REQUIRE_DIRECT_PATH_IPV4:
configuration.set("google.bigtable.test.data.ip.regex", "^34\\.126");
configuration.set("google.bigtable.custom.user.agent", "bigtable-directpath-enable");
break;
default:
throw new IllegalStateException("Unknown ConnectionMode: " + getConnectionMode());
}

// Garbage collect tables that previous runs failed to clean up
ListeningExecutorService executor = MoreExecutors.listeningDecorator(getExecutor());
try (Connection connection = ConnectionFactory.createConnection(configuration);
Expand Down

0 comments on commit c935655

Please sign in to comment.