Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix RetryInfo algorithm and tests #2041

Merged
merged 8 commits into from Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Expand Up @@ -156,4 +156,10 @@
<className>com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm</className>
<field>*</field>
</difference>
<!-- InternalApi was updated: the MutateRowsRetryingCallable constructor signature changed -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryingCallable</className>
<method>*</method>
</difference>
</differences>
Expand Up @@ -17,6 +17,7 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.StatusCode;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.MutateRowsRequest;
Expand Down Expand Up @@ -56,13 +57,30 @@ public Object getTransportCode() {
public MutateRowsException(
@Nullable Throwable rpcError,
@Nonnull List<FailedMutation> failedMutations,
boolean retryable) {
super("Some mutations failed to apply", rpcError, LOCAL_STATUS, retryable);
boolean retryable,
@Nullable ErrorDetails errorDetails) {
super(
new Throwable("Some mutations failed to apply", rpcError),
LOCAL_STATUS,
retryable,
errorDetails);
Preconditions.checkNotNull(failedMutations);
Preconditions.checkArgument(!failedMutations.isEmpty(), "failedMutations can't be empty");
this.failedMutations = failedMutations;
}

/**
* Creates the exception without error details: delegates to the four-argument constructor, passing
* {@code null} for the {@code ErrorDetails}.
*
* <p>This constructor is considered an internal implementation detail and not meant to be used by
* applications.
*
* @param rpcError the RPC-level failure, if any; may be {@code null}
* @param failedMutations the mutations that failed to apply
* @param retryable whether the failure should be reported as retryable
*/
@InternalApi
public MutateRowsException(
@Nullable Throwable rpcError,
@Nonnull List<FailedMutation> failedMutations,
boolean retryable) {
this(rpcError, failedMutations, retryable, null);
}

/**
* Retrieve all of the failed mutations. This list will contain failures for all of the mutations
* that have failed across all of the retry attempts so far.
Expand Down
Expand Up @@ -784,7 +784,8 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
clientContext.getDefaultCallContext(),
withBigtableTracer,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes());
settings.bulkMutateRowsSettings().getRetryableCodes(),
retryAlgorithm);
}

/**
Expand Down
Expand Up @@ -19,10 +19,13 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
Expand All @@ -31,7 +34,6 @@
import com.google.bigtable.v2.MutateRowsResponse.Entry;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import com.google.cloud.bigtable.gaxx.retrying.ApiExceptions;
import com.google.cloud.bigtable.gaxx.retrying.NonCancellableFuture;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -111,6 +113,8 @@ public Object getTransportCode() {
@Nullable private List<Integer> originalIndexes;
@Nonnull private final Set<StatusCode.Code> retryableCodes;
@Nullable private final List<FailedMutation> permanentFailures;
@Nonnull private final RetryAlgorithm retryAlgorithm;
@Nonnull private final TimedAttemptSettings attemptSettings;

// Parent controller
private RetryingFuture<Void> externalFuture;
Expand Down Expand Up @@ -138,11 +142,14 @@ public List<MutateRowsResponse> apply(Throwable throwable) {
@Nonnull UnaryCallable<MutateRowsRequest, List<MutateRowsResponse>> innerCallable,
@Nonnull MutateRowsRequest originalRequest,
@Nonnull ApiCallContext callContext,
@Nonnull Set<StatusCode.Code> retryableCodes) {
@Nonnull Set<StatusCode.Code> retryableCodes,
@Nonnull RetryAlgorithm<MutateRowsRequest> retryAlgorithm) {
this.innerCallable = Preconditions.checkNotNull(innerCallable, "innerCallable");
this.currentRequest = Preconditions.checkNotNull(originalRequest, "currentRequest");
this.callContext = Preconditions.checkNotNull(callContext, "callContext");
this.retryableCodes = Preconditions.checkNotNull(retryableCodes, "retryableCodes");
this.retryAlgorithm = retryAlgorithm;
this.attemptSettings = retryAlgorithm.createFirstAttempt();

permanentFailures = Lists.newArrayList();
}
Expand Down Expand Up @@ -236,8 +243,9 @@ private void handleAttemptError(Throwable rpcError) {
FailedMutation failedMutation = FailedMutation.create(origIndex, entryError);
allFailures.add(failedMutation);

if (!ApiExceptions.isRetryable2(failedMutation.getError())
&& !failedMutation.getError().isRetryable()) {
TimedAttemptSettings nextAttemptSettings =
retryAlgorithm.createNextAttempt(null, failedMutation.getError(), null, attemptSettings);
if (!retryAlgorithm.shouldRetry(null, failedMutation.getError(), null, nextAttemptSettings)) {
permanentFailures.add(failedMutation);
} else {
// Schedule the mutation entry for the next RPC by adding it to the request builder and
Expand All @@ -250,7 +258,12 @@ private void handleAttemptError(Throwable rpcError) {
currentRequest = builder.build();
originalIndexes = newOriginalIndexes;

throw new MutateRowsException(rpcError, allFailures.build(), entryError.isRetryable());
ErrorDetails errorDetails = null;
if (rpcError instanceof ApiException) {
errorDetails = ((ApiException) rpcError).getErrorDetails();
}
throw new MutateRowsException(
rpcError, allFailures.build(), entryError.isRetryable(), errorDetails);
mutianf marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -268,6 +281,8 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {
List<Integer> newOriginalIndexes = Lists.newArrayList();
boolean[] seenIndices = new boolean[currentRequest.getEntriesCount()];

ErrorDetails errorDetails = null;
mutianf marked this conversation as resolved.
Show resolved Hide resolved

for (MutateRowsResponse response : responses) {
for (Entry entry : response.getEntriesList()) {
seenIndices[Ints.checkedCast(entry.getIndex())] = true;
Expand All @@ -283,13 +298,18 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {

allFailures.add(failedMutation);

if (!failedMutation.getError().isRetryable()) {
TimedAttemptSettings nextAttemptSettings =
retryAlgorithm.createNextAttempt(
null, failedMutation.getError(), null, attemptSettings);
if (!retryAlgorithm.shouldRetry(
null, failedMutation.getError(), null, nextAttemptSettings)) {
permanentFailures.add(failedMutation);
} else {
// Schedule the mutation entry for the next RPC by adding it to the request builder and
// recording its original index
newOriginalIndexes.add(origIndex);
builder.addEntries(lastRequest.getEntries((int) entry.getIndex()));
errorDetails = failedMutation.getError().getErrorDetails();
}
}
}
Expand Down Expand Up @@ -319,7 +339,7 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {

if (!allFailures.isEmpty()) {
boolean isRetryable = builder.getEntriesCount() > 0;
throw new MutateRowsException(null, allFailures, isRetryable);
throw new MutateRowsException(null, allFailures, isRetryable, errorDetails);
}
}

Expand All @@ -338,10 +358,14 @@ private ApiException createEntryError(com.google.rpc.Status protoStatus) {

StatusCode gaxStatusCode = GrpcStatusCode.of(grpcStatus.getCode());

ErrorDetails errorDetails =
ErrorDetails.builder().setRawErrorMessages(protoStatus.getDetailsList()).build();

return ApiExceptionFactory.createException(
grpcStatus.asRuntimeException(),
gaxStatusCode,
retryableCodes.contains(gaxStatusCode.getCode()));
retryableCodes.contains(gaxStatusCode.getCode()),
errorDetails);
}

/**
Expand All @@ -354,10 +378,10 @@ private static ApiException createSyntheticErrorForRpcFailure(Throwable overallR
ApiException requestApiException = (ApiException) overallRequestError;

return ApiExceptionFactory.createException(
"Didn't receive a result for this mutation entry",
overallRequestError,
requestApiException.getStatusCode(),
requestApiException.isRetryable());
requestApiException.isRetryable(),
requestApiException.getErrorDetails());
}

return ApiExceptionFactory.createException(
Expand Down
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.rpc.ApiCallContext;
Expand Down Expand Up @@ -44,23 +45,26 @@ public class MutateRowsRetryingCallable extends UnaryCallable<MutateRowsRequest,
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable;
private final RetryingExecutorWithContext<Void> executor;
private final ImmutableSet<Code> retryCodes;
private final RetryAlgorithm retryAlgorithm;

public MutateRowsRetryingCallable(
@Nonnull ApiCallContext callContextPrototype,
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable,
@Nonnull RetryingExecutorWithContext<Void> executor,
@Nonnull Set<StatusCode.Code> retryCodes) {
@Nonnull Set<StatusCode.Code> retryCodes,
@Nonnull RetryAlgorithm retryAlgorithm) {
this.callContextPrototype = Preconditions.checkNotNull(callContextPrototype);
this.callable = Preconditions.checkNotNull(callable);
this.executor = Preconditions.checkNotNull(executor);
this.retryCodes = ImmutableSet.copyOf(retryCodes);
// Fail fast on null, consistent with the other @Nonnull constructor arguments.
this.retryAlgorithm = Preconditions.checkNotNull(retryAlgorithm);
}

@Override
public RetryingFuture<Void> futureCall(MutateRowsRequest request, ApiCallContext inputContext) {
ApiCallContext context = callContextPrototype.nullToSelf(inputContext);
MutateRowsAttemptCallable retryCallable =
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes);
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes, retryAlgorithm);

RetryingFuture<Void> retryingFuture = executor.createFuture(retryCallable, context);
retryCallable.setExternalFuture(retryingFuture);
Expand Down

This file was deleted.

Expand Up @@ -24,7 +24,6 @@
import com.google.protobuf.util.Durations;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.threeten.bp.Duration;
Expand All @@ -50,6 +49,7 @@ public TimedAttemptSettings createNextAttempt(
.toBuilder()
.setRandomizedRetryDelay(retryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
// Advance the overall counter from its own previous value rather than from the attempt count.
.setOverallAttemptCount(prevSettings.getOverallAttemptCount() + 1)
.build();
}
return null;
Expand Down Expand Up @@ -93,17 +93,17 @@ static Duration extractRetryDelay(@Nullable Throwable throwable) {
if (throwable == null) {
return null;
}
Metadata trailers = Status.trailersFromThrowable(throwable);
if (trailers == null) {
if (!(throwable instanceof ApiException)) {
return null;
}
RetryInfo retryInfo = trailers.get(RETRY_INFO_KEY);
if (retryInfo == null) {
ApiException exception = (ApiException) throwable;
if (exception.getErrorDetails() == null) {
return null;
}
if (!retryInfo.hasRetryDelay()) {
if (exception.getErrorDetails().getRetryInfo() == null) {
return null;
}
RetryInfo retryInfo = exception.getErrorDetails().getRetryInfo();
return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
}
}