Skip to content

Commit

Permalink
fix: fix RetryInfo algorithm and tests (#2041)
Browse files Browse the repository at this point in the history
Gax already parses ErrorDetails from an error response and adds the error details to ApiException. Fix RetryInfoRetryAlgorithm to read the retry delay from those error details, and fix the tests to send error responses in the correct format.

Also fixed MutateRowsAttemptCallable to not use RetryInfoRetryAlgorithm when the setting is disabled.

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)
- [ ] Rollback plan is reviewed and LGTMed

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
mutianf committed Jan 8, 2024
1 parent 2907038 commit dad7517
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 75 deletions.
18 changes: 18 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Expand Up @@ -156,4 +156,22 @@
<className>com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm</className>
<field>*</field>
</difference>
<!-- InternalApi was updated -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryingCallable</className>
<method>*</method>
</difference>
<!-- InternalApi was updated -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/MutateRowsException</className>
<method>*</method>
</difference>
<!-- InternalApi was updated -->
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/MutateRowsException</className>
<method>*</method>
</difference>
</differences>
Expand Up @@ -17,6 +17,7 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.StatusCode;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.MutateRowsRequest;
Expand Down Expand Up @@ -53,16 +54,36 @@ public Object getTransportCode() {
* applications.
*/
@InternalApi
public MutateRowsException(
public static MutateRowsException create(
@Nullable Throwable rpcError,
@Nonnull List<FailedMutation> failedMutations,
boolean retryable) {
super("Some mutations failed to apply", rpcError, LOCAL_STATUS, retryable);
ErrorDetails errorDetails = null;
if (rpcError instanceof ApiException) {
errorDetails = ((ApiException) rpcError).getErrorDetails();
}

return new MutateRowsException(rpcError, failedMutations, retryable, errorDetails);
}

/**
 * Builds the exception from an already-resolved set of error details.
 *
 * @param rpcError the RPC-level failure, forwarded to the superclass constructor; may be null
 * @param failedMutations the failed mutations to expose; must be non-null and non-empty
 * @param retryable retryable flag, forwarded to the superclass constructor
 * @param errorDetails error details, forwarded to the superclass constructor; may be null
 * @throws NullPointerException if {@code failedMutations} is null
 * @throws IllegalArgumentException if {@code failedMutations} is empty
 */
private MutateRowsException(
    @Nullable Throwable rpcError,
    @Nonnull List<FailedMutation> failedMutations,
    boolean retryable,
    @Nullable ErrorDetails errorDetails) {
  super(rpcError, LOCAL_STATUS, retryable, errorDetails);
  // checkNotNull hands back its argument, so the null check and the assignment share a line.
  this.failedMutations = Preconditions.checkNotNull(failedMutations);
  Preconditions.checkArgument(!this.failedMutations.isEmpty(), "failedMutations can't be empty");
}

// TODO: remove this after we add a ctor in gax to pass in a Throwable, a message and error
// details.
/**
 * Returns the fixed summary message for this exception.
 *
 * <p>The superclass constructor invoked by this class is not passed a message, so the text is
 * supplied through this override instead.
 *
 * @return the constant summary string; it does not vary with the failed mutations
 */
@Override
public String getMessage() {
return "Some mutations failed to apply";
}

/**
* Retrieve all of the failed mutations. This list will contain failures for all of the mutations
* that have failed across all of the retry attempts so far.
Expand Down
Expand Up @@ -784,7 +784,8 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
clientContext.getDefaultCallContext(),
withBigtableTracer,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes());
settings.bulkMutateRowsSettings().getRetryableCodes(),
retryAlgorithm);
}

/**
Expand Down
Expand Up @@ -19,7 +19,9 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
Expand All @@ -31,7 +33,6 @@
import com.google.bigtable.v2.MutateRowsResponse.Entry;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import com.google.cloud.bigtable.gaxx.retrying.ApiExceptions;
import com.google.cloud.bigtable.gaxx.retrying.NonCancellableFuture;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -111,6 +112,8 @@ public Object getTransportCode() {
@Nullable private List<Integer> originalIndexes;
@Nonnull private final Set<StatusCode.Code> retryableCodes;
@Nullable private final List<FailedMutation> permanentFailures;
@Nonnull private final RetryAlgorithm<MutateRowsRequest> retryAlgorithm;
@Nonnull private TimedAttemptSettings attemptSettings;

// Parent controller
private RetryingFuture<Void> externalFuture;
Expand Down Expand Up @@ -138,11 +141,14 @@ public List<MutateRowsResponse> apply(Throwable throwable) {
@Nonnull UnaryCallable<MutateRowsRequest, List<MutateRowsResponse>> innerCallable,
@Nonnull MutateRowsRequest originalRequest,
@Nonnull ApiCallContext callContext,
@Nonnull Set<StatusCode.Code> retryableCodes) {
@Nonnull Set<StatusCode.Code> retryableCodes,
@Nonnull RetryAlgorithm<MutateRowsRequest> retryAlgorithm) {
this.innerCallable = Preconditions.checkNotNull(innerCallable, "innerCallable");
this.currentRequest = Preconditions.checkNotNull(originalRequest, "currentRequest");
this.callContext = Preconditions.checkNotNull(callContext, "callContext");
this.retryableCodes = Preconditions.checkNotNull(retryableCodes, "retryableCodes");
this.retryAlgorithm = retryAlgorithm;
this.attemptSettings = retryAlgorithm.createFirstAttempt();

permanentFailures = Lists.newArrayList();
}
Expand Down Expand Up @@ -230,14 +236,15 @@ private void handleAttemptError(Throwable rpcError) {
Builder builder = lastRequest.toBuilder().clearEntries();
List<Integer> newOriginalIndexes = Lists.newArrayList();

attemptSettings = retryAlgorithm.createNextAttempt(null, entryError, null, attemptSettings);

for (int i = 0; i < currentRequest.getEntriesCount(); i++) {
int origIndex = getOriginalIndex(i);

FailedMutation failedMutation = FailedMutation.create(origIndex, entryError);
allFailures.add(failedMutation);

if (!ApiExceptions.isRetryable2(failedMutation.getError())
&& !failedMutation.getError().isRetryable()) {
if (!retryAlgorithm.shouldRetry(null, failedMutation.getError(), null, attemptSettings)) {
permanentFailures.add(failedMutation);
} else {
// Schedule the mutation entry for the next RPC by adding it to the request builder and
Expand All @@ -250,15 +257,15 @@ private void handleAttemptError(Throwable rpcError) {
currentRequest = builder.build();
originalIndexes = newOriginalIndexes;

throw new MutateRowsException(rpcError, allFailures.build(), entryError.isRetryable());
throw MutateRowsException.create(rpcError, allFailures.build(), builder.getEntriesCount() > 0);
}

/**
* Handle entry level failures. All new response entries are inspected for failure. If any
* transient failures are found, their corresponding mutations are scheduled for the next RPC. The
* caller is notified of both new found errors and pre-existing permanent errors in the thrown
* {@link MutateRowsException}. If no errors exist, then the attempt future is successfully
* completed.
* completed. We don't currently handle RetryInfo on entry level failures.
*/
private void handleAttemptSuccess(List<MutateRowsResponse> responses) {
List<FailedMutation> allFailures = Lists.newArrayList(permanentFailures);
Expand Down Expand Up @@ -319,7 +326,7 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {

if (!allFailures.isEmpty()) {
boolean isRetryable = builder.getEntriesCount() > 0;
throw new MutateRowsException(null, allFailures, isRetryable);
throw MutateRowsException.create(null, allFailures, isRetryable);
}
}

Expand Down Expand Up @@ -354,10 +361,10 @@ private static ApiException createSyntheticErrorForRpcFailure(Throwable overallR
ApiException requestApiException = (ApiException) overallRequestError;

return ApiExceptionFactory.createException(
"Didn't receive a result for this mutation entry",
overallRequestError,
requestApiException.getStatusCode(),
requestApiException.isRetryable());
requestApiException.isRetryable(),
requestApiException.getErrorDetails());
}

return ApiExceptionFactory.createException(
Expand Down
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.rpc.ApiCallContext;
Expand Down Expand Up @@ -44,23 +45,26 @@ public class MutateRowsRetryingCallable extends UnaryCallable<MutateRowsRequest,
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable;
private final RetryingExecutorWithContext<Void> executor;
private final ImmutableSet<Code> retryCodes;
private final RetryAlgorithm retryAlgorithm;

public MutateRowsRetryingCallable(
@Nonnull ApiCallContext callContextPrototype,
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable,
@Nonnull RetryingExecutorWithContext<Void> executor,
@Nonnull Set<StatusCode.Code> retryCodes) {
@Nonnull Set<StatusCode.Code> retryCodes,
@Nonnull RetryAlgorithm retryAlgorithm) {
this.callContextPrototype = Preconditions.checkNotNull(callContextPrototype);
this.callable = Preconditions.checkNotNull(callable);
this.executor = Preconditions.checkNotNull(executor);
this.retryCodes = ImmutableSet.copyOf(retryCodes);
this.retryAlgorithm = retryAlgorithm;
}

@Override
public RetryingFuture<Void> futureCall(MutateRowsRequest request, ApiCallContext inputContext) {
ApiCallContext context = callContextPrototype.nullToSelf(inputContext);
MutateRowsAttemptCallable retryCallable =
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes);
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes, retryAlgorithm);

RetryingFuture<Void> retryingFuture = executor.createFuture(retryCallable, context);
retryCallable.setExternalFuture(retryingFuture);
Expand Down

This file was deleted.

Expand Up @@ -20,12 +20,8 @@
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.util.Durations;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.threeten.bp.Duration;

Expand All @@ -37,10 +33,6 @@
@InternalApi
public class RetryInfoRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<ResponseT> {

@VisibleForTesting
public static final Metadata.Key<RetryInfo> RETRY_INFO_KEY =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
Expand All @@ -50,6 +42,7 @@ public TimedAttemptSettings createNextAttempt(
.toBuilder()
.setRandomizedRetryDelay(retryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
// Advance the overall counter from its own previous value. gax tracks the two counters
// separately (the per-attempt count can be reset for streamed RPCs while the overall count
// keeps growing), so deriving it from getAttemptCount() would under-report overall attempts.
.setOverallAttemptCount(prevSettings.getOverallAttemptCount() + 1)
.build();
}
return null;
Expand Down Expand Up @@ -93,17 +86,17 @@ static Duration extractRetryDelay(@Nullable Throwable throwable) {
if (throwable == null) {
return null;
}
Metadata trailers = Status.trailersFromThrowable(throwable);
if (trailers == null) {
if (!(throwable instanceof ApiException)) {
return null;
}
RetryInfo retryInfo = trailers.get(RETRY_INFO_KEY);
if (retryInfo == null) {
ApiException exception = (ApiException) throwable;
if (exception.getErrorDetails() == null) {
return null;
}
if (!retryInfo.hasRetryDelay()) {
if (exception.getErrorDetails().getRetryInfo() == null) {
return null;
}
RetryInfo retryInfo = exception.getErrorDetails().getRetryInfo();
return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
}
}
Expand Up @@ -15,13 +15,13 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm.RETRY_INFO_KEY;
import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.UnavailableException;
Expand Down Expand Up @@ -55,7 +55,9 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import com.google.protobuf.Any;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
Expand All @@ -77,13 +79,16 @@ public class RetryInfoTest {

@Rule public GrpcServerRule serverRule = new GrpcServerRule();

private static final Metadata.Key<byte[]> ERROR_DETAILS_KEY =
Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER);

private FakeBigtableService service;
private BigtableDataClient client;
private BigtableDataSettings.Builder settings;

private AtomicInteger attemptCounter = new AtomicInteger();
private com.google.protobuf.Duration delay =
com.google.protobuf.Duration.newBuilder().setSeconds(1).setNanos(0).build();
com.google.protobuf.Duration.newBuilder().setSeconds(2).setNanos(0).build();

@Before
public void setUp() throws IOException {
Expand Down Expand Up @@ -366,27 +371,37 @@ private void verifyRetryInfoCanBeDisabled(Runnable runnable) {
private void enqueueRetryableExceptionWithDelay(com.google.protobuf.Duration delay) {
Metadata trailers = new Metadata();
RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build();
trailers.put(RETRY_INFO_KEY, retryInfo);
ErrorDetails errorDetails =
ErrorDetails.builder().setRawErrorMessages(ImmutableList.of(Any.pack(retryInfo))).build();
byte[] status =
com.google.rpc.Status.newBuilder().addDetails(Any.pack(retryInfo)).build().toByteArray();
trailers.put(ERROR_DETAILS_KEY, status);

ApiException exception =
new UnavailableException(
new StatusRuntimeException(Status.UNAVAILABLE, trailers),
GrpcStatusCode.of(Status.Code.UNAVAILABLE),
true);
true,
errorDetails);

service.expectations.add(exception);
}

private ApiException enqueueNonRetryableExceptionWithDelay(com.google.protobuf.Duration delay) {
Metadata trailers = new Metadata();
RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build();
trailers.put(RETRY_INFO_KEY, retryInfo);
ErrorDetails errorDetails =
ErrorDetails.builder().setRawErrorMessages(ImmutableList.of(Any.pack(retryInfo))).build();
byte[] status =
com.google.rpc.Status.newBuilder().addDetails(Any.pack(retryInfo)).build().toByteArray();
trailers.put(ERROR_DETAILS_KEY, status);

ApiException exception =
new InternalException(
new StatusRuntimeException(Status.INTERNAL, trailers),
GrpcStatusCode.of(Status.Code.INTERNAL),
false);
false,
errorDetails);

service.expectations.add(exception);

Expand Down

0 comments on commit dad7517

Please sign in to comment.