Skip to content

Commit

Permalink
feat: add APIs to enable batch write flow control (#1730)
Browse files Browse the repository at this point in the history
* feat: add APIs to enable batch write flow control

* address comments
  • Loading branch information
mutianf committed May 1, 2023
1 parent ee0da11 commit b518d68
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
Expand Down Expand Up @@ -278,6 +279,15 @@ public Long getBatchMutationsTargetRpcLatencyMs() {
return stubSettings.bulkMutateRowsSettings().getTargetRpcLatencyMs();
}

/**
* Gets if flow control is enabled for {@link BigtableDataClient#newBulkMutationBatcher(String)}
* based on the load of the Bigtable server.
*/
@InternalApi("Intended for use by the Bigtable dataflow connectors only")
public boolean isBulkMutationFlowControlEnabled() {
return stubSettings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled();
}

/** Returns the underlying RPC settings. */
public EnhancedBigtableStubSettings getStubSettings() {
return stubSettings;
Expand Down Expand Up @@ -505,6 +515,28 @@ public Long getTargetRpcLatencyMsForBatchMutation() {
return stubSettings.bulkMutateRowsSettings().getTargetRpcLatencyMs();
}

/**
* Configure flow control for {@link BigtableDataClient#newBulkMutationBatcher(String)} based on
* the current load on the Bigtable cluster.
*
* <p>This is different from the {@link FlowController} that's always enabled on batch reads and
* batch writes, which limits the number of outstanding requests to the Bigtable server.
*/
@InternalApi("Intended for use by the Bigtable dataflow connectors only")
public Builder setBulkMutationFlowControl(boolean isEnableFlowControl) {
stubSettings.bulkMutateRowsSettings().setServerInitiatedFlowControl(isEnableFlowControl);
return this;
}

/**
* Gets if flow control is enabled for {@link BigtableDataClient#newBulkMutationBatcher(String)}
* based on the load of the Bigtable server.
*/
@InternalApi("Intended for use by the Bigtable dataflow connectors only")
public boolean isBulkMutationFlowControlEnabled() {
return stubSettings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled();
}

/**
* Returns the underlying settings for making RPC calls. The settings should be changed with
* care.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.BatchingCallSettings;
import com.google.api.gax.batching.BatchingDescriptor;
import com.google.api.gax.batching.BatchingSettings;
Expand Down Expand Up @@ -66,6 +67,8 @@ public final class BigtableBatchingCallSettings extends UnaryCallSettings<BulkMu
private final Long targetRpcLatencyMs;
private final DynamicFlowControlSettings dynamicFlowControlSettings;

private final boolean isServerInitiatedFlowControlEnabled;

private BigtableBatchingCallSettings(Builder builder) {
super(builder);
batchingCallSettings =
Expand All @@ -77,6 +80,7 @@ private BigtableBatchingCallSettings(Builder builder) {
this.isLatencyBasedThrottlingEnabled = builder.isLatencyBasedThrottlingEnabled;
this.targetRpcLatencyMs = builder.targetRpcLatencyMs;
this.dynamicFlowControlSettings = builder.dynamicFlowControlSettings;
this.isServerInitiatedFlowControlEnabled = builder.isServerInitiatedFlowControlEnabled;
}

/** Returns batching settings which contains multiple batch threshold levels. */
Expand Down Expand Up @@ -109,6 +113,12 @@ DynamicFlowControlSettings getDynamicFlowControlSettings() {
return dynamicFlowControlSettings;
}

/** Gets if flow control is enabled. */
@InternalApi("Intended for use by the Bigtable dataflow connectors only")
public boolean isServerInitiatedFlowControlEnabled() {
return isServerInitiatedFlowControlEnabled;
}

static Builder newBuilder(
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
return new Builder(batchingDescriptor);
Expand All @@ -130,6 +140,7 @@ public String toString() {
.add("isLatencyBasedThrottlingEnabled", isLatencyBasedThrottlingEnabled)
.add("targetRpcLatency", targetRpcLatencyMs)
.add("dynamicFlowControlSettings", dynamicFlowControlSettings)
.add("isServerInitiatedFlowControlEnabled", isServerInitiatedFlowControlEnabled)
.toString();
}

Expand All @@ -145,6 +156,8 @@ public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void
private Long targetRpcLatencyMs;
private DynamicFlowControlSettings dynamicFlowControlSettings;

private boolean isServerInitiatedFlowControlEnabled;

private Builder(
@Nonnull
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
Expand All @@ -159,6 +172,7 @@ private Builder(@Nonnull BigtableBatchingCallSettings settings) {
this.isLatencyBasedThrottlingEnabled = settings.isLatencyBasedThrottlingEnabled();
this.targetRpcLatencyMs = settings.getTargetRpcLatencyMs();
this.dynamicFlowControlSettings = settings.getDynamicFlowControlSettings();
this.isServerInitiatedFlowControlEnabled = settings.isServerInitiatedFlowControlEnabled();
}

/** Sets the batching settings with various thresholds. */
Expand Down Expand Up @@ -263,6 +277,19 @@ DynamicFlowControlSettings getDynamicFlowControlSettings() {
return this.dynamicFlowControlSettings;
}

/** Configure flow control based on the current load of the Bigtable server. */
@InternalApi("Intended for use by the Bigtable dataflow connectors only")
public Builder setServerInitiatedFlowControl(boolean isEnable) {
this.isServerInitiatedFlowControlEnabled = isEnable;
return this;
}

/** Gets if flow control is enabled based on the load of the Bigtable server. */
@InternalApi("Intended for use by the Bigtable dataflow connectors only")
public boolean isServerInitiatedFlowControlEnabled() {
return this.isServerInitiatedFlowControlEnabled;
}

/** Builds the {@link BigtableBatchingCallSettings} object with provided configuration. */
@Override
public BigtableBatchingCallSettings build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,15 +733,19 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withStatsHeaders =
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable =
new StatsHeadersServerStreamingCallable<>(base);

if (settings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) {
callable = new RateLimitingServerStreamingCallable(callable);
}

// Sometimes MutateRows connections are disconnected via an RST frame. This error is transient
// and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> convertException =
new ConvertExceptionCallable<>(withStatsHeaders);
new ConvertExceptionCallable<>(callable);

RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void testToString() {
// disable channel priming so we won't need authentication
// for sending the prime request since we're only testing the settings.
.setRefreshingChannel(false)
.setBulkMutationFlowControl(true)
.build();
EnhancedBigtableStubSettings stubSettings = settings.getStubSettings();
assertThat(settings.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ public void testBuilder() {
assertThat(settings.getDynamicFlowControlSettings()).isNotNull();
verifyFlowControlSettingWhenLatencyBasedThrottlingDisabled(
settings.getDynamicFlowControlSettings());

builder.setServerInitiatedFlowControl(true);
settings = builder.build();
assertThat(settings.isServerInitiatedFlowControlEnabled()).isTrue();

builder.setServerInitiatedFlowControl(false);
settings = builder.build();
assertThat(settings.isServerInitiatedFlowControlEnabled()).isFalse();
}

@Test
Expand All @@ -108,7 +116,8 @@ public void testBuilderFromSettings() {
.setBatchingSettings(BATCHING_SETTINGS)
.setRetryableCodes(StatusCode.Code.UNAVAILABLE, StatusCode.Code.UNAUTHENTICATED)
.setRetrySettings(retrySettings)
.enableLatencyBasedThrottling(10L);
.enableLatencyBasedThrottling(10L)
.setServerInitiatedFlowControl(true);

BigtableBatchingCallSettings settings = builder.build();
BigtableBatchingCallSettings.Builder newBuilder = settings.toBuilder();
Expand All @@ -122,6 +131,7 @@ public void testBuilderFromSettings() {
assertThat(newBuilder.getDynamicFlowControlSettings()).isNotNull();
verifyFlowControlSettingWhenLatencyBasedThrottlingEnabled(
newBuilder.getDynamicFlowControlSettings());
assertThat(newBuilder.isServerInitiatedFlowControlEnabled()).isTrue();
}

@Test
Expand Down

0 comments on commit b518d68

Please sign in to comment.