From e4db74534690eb090347c00047eecbd45f5a22b7 Mon Sep 17 00:00:00 2001 From: kolea2 <45548808+kolea2@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:52:13 -0400 Subject: [PATCH] fix: check that all bulk mutation entries are accounted for (#1907) (#1923) port of #1907 --- .../mutaterows/MutateRowsAttemptCallable.java | 24 ++++++++++++++ .../metrics/BigtableTracerCallableTest.java | 6 +++- .../metrics/BuiltinMetricsTracerTest.java | 6 +++- .../v2/stub/metrics/MetricsTracerTest.java | 7 +++- .../metrics/StatsHeadersCallableTest.java | 6 +++- .../MutateRowsAttemptCallableTest.java | 33 +++++++++++++++++++ .../stub/mutaterows/MutateRowsRetryTest.java | 6 +++- 7 files changed, 83 insertions(+), 5 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java index 36c2930bda..b049219a95 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java @@ -35,6 +35,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import com.google.common.util.concurrent.MoreExecutors; import com.google.rpc.Code; import java.util.List; @@ -263,9 +264,12 @@ private void handleAttemptSuccess(List responses) { Builder builder = lastRequest.toBuilder().clearEntries(); List newOriginalIndexes = Lists.newArrayList(); + boolean[] seenIndices = new boolean[currentRequest.getEntriesCount()]; for (MutateRowsResponse response : responses) { for (Entry entry : response.getEntriesList()) { + seenIndices[Ints.checkedCast(entry.getIndex())] = true; + if (entry.getStatus().getCode() == Code.OK_VALUE) { continue; } @@ -288,6 +292,26 @@ private void handleAttemptSuccess(List responses) { } } + // Handle missing mutations + for (int i = 0; i < seenIndices.length; i++) { + if (seenIndices[i]) { + continue; + } + + int origIndex = getOriginalIndex(i); + FailedMutation failedMutation = + FailedMutation.create( + origIndex, + ApiExceptionFactory.createException( + "Missing entry response for entry " + origIndex, + null, + GrpcStatusCode.of(io.grpc.Status.Code.INTERNAL), + false)); + + allFailures.add(failedMutation); + permanentFailures.add(failedMutation); + } + currentRequest = builder.build(); originalIndexes = newOriginalIndexes; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java index 1b833f5c06..5896eb38de 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java @@ -404,7 +404,11 @@ public void mutateRow(MutateRowRequest request, StreamObserver observer) { - observer.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(i)); + } + observer.onNext(builder.build()); observer.onCompleted(); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java index 8a371cb2e7..f0fba2164a 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -639,7 +639,11 @@ public void mutateRows( Thread.sleep(SERVER_LATENCY); } catch (InterruptedException e) { } - responseObserver.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java index bb5e89aab4..6bd0628554 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -445,10 +445,15 @@ public Object answer(InvocationOnMock invocation) throws Throwable { new Answer() { @Override public Object answer(InvocationOnMock invocation) { + MutateRowsRequest request = (MutateRowsRequest) invocation.getArguments()[0]; @SuppressWarnings("unchecked") StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; - observer.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + observer.onNext(builder.build()); observer.onCompleted(); return null; } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java index 538d4fc246..88a874b8c9 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java @@ -223,7 +223,11 @@ public void mutateRows(MutateRowsRequest request, StreamObserver parentFuture.attemptFuture.get()); + assertThat(executionException).hasCauseThat().isInstanceOf(MutateRowsException.class); + MutateRowsException e = (MutateRowsException) executionException.getCause(); + + assertThat(e).hasMessageThat().contains("Some mutations failed to apply"); + assertThat(e.getFailedMutations()).hasSize(1); + FailedMutation failedMutation = e.getFailedMutations().get(0); + assertThat(failedMutation.getIndex()).isEqualTo(1); + assertThat(failedMutation.getError()) + .hasMessageThat() + .contains("Missing entry response for entry 1"); + } + @Test public void testNoRpcTimeout() { parentFuture.timedAttemptSettings = diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java index 5d15dd5219..86a94d34ea 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java @@ -107,7 +107,11 @@ public void mutateRows( MutateRowsRequest request, StreamObserver responseObserver) { attemptCounter.incrementAndGet(); if (expectations.isEmpty()) { - responseObserver.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } else { Exception expectedRpc = expectations.poll();