Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check that all bulk mutation entries are accounted for (#1907) #1923

Merged
merged 2 commits into from Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,6 +35,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.rpc.Code;
import java.util.List;
Expand Down Expand Up @@ -263,9 +264,12 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {

Builder builder = lastRequest.toBuilder().clearEntries();
List<Integer> newOriginalIndexes = Lists.newArrayList();
boolean[] seenIndices = new boolean[currentRequest.getEntriesCount()];

for (MutateRowsResponse response : responses) {
for (Entry entry : response.getEntriesList()) {
seenIndices[Ints.checkedCast(entry.getIndex())] = true;

if (entry.getStatus().getCode() == Code.OK_VALUE) {
continue;
}
Expand All @@ -288,6 +292,26 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {
}
}

// Handle missing mutations
for (int i = 0; i < seenIndices.length; i++) {
if (seenIndices[i]) {
continue;
}

int origIndex = getOriginalIndex(i);
FailedMutation failedMutation =
FailedMutation.create(
origIndex,
ApiExceptionFactory.createException(
"Missing entry response for entry " + origIndex,
null,
GrpcStatusCode.of(io.grpc.Status.Code.INTERNAL),
false));

allFailures.add(failedMutation);
permanentFailures.add(failedMutation);
}

currentRequest = builder.build();
originalIndexes = newOriginalIndexes;

Expand Down
Expand Up @@ -404,7 +404,11 @@ public void mutateRow(MutateRowRequest request, StreamObserver<MutateRowResponse

@Override
public void mutateRows(MutateRowsRequest request, StreamObserver<MutateRowsResponse> observer) {
observer.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(i));
}
observer.onNext(builder.build());
observer.onCompleted();
}

Expand Down
Expand Up @@ -639,7 +639,11 @@ public void mutateRows(
Thread.sleep(SERVER_LATENCY);
} catch (InterruptedException e) {
}
responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}

Expand Down
Expand Up @@ -445,10 +445,15 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
new Answer() {
@Override
public Object answer(InvocationOnMock invocation) {
MutateRowsRequest request = (MutateRowsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
StreamObserver<MutateRowsResponse> observer =
(StreamObserver<MutateRowsResponse>) invocation.getArguments()[1];
observer.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
observer.onNext(builder.build());
observer.onCompleted();
return null;
}
Expand Down
Expand Up @@ -223,7 +223,11 @@ public void mutateRows(MutateRowsRequest request, StreamObserver<MutateRowsRespo
observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
return;
}
observer.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
observer.onNext(builder.build());
observer.onCompleted();
}

Expand Down
Expand Up @@ -41,6 +41,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -92,6 +94,37 @@ public void singleEntrySuccessTest() throws Exception {
assertThat(innerCallable.lastRequest).isEqualTo(request);
}

@Test
public void missingEntry() {
MutateRowsRequest request =
MutateRowsRequest.newBuilder()
.addEntries(Entry.getDefaultInstance())
.addEntries(Entry.getDefaultInstance())
.build();
innerCallable.response.add(
MutateRowsResponse.newBuilder()
.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(0))
.build());

MutateRowsAttemptCallable attemptCallable =
new MutateRowsAttemptCallable(innerCallable, request, callContext, retryCodes);
attemptCallable.setExternalFuture(parentFuture);
attemptCallable.call();

ExecutionException executionException =
Assert.assertThrows(ExecutionException.class, () -> parentFuture.attemptFuture.get());
assertThat(executionException).hasCauseThat().isInstanceOf(MutateRowsException.class);
MutateRowsException e = (MutateRowsException) executionException.getCause();

assertThat(e).hasMessageThat().contains("Some mutations failed to apply");
assertThat(e.getFailedMutations()).hasSize(1);
FailedMutation failedMutation = e.getFailedMutations().get(0);
assertThat(failedMutation.getIndex()).isEqualTo(1);
assertThat(failedMutation.getError())
.hasMessageThat()
.contains("Missing entry response for entry 1");
}

@Test
public void testNoRpcTimeout() {
parentFuture.timedAttemptSettings =
Expand Down
Expand Up @@ -107,7 +107,11 @@ public void mutateRows(
MutateRowsRequest request, StreamObserver<MutateRowsResponse> responseObserver) {
attemptCounter.incrementAndGet();
if (expectations.isEmpty()) {
responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
} else {
Exception expectedRpc = expectations.poll();
Expand Down