Skip to content

Commit

Permalink
feat: handle retry info so client respect the delay server sets (#2026)
Browse files Browse the repository at this point in the history
* feat: add a flag to add / remove routing cookie from callable chain
  • Loading branch information
mutianf committed Dec 19, 2023
1 parent 201e631 commit f1b7fc7
Show file tree
Hide file tree
Showing 9 changed files with 780 additions and 45 deletions.
5 changes: 4 additions & 1 deletion google-cloud-bigtable/pom.xml
Expand Up @@ -161,7 +161,10 @@
<artifactId>grpc-alts</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Runtime dependencies for credentials -->
<dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</dependency><!-- Runtime dependencies for credentials -->
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
Expand Down
Expand Up @@ -28,6 +28,7 @@
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
Expand Down Expand Up @@ -108,6 +109,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -762,11 +764,19 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);

BasicResultRetryAlgorithm<Void> resultRetryAlgorithm;
if (settings.getEnableRetryInfo()) {
resultRetryAlgorithm = new RetryInfoRetryAlgorithm<>();
} else {
resultRetryAlgorithm = new ApiResultRetryAlgorithm<>();
}

RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
resultRetryAlgorithm,
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));

RetryingExecutorWithContext<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

Expand Down Expand Up @@ -1056,8 +1066,14 @@ public Map<String, String> extract(PingAndWarmRequest request) {

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
UnaryCallable<RequestT, ResponseT> innerCallable, UnaryCallSettings<?, ?> unaryCallSettings) {
UnaryCallable<RequestT, ResponseT> retrying =
Callables.retrying(innerCallable, unaryCallSettings, clientContext);
UnaryCallable<RequestT, ResponseT> retrying;
if (settings.getEnableRetryInfo()) {
retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
innerCallable, unaryCallSettings, clientContext);
} else {
retrying = Callables.retrying(innerCallable, unaryCallSettings, clientContext);
}
if (settings.getEnableRoutingCookie()) {
return new CookiesUnaryCallable<>(retrying);
}
Expand All @@ -1067,8 +1083,15 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> withRetries(
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
ServerStreamingCallSettings<RequestT, ResponseT> serverStreamingCallSettings) {
ServerStreamingCallable<RequestT, ResponseT> retrying =
Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext);

ServerStreamingCallable<RequestT, ResponseT> retrying;
if (settings.getEnableRetryInfo()) {
retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
innerCallable, serverStreamingCallSettings, clientContext);
} else {
retrying = Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext);
}
if (settings.getEnableRoutingCookie()) {
return new CookiesServerStreamingCallable<>(retrying);
}
Expand Down
Expand Up @@ -213,6 +213,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private ImmutableList<String> primedTableIds;
private final Map<String, String> jwtAudienceMapping;
private final boolean enableRoutingCookie;
private final boolean enableRetryInfo;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -255,6 +256,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
primedTableIds = builder.primedTableIds;
jwtAudienceMapping = builder.jwtAudienceMapping;
enableRoutingCookie = builder.enableRoutingCookie;
enableRetryInfo = builder.enableRetryInfo;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -325,6 +327,15 @@ public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/**
* Gets if RetryInfo is enabled. If true, client bases retry decision and back off time on server
* returned RetryInfo value. Otherwise, client uses {@link RetrySettings}.
*/
@BetaApi("RetryInfo is not currently stable and may change in the future")
public boolean getEnableRetryInfo() {
return enableRetryInfo;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -608,6 +619,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private ImmutableList<String> primedTableIds;
private Map<String, String> jwtAudienceMapping;
private boolean enableRoutingCookie;
private boolean enableRetryInfo;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand Down Expand Up @@ -641,6 +653,7 @@ private Builder() {
jwtAudienceMapping = DEFAULT_JWT_AUDIENCE_MAPPING;
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
this.enableRoutingCookie = true;
this.enableRetryInfo = true;

// Defaults provider
BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder();
Expand Down Expand Up @@ -760,6 +773,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
primedTableIds = settings.primedTableIds;
jwtAudienceMapping = settings.jwtAudienceMapping;
enableRoutingCookie = settings.enableRoutingCookie;
enableRetryInfo = settings.enableRetryInfo;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -927,6 +941,25 @@ public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/**
* Sets if RetryInfo is enabled. If true, client bases retry decision and back off time on
* server returned RetryInfo value. Otherwise, client uses {@link RetrySettings}.
*/
@BetaApi("RetryInfo is not currently stable and may change in the future")
public Builder setEnableRetryInfo(boolean enableRetryInfo) {
this.enableRetryInfo = enableRetryInfo;
return this;
}

/**
* Gets if RetryInfo is enabled. If true, client bases retry decision and back off time on
* server returned RetryInfo value. Otherwise, client uses {@link RetrySettings}.
*/
@BetaApi("RetryInfo is not currently stable and may change in the future")
public boolean getEnableRetryInfo() {
return enableRetryInfo;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -1054,6 +1087,7 @@ public String toString() {
.add("primedTableIds", primedTableIds)
.add("jwtAudienceMapping", jwtAudienceMapping)
.add("enableRoutingCookie", enableRoutingCookie)
.add("enableRetryInfo", enableRetryInfo)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down
Expand Up @@ -31,6 +31,7 @@
import com.google.bigtable.v2.MutateRowsResponse.Entry;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import com.google.cloud.bigtable.gaxx.retrying.ApiExceptions;
import com.google.cloud.bigtable.gaxx.retrying.NonCancellableFuture;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -235,7 +236,8 @@ private void handleAttemptError(Throwable rpcError) {
FailedMutation failedMutation = FailedMutation.create(origIndex, entryError);
allFailures.add(failedMutation);

if (!failedMutation.getError().isRetryable()) {
if (!ApiExceptions.isRetryable2(failedMutation.getError())
&& !failedMutation.getError().isRetryable()) {
permanentFailures.add(failedMutation);
} else {
// Schedule the mutation entry for the next RPC by adding it to the request builder and
Expand Down
@@ -0,0 +1,34 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;

// TODO: move this to gax later
@InternalApi
public class ApiExceptions {

private ApiExceptions() {}

// TODO: this should replace the existing ApiException#isRetryable() method,
// but that cant be done in bigtable, so this lives here for now.
public static boolean isRetryable2(Throwable e) {
if (RetryInfoRetryAlgorithm.extractRetryDelay(e) != null) {
return true;
}
return false;
}
}
Expand Up @@ -18,16 +18,13 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.StreamingRetryAlgorithm;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import java.util.Collection;

// TODO: remove this once ApiResultRetryAlgorithm is added to gax.
/**
Expand All @@ -48,23 +45,14 @@ public static <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> retrying(

UnaryCallSettings<?, ?> settings = callSettings;

if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) {
// When retries are disabled, the total timeout can be treated as the rpc timeout.
settings =
settings
.toBuilder()
.setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout())
.build();
}

RetryAlgorithm<ResponseT> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<ResponseT>(),
new RetryInfoRetryAlgorithm<>(),
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));
ScheduledRetryingExecutor<ResponseT> retryingExecutor =
ScheduledRetryingExecutor<ResponseT> executor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
return new RetryingCallable<>(
clientContext.getDefaultCallContext(), innerCallable, retryingExecutor);

return new RetryingCallable<>(clientContext.getDefaultCallContext(), innerCallable, executor);
}

public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> retrying(
Expand All @@ -73,18 +61,10 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
ClientContext clientContext) {

ServerStreamingCallSettings<RequestT, ResponseT> settings = callSettings;
if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) {
// When retries are disabled, the total timeout can be treated as the rpc timeout.
settings =
settings
.toBuilder()
.setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout())
.build();
}

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new RetryInfoRetryAlgorithm<>(),
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
Expand All @@ -93,11 +73,4 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
return new RetryingServerStreamingCallable<>(
innerCallable, retryingExecutor, settings.getResumptionStrategy());
}

private static boolean areRetriesDisabled(
Collection<StatusCode.Code> retryableCodes, RetrySettings retrySettings) {
return retrySettings.getMaxAttempts() == 1
|| retryableCodes.isEmpty()
|| (retrySettings.getMaxAttempts() == 0 && retrySettings.getTotalTimeout().isZero());
}
}

0 comments on commit f1b7fc7

Please sign in to comment.