Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix RetryInfo algorithm and tests #2041

Merged
merged 8 commits into from Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Expand Up @@ -156,4 +156,22 @@
<className>com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm</className>
<field>*</field>
</difference>
<!-- InternalApi was updated: the MutateRowsRetryingCallable constructor gained a RetryAlgorithm parameter -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryingCallable</className>
<method>*</method>
</difference>
<!-- InternalApi was updated: the MutateRowsException constructor's argument list changed -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/MutateRowsException</className>
<method>*</method>
</difference>
<!-- InternalApi was updated: the MutateRowsException constructor is now private; use MutateRowsException.create -->
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/MutateRowsException</className>
<method>*</method>
</difference>
</differences>
Expand Up @@ -17,6 +17,7 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.StatusCode;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.MutateRowsRequest;
Expand Down Expand Up @@ -53,16 +54,36 @@ public Object getTransportCode() {
* applications.
*/
@InternalApi
public MutateRowsException(
public static MutateRowsException create(
@Nullable Throwable rpcError,
@Nonnull List<FailedMutation> failedMutations,
boolean retryable) {
super("Some mutations failed to apply", rpcError, LOCAL_STATUS, retryable);
ErrorDetails errorDetails = null;
if (rpcError instanceof ApiException) {
errorDetails = ((ApiException) rpcError).getErrorDetails();
}

return new MutateRowsException(rpcError, failedMutations, retryable, errorDetails);
}

/**
 * Builds the exception from an already-resolved set of error details.
 *
 * <p>No message is handed to the superclass constructor here; the fixed message is supplied by
 * the {@code getMessage()} override instead.
 *
 * @param rpcError the RPC-level cause, or {@code null} when there is none
 * @param failedMutations the failed mutations to report; must be non-null and non-empty
 * @param retryable whether the failure should be reported as retryable
 * @param errorDetails error details forwarded to the superclass, or {@code null}
 * @throws NullPointerException if {@code failedMutations} is null
 * @throws IllegalArgumentException if {@code failedMutations} is empty
 */
private MutateRowsException(
@Nullable Throwable rpcError,
@Nonnull List<FailedMutation> failedMutations,
boolean retryable,
@Nullable ErrorDetails errorDetails) {
super(rpcError, LOCAL_STATUS, retryable, errorDetails);
// Validate before storing: an exception of this type must always carry at least one failure.
Preconditions.checkNotNull(failedMutations);
Preconditions.checkArgument(!failedMutations.isEmpty(), "failedMutations can't be empty");
// The list is stored as passed in; no defensive copy is made here.
this.failedMutations = failedMutations;
}

// TODO: remove this after we add a ctor in gax to pass in a Throwable, a message and error
// details.
/**
 * Returns the fixed summary message for this exception.
 *
 * <p>Overridden because the superclass constructor used by this class is not given a message
 * (see the TODO above).
 *
 * @return the constant string {@code "Some mutations failed to apply"}
 */
@Override
public String getMessage() {
return "Some mutations failed to apply";
}

/**
* Retrieve all of the failed mutations. This list will contain failures for all of the mutations
* that have failed across all of the retry attempts so far.
Expand Down
Expand Up @@ -784,7 +784,8 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
clientContext.getDefaultCallContext(),
withBigtableTracer,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes());
settings.bulkMutateRowsSettings().getRetryableCodes(),
retryAlgorithm);
}

/**
Expand Down
Expand Up @@ -19,7 +19,9 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
Expand All @@ -31,7 +33,6 @@
import com.google.bigtable.v2.MutateRowsResponse.Entry;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import com.google.cloud.bigtable.gaxx.retrying.ApiExceptions;
import com.google.cloud.bigtable.gaxx.retrying.NonCancellableFuture;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -111,6 +112,8 @@ public Object getTransportCode() {
@Nullable private List<Integer> originalIndexes;
@Nonnull private final Set<StatusCode.Code> retryableCodes;
@Nullable private final List<FailedMutation> permanentFailures;
@Nonnull private final RetryAlgorithm<MutateRowsRequest> retryAlgorithm;
@Nonnull private TimedAttemptSettings attemptSettings;

// Parent controller
private RetryingFuture<Void> externalFuture;
Expand Down Expand Up @@ -138,11 +141,14 @@ public List<MutateRowsResponse> apply(Throwable throwable) {
@Nonnull UnaryCallable<MutateRowsRequest, List<MutateRowsResponse>> innerCallable,
@Nonnull MutateRowsRequest originalRequest,
@Nonnull ApiCallContext callContext,
@Nonnull Set<StatusCode.Code> retryableCodes) {
@Nonnull Set<StatusCode.Code> retryableCodes,
@Nonnull RetryAlgorithm<MutateRowsRequest> retryAlgorithm) {
this.innerCallable = Preconditions.checkNotNull(innerCallable, "innerCallable");
this.currentRequest = Preconditions.checkNotNull(originalRequest, "currentRequest");
this.callContext = Preconditions.checkNotNull(callContext, "callContext");
this.retryableCodes = Preconditions.checkNotNull(retryableCodes, "retryableCodes");
this.retryAlgorithm = retryAlgorithm;
this.attemptSettings = retryAlgorithm.createFirstAttempt();

permanentFailures = Lists.newArrayList();
}
Expand Down Expand Up @@ -230,14 +236,16 @@ private void handleAttemptError(Throwable rpcError) {
Builder builder = lastRequest.toBuilder().clearEntries();
List<Integer> newOriginalIndexes = Lists.newArrayList();

TimedAttemptSettings nextAttemptSettings =
mutianf marked this conversation as resolved.
Show resolved Hide resolved
retryAlgorithm.createNextAttempt(null, entryError, null, attemptSettings);

for (int i = 0; i < currentRequest.getEntriesCount(); i++) {
int origIndex = getOriginalIndex(i);

FailedMutation failedMutation = FailedMutation.create(origIndex, entryError);
allFailures.add(failedMutation);

if (!ApiExceptions.isRetryable2(failedMutation.getError())
&& !failedMutation.getError().isRetryable()) {
if (!retryAlgorithm.shouldRetry(null, failedMutation.getError(), null, nextAttemptSettings)) {
permanentFailures.add(failedMutation);
} else {
// Schedule the mutation entry for the next RPC by adding it to the request builder and
Expand All @@ -249,16 +257,17 @@ private void handleAttemptError(Throwable rpcError) {

currentRequest = builder.build();
originalIndexes = newOriginalIndexes;
attemptSettings = nextAttemptSettings;

throw new MutateRowsException(rpcError, allFailures.build(), entryError.isRetryable());
throw MutateRowsException.create(rpcError, allFailures.build(), builder.getEntriesCount() > 0);
}

/**
* Handle entry level failures. All new response entries are inspected for failure. If any
* transient failures are found, their corresponding mutations are scheduled for the next RPC. The
* caller is notified of both new found errors and pre-existing permanent errors in the thrown
* {@link MutateRowsException}. If no errors exist, then the attempt future is successfully
* completed.
* completed. We don't currently handle RetryInfo on entry level failures.
*/
private void handleAttemptSuccess(List<MutateRowsResponse> responses) {
List<FailedMutation> allFailures = Lists.newArrayList(permanentFailures);
Expand Down Expand Up @@ -319,7 +328,7 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {

if (!allFailures.isEmpty()) {
boolean isRetryable = builder.getEntriesCount() > 0;
throw new MutateRowsException(null, allFailures, isRetryable);
throw MutateRowsException.create(null, allFailures, isRetryable);
}
}

Expand Down Expand Up @@ -354,10 +363,10 @@ private static ApiException createSyntheticErrorForRpcFailure(Throwable overallR
ApiException requestApiException = (ApiException) overallRequestError;

return ApiExceptionFactory.createException(
"Didn't receive a result for this mutation entry",
overallRequestError,
requestApiException.getStatusCode(),
requestApiException.isRetryable());
requestApiException.isRetryable(),
requestApiException.getErrorDetails());
}

return ApiExceptionFactory.createException(
Expand Down
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.rpc.ApiCallContext;
Expand Down Expand Up @@ -44,23 +45,26 @@ public class MutateRowsRetryingCallable extends UnaryCallable<MutateRowsRequest,
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable;
private final RetryingExecutorWithContext<Void> executor;
private final ImmutableSet<Code> retryCodes;
private final RetryAlgorithm retryAlgorithm;

public MutateRowsRetryingCallable(
@Nonnull ApiCallContext callContextPrototype,
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable,
@Nonnull RetryingExecutorWithContext<Void> executor,
@Nonnull Set<StatusCode.Code> retryCodes) {
@Nonnull Set<StatusCode.Code> retryCodes,
@Nonnull RetryAlgorithm retryAlgorithm) {
this.callContextPrototype = Preconditions.checkNotNull(callContextPrototype);
this.callable = Preconditions.checkNotNull(callable);
this.executor = Preconditions.checkNotNull(executor);
this.retryCodes = ImmutableSet.copyOf(retryCodes);
this.retryAlgorithm = retryAlgorithm;
}

@Override
public RetryingFuture<Void> futureCall(MutateRowsRequest request, ApiCallContext inputContext) {
ApiCallContext context = callContextPrototype.nullToSelf(inputContext);
MutateRowsAttemptCallable retryCallable =
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes);
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes, retryAlgorithm);

RetryingFuture<Void> retryingFuture = executor.createFuture(retryCallable, context);
retryCallable.setExternalFuture(retryingFuture);
Expand Down

This file was deleted.

Expand Up @@ -20,12 +20,8 @@
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.util.Durations;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.threeten.bp.Duration;

Expand All @@ -37,10 +33,6 @@
@InternalApi
public class RetryInfoRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<ResponseT> {

@VisibleForTesting
public static final Metadata.Key<RetryInfo> RETRY_INFO_KEY =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
Expand All @@ -50,6 +42,7 @@ public TimedAttemptSettings createNextAttempt(
.toBuilder()
.setRandomizedRetryDelay(retryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
.setOverallAttemptCount(prevSettings.getAttemptCount() + 1)
.build();
}
return null;
Expand Down Expand Up @@ -93,17 +86,17 @@ static Duration extractRetryDelay(@Nullable Throwable throwable) {
if (throwable == null) {
return null;
}
Metadata trailers = Status.trailersFromThrowable(throwable);
if (trailers == null) {
if (!(throwable instanceof ApiException)) {
return null;
}
RetryInfo retryInfo = trailers.get(RETRY_INFO_KEY);
if (retryInfo == null) {
ApiException exception = (ApiException) throwable;
if (exception.getErrorDetails() == null) {
return null;
}
if (!retryInfo.hasRetryDelay()) {
if (exception.getErrorDetails().getRetryInfo() == null) {
return null;
}
RetryInfo retryInfo = exception.getErrorDetails().getRetryInfo();
return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
}
}
Expand Up @@ -15,13 +15,13 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm.RETRY_INFO_KEY;
import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.UnavailableException;
Expand Down Expand Up @@ -55,7 +55,9 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import com.google.protobuf.Any;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
Expand All @@ -77,6 +79,9 @@ public class RetryInfoTest {

@Rule public GrpcServerRule serverRule = new GrpcServerRule();

private static final Metadata.Key<byte[]> ERROR_DETAILS_KEY =
Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER);

private FakeBigtableService service;
private BigtableDataClient client;
private BigtableDataSettings.Builder settings;
Expand Down Expand Up @@ -366,27 +371,37 @@ private void verifyRetryInfoCanBeDisabled(Runnable runnable) {
private void enqueueRetryableExceptionWithDelay(com.google.protobuf.Duration delay) {
Metadata trailers = new Metadata();
RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build();
trailers.put(RETRY_INFO_KEY, retryInfo);
ErrorDetails errorDetails =
ErrorDetails.builder().setRawErrorMessages(ImmutableList.of(Any.pack(retryInfo))).build();
byte[] status =
com.google.rpc.Status.newBuilder().addDetails(Any.pack(retryInfo)).build().toByteArray();
trailers.put(ERROR_DETAILS_KEY, status);

ApiException exception =
new UnavailableException(
new StatusRuntimeException(Status.UNAVAILABLE, trailers),
GrpcStatusCode.of(Status.Code.UNAVAILABLE),
true);
true,
errorDetails);

service.expectations.add(exception);
}

private ApiException enqueueNonRetryableExceptionWithDelay(com.google.protobuf.Duration delay) {
Metadata trailers = new Metadata();
RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build();
trailers.put(RETRY_INFO_KEY, retryInfo);
ErrorDetails errorDetails =
ErrorDetails.builder().setRawErrorMessages(ImmutableList.of(Any.pack(retryInfo))).build();
byte[] status =
com.google.rpc.Status.newBuilder().addDetails(Any.pack(retryInfo)).build().toByteArray();
trailers.put(ERROR_DETAILS_KEY, status);

ApiException exception =
new InternalException(
new StatusRuntimeException(Status.INTERNAL, trailers),
GrpcStatusCode.of(Status.Code.INTERNAL),
false);
false,
errorDetails);

service.expectations.add(exception);

Expand Down