Skip to content

Commit

Permalink
fix: fix RetryInfo algorithm and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Dec 28, 2023
1 parent f1b7fc7 commit 527cac4
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 21 deletions.
Expand Up @@ -17,6 +17,7 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.StatusCode;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.MutateRowsRequest;
Expand Down Expand Up @@ -56,13 +57,30 @@ public Object getTransportCode() {
public MutateRowsException(
@Nullable Throwable rpcError,
@Nonnull List<FailedMutation> failedMutations,
boolean retryable) {
super("Some mutations failed to apply", rpcError, LOCAL_STATUS, retryable);
boolean retryable,
@Nullable ErrorDetails errorDetails) {
super(
new Throwable("Some mutations failed to apply", rpcError),
LOCAL_STATUS,
retryable,
errorDetails);
Preconditions.checkNotNull(failedMutations);
Preconditions.checkArgument(!failedMutations.isEmpty(), "failedMutations can't be empty");
this.failedMutations = failedMutations;
}

/**
* This constructor is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
public MutateRowsException(
@Nullable Throwable rpcError,
@Nonnull List<FailedMutation> failedMutations,
boolean retryable) {
this(rpcError, failedMutations, retryable, null);
}

/**
* Retrieve all of the failed mutations. This list will contain failures for all of the mutations
* that have failed across all of the retry attempts so far.
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
Expand Down Expand Up @@ -250,7 +251,12 @@ private void handleAttemptError(Throwable rpcError) {
currentRequest = builder.build();
originalIndexes = newOriginalIndexes;

throw new MutateRowsException(rpcError, allFailures.build(), entryError.isRetryable());
ErrorDetails errorDetails = null;
if (rpcError instanceof ApiException) {
errorDetails = ((ApiException) rpcError).getErrorDetails();
}
throw new MutateRowsException(
rpcError, allFailures.build(), entryError.isRetryable(), errorDetails);
}

/**
Expand All @@ -268,6 +274,8 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {
List<Integer> newOriginalIndexes = Lists.newArrayList();
boolean[] seenIndices = new boolean[currentRequest.getEntriesCount()];

ErrorDetails errorDetails = null;

for (MutateRowsResponse response : responses) {
for (Entry entry : response.getEntriesList()) {
seenIndices[Ints.checkedCast(entry.getIndex())] = true;
Expand All @@ -283,13 +291,15 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {

allFailures.add(failedMutation);

if (!failedMutation.getError().isRetryable()) {
if (!ApiExceptions.isRetryable2(failedMutation.getError())
&& !failedMutation.getError().isRetryable()) {
permanentFailures.add(failedMutation);
} else {
// Schedule the mutation entry for the next RPC by adding it to the request builder and
// recording it's original index
newOriginalIndexes.add(origIndex);
builder.addEntries(lastRequest.getEntries((int) entry.getIndex()));
errorDetails = failedMutation.getError().getErrorDetails();
}
}
}
Expand Down Expand Up @@ -319,7 +329,7 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {

if (!allFailures.isEmpty()) {
boolean isRetryable = builder.getEntriesCount() > 0;
throw new MutateRowsException(null, allFailures, isRetryable);
throw new MutateRowsException(null, allFailures, isRetryable, errorDetails);
}
}

Expand All @@ -338,10 +348,14 @@ private ApiException createEntryError(com.google.rpc.Status protoStatus) {

StatusCode gaxStatusCode = GrpcStatusCode.of(grpcStatus.getCode());

ErrorDetails errorDetails =
ErrorDetails.builder().setRawErrorMessages(protoStatus.getDetailsList()).build();

return ApiExceptionFactory.createException(
grpcStatus.asRuntimeException(),
gaxStatusCode,
retryableCodes.contains(gaxStatusCode.getCode()));
retryableCodes.contains(gaxStatusCode.getCode()),
errorDetails);
}

/**
Expand All @@ -354,10 +368,10 @@ private static ApiException createSyntheticErrorForRpcFailure(Throwable overallR
ApiException requestApiException = (ApiException) overallRequestError;

return ApiExceptionFactory.createException(
"Didn't receive a result for this mutation entry",
overallRequestError,
requestApiException.getStatusCode(),
requestApiException.isRetryable());
requestApiException.isRetryable(),
requestApiException.getErrorDetails());
}

return ApiExceptionFactory.createException(
Expand Down
Expand Up @@ -24,7 +24,6 @@
import com.google.protobuf.util.Durations;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -93,17 +92,17 @@ static Duration extractRetryDelay(@Nullable Throwable throwable) {
if (throwable == null) {
return null;
}
Metadata trailers = Status.trailersFromThrowable(throwable);
if (trailers == null) {
if (!(throwable instanceof ApiException)) {
return null;
}
RetryInfo retryInfo = trailers.get(RETRY_INFO_KEY);
if (retryInfo == null) {
ApiException exception = (ApiException) throwable;
if (exception.getErrorDetails() == null) {
return null;
}
if (!retryInfo.hasRetryDelay()) {
if (exception.getErrorDetails().getRetryInfo() == null) {
return null;
}
RetryInfo retryInfo = exception.getErrorDetails().getRetryInfo();
return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
}
}
Expand Up @@ -15,13 +15,13 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm.RETRY_INFO_KEY;
import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.UnavailableException;
Expand Down Expand Up @@ -55,7 +55,9 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import com.google.protobuf.Any;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
Expand All @@ -77,6 +79,9 @@ public class RetryInfoTest {

@Rule public GrpcServerRule serverRule = new GrpcServerRule();

private static final Metadata.Key<byte[]> ERROR_DETAILS_KEY =
Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER);

private FakeBigtableService service;
private BigtableDataClient client;
private BigtableDataSettings.Builder settings;
Expand Down Expand Up @@ -167,6 +172,42 @@ public void testMutateRowsNonRetryableErrorWithRetryInfo() {
false);
}

@Test
public void testMutateRowsPartialFailure() {
service.partial = true;

verifyRetryInfoIsUsed(
() ->
client.bulkMutateRows(
BulkMutation.create("fake-table")
.add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))),
true);
}

@Test
public void testMutateRowsPartialFailureNonRetryableError() {
service.partial = true;

verifyRetryInfoIsUsed(
() ->
client.bulkMutateRows(
BulkMutation.create("fake-table")
.add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))),
false);
}

// TODO: add this test back
// @Test
public void testMutateRowsPartialFailureCanBeDisabled() {
service.partial = true;

verifyRetryInfoCanBeDisabled(
() ->
client.bulkMutateRows(
BulkMutation.create("fake-table")
.add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))));
}

@Test
public void testMutateRowsDisableRetryInfo() throws IOException {
settings.stubSettings().setEnableRetryInfo(false);
Expand Down Expand Up @@ -366,27 +407,37 @@ private void verifyRetryInfoCanBeDisabled(Runnable runnable) {
private void enqueueRetryableExceptionWithDelay(com.google.protobuf.Duration delay) {
Metadata trailers = new Metadata();
RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build();
trailers.put(RETRY_INFO_KEY, retryInfo);
ErrorDetails errorDetails =
ErrorDetails.builder().setRawErrorMessages(ImmutableList.of(Any.pack(retryInfo))).build();
byte[] status =
com.google.rpc.Status.newBuilder().addDetails(Any.pack(retryInfo)).build().toByteArray();
trailers.put(ERROR_DETAILS_KEY, status);

ApiException exception =
new UnavailableException(
new StatusRuntimeException(Status.UNAVAILABLE, trailers),
GrpcStatusCode.of(Status.Code.UNAVAILABLE),
true);
true,
errorDetails);

service.expectations.add(exception);
}

private ApiException enqueueNonRetryableExceptionWithDelay(com.google.protobuf.Duration delay) {
Metadata trailers = new Metadata();
RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build();
trailers.put(RETRY_INFO_KEY, retryInfo);
ErrorDetails errorDetails =
ErrorDetails.builder().setRawErrorMessages(ImmutableList.of(Any.pack(retryInfo))).build();
byte[] status =
com.google.rpc.Status.newBuilder().addDetails(Any.pack(retryInfo)).build().toByteArray();
trailers.put(ERROR_DETAILS_KEY, status);

ApiException exception =
new InternalException(
new StatusRuntimeException(Status.INTERNAL, trailers),
GrpcStatusCode.of(Status.Code.INTERNAL),
false);
false,
errorDetails);

service.expectations.add(exception);

Expand All @@ -395,6 +446,7 @@ private ApiException enqueueNonRetryableExceptionWithDelay(com.google.protobuf.D

private class FakeBigtableService extends BigtableGrpc.BigtableImplBase {
Queue<Exception> expectations = Queues.newArrayDeque();
boolean partial = false;

@Override
public void readRows(
Expand Down Expand Up @@ -434,8 +486,26 @@ public void mutateRows(
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
} else {
Exception expectedRpc = expectations.poll();
responseObserver.onError(expectedRpc);
if (partial) {
ApiException expectedRpc = (ApiException) expectations.poll();
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
builder.addEntries(
0,
MutateRowsResponse.Entry.newBuilder()
.setStatus(
com.google.rpc.Status.newBuilder()
.setCode(expectedRpc.getStatusCode().getCode().getHttpStatusCode())
.addDetails(Any.pack(expectedRpc.getErrorDetails().getRetryInfo())))
.build());
for (int i = 1; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
} else {
Exception expectedRpc = expectations.poll();
responseObserver.onError(expectedRpc);
}
}
}

Expand Down

0 comments on commit 527cac4

Please sign in to comment.