Skip to content

Commit

Permalink
fix: fix batch mutation limit (#1808)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
mutianf committed Jul 11, 2023
1 parent cb160af commit ed24d4f
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 1 deletion.
Expand Up @@ -15,6 +15,8 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import static com.google.cloud.bigtable.data.v2.models.RowMutationEntry.MAX_MUTATION;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.bigtable.v2.MutateRowsRequest;
Expand All @@ -40,6 +42,8 @@ public final class BulkMutation implements Serializable, Cloneable {
private final String tableId;
private transient MutateRowsRequest.Builder builder;

private long mutationCountSum = 0;

/**
 * Static factory for a {@code BulkMutation} bound to the given table id.
 *
 * @param tableId the table id handed to the constructor
 * @return a newly constructed {@code BulkMutation}
 */
public static BulkMutation create(String tableId) {
  BulkMutation bulkMutation = new BulkMutation(tableId);
  return bulkMutation;
}
Expand Down Expand Up @@ -81,6 +85,14 @@ public BulkMutation add(@Nonnull ByteString rowKey, @Nonnull Mutation mutation)
Preconditions.checkNotNull(rowKey);
Preconditions.checkNotNull(mutation);

long mutationCount = mutation.getMutations().size();
Preconditions.checkArgument(
mutationCountSum + mutationCount <= MAX_MUTATION,
String.format(
"Too many mutations, got %s, limit is %s",
mutationCountSum + mutationCount, MAX_MUTATION));
this.mutationCountSum += mutationCount;

builder.addEntries(
MutateRowsRequest.Entry.newBuilder()
.setRowKey(rowKey)
Expand Down
Expand Up @@ -33,6 +33,8 @@
public class RowMutationEntry implements MutationApi<RowMutationEntry>, Serializable {
private static final long serialVersionUID = 1974738836742298192L;

static final int MAX_MUTATION = 100000;

private final ByteString key;
private final Mutation mutation;

Expand Down Expand Up @@ -180,6 +182,11 @@ public RowMutationEntry deleteRow() {

@InternalApi
public MutateRowsRequest.Entry toProto() {
Preconditions.checkArgument(
mutation.getMutations().size() <= MAX_MUTATION,
String.format(
"Too many mutations, got %s, limit is %s",
mutation.getMutations().size(), MAX_MUTATION));
return MutateRowsRequest.Entry.newBuilder()
.setRowKey(key)
.addAllMutations(mutation.getMutations())
Expand Down
@@ -0,0 +1,60 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import com.google.api.gax.batching.BatchResource;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;

/**
 * A custom implementation of {@link BatchResource} that tracks the number of mutations in addition
 * to the element and byte counts, because MutateRowsRequest has a limit on number of mutations.
 */
@AutoValue
abstract class MutateRowsBatchResource implements BatchResource {

  /** Mutation count above which {@link #shouldFlush(long, long)} reports that a flush is needed. */
  private static final long MAX_MUTATION_COUNT = 100000;

  /**
   * Creates a resource holding the given counters.
   *
   * @param elementCount number of batched elements
   * @param byteCount number of bytes of the batched elements
   * @param mutationCount number of mutations across the batched elements
   */
  static MutateRowsBatchResource create(long elementCount, long byteCount, long mutationCount) {
    return new AutoValue_MutateRowsBatchResource(elementCount, byteCount, mutationCount);
  }

  /**
   * Returns a new resource whose counters are the sums of this resource's counters and the given
   * resource's counters. Neither operand is modified.
   *
   * @param batchResource the resource to add; must be a {@code MutateRowsBatchResource}
   * @throws IllegalArgumentException if {@code batchResource} is not a {@code
   *     MutateRowsBatchResource} (including {@code null})
   */
  @Override
  public BatchResource add(BatchResource batchResource) {
    // Template form: the message is only built on failure, and a null argument is reported as an
    // IllegalArgumentException instead of failing with a NullPointerException on getClass().
    Preconditions.checkArgument(
        batchResource instanceof MutateRowsBatchResource,
        "Expected MutateRowsBatchResource, got %s",
        batchResource == null ? null : batchResource.getClass());
    MutateRowsBatchResource mutateRowsResource = (MutateRowsBatchResource) batchResource;

    return create(
        getElementCount() + mutateRowsResource.getElementCount(),
        getByteCount() + mutateRowsResource.getByteCount(),
        getMutationCount() + mutateRowsResource.getMutationCount());
  }

  @Override
  public abstract long getElementCount();

  @Override
  public abstract long getByteCount();

  /** Number of mutations accounted for by this resource. */
  abstract long getMutationCount();

  /**
   * Returns true when any counter strictly exceeds its threshold: the element count, the byte
   * count, or the mutation count (compared against the fixed mutation limit).
   */
  @Override
  public boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold) {
    return getElementCount() > maxElementThreshold
        || getByteCount() > maxBytesThreshold
        || getMutationCount() > MAX_MUTATION_COUNT;
  }
}
Expand Up @@ -17,6 +17,7 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.batching.BatchEntry;
import com.google.api.gax.batching.BatchResource;
import com.google.api.gax.batching.BatchingDescriptor;
import com.google.api.gax.batching.BatchingRequestBuilder;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
Expand Down Expand Up @@ -90,6 +91,17 @@ public long countBytes(RowMutationEntry entry) {
return entry.toProto().getSerializedSize();
}

/**
 * Builds the {@link BatchResource} describing a single entry: one element, the byte count
 * reported by {@code countBytes}, and the mutation count of the entry's proto form.
 */
@Override
public BatchResource createResource(RowMutationEntry element) {
  // NOTE(review): toProto() is invoked here and again inside countBytes(); confirm whether the
  // duplicate conversion matters for very large entries.
  return MutateRowsBatchResource.create(
      1, countBytes(element), element.toProto().getMutationsCount());
}

/** Returns a resource whose element, byte and mutation counts are all zero. */
@Override
public BatchResource createEmptyResource() {
  final long none = 0;
  return MutateRowsBatchResource.create(none, none, none);
}

/**
* A {@link BatchingRequestBuilder} that will spool mutations and send them out as a {@link
* BulkMutation}.
Expand Down
Expand Up @@ -16,15 +16,18 @@
package com.google.cloud.bigtable.data.v2.it;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;

import com.google.api.gax.batching.BatcherImpl;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlEventStats;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
import java.io.IOException;
import java.util.Objects;
Expand All @@ -33,6 +36,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class BulkMutateIT {
Expand Down Expand Up @@ -83,4 +87,52 @@ public void test() throws IOException, InterruptedException {
assertThat(row.getCells()).hasSize(1);
}
}

/**
 * Writes two entries of 50001 mutations each to the same row through the batcher and verifies
 * that all 100002 cells were written. The delay threshold is raised to one hour so that the
 * time-based flush does not kick in during the test.
 */
@Test
public void testManyMutations() throws IOException, InterruptedException {
  // Emulator is very slow and will take a long time for the test to run
  assume()
      .withMessage("testManyMutations is not supported on Emulator")
      .that(testEnvRule.env())
      .isNotInstanceOf(EmulatorEnv.class);

  BigtableDataSettings defaultSettings = testEnvRule.env().getDataClientSettings();
  String rowKey = UUID.randomUUID().toString() + "test-key";

  BatchingSettings batchingSettings =
      defaultSettings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings();

  // toBuilder() returns a separate builder; the override only takes effect if that builder is
  // built and the resulting settings are the ones used to create the client.
  BigtableDataSettings.Builder settingsBuilder = defaultSettings.toBuilder();
  settingsBuilder
      .stubSettings()
      .bulkMutateRowsSettings()
      .setBatchingSettings(
          batchingSettings.toBuilder().setDelayThreshold(Duration.ofHours(1)).build());
  BigtableDataSettings settings = settingsBuilder.build();

  try (BigtableDataClient client = BigtableDataClient.create(settings);
      BatcherImpl<RowMutationEntry, Void, BulkMutation, Void> batcher =
          (BatcherImpl<RowMutationEntry, Void, BulkMutation, Void>)
              client.newBulkMutationBatcher(testEnvRule.env().getTableId())) {

    String familyId = testEnvRule.env().getFamilyId();
    for (int i = 0; i < 2; i++) {
      RowMutationEntry rowMutationEntry = RowMutationEntry.create(rowKey);
      // Create mutation entries with many columns. The batcher should flush every time.
      // The trailing i keeps the qualifiers of the two entries distinct.
      for (long j = 0; j < 50001; j++) {
        rowMutationEntry.setCell(familyId, "q" + j + i, j);
      }
      batcher.add(rowMutationEntry);
    }
    batcher.flush();
    // Query a key to make sure the write succeeded
    Row row =
        testEnvRule
            .env()
            .getDataClient()
            .readRowsCallable()
            .first()
            .call(Query.create(testEnvRule.env().getTableId()).rowKey(rowKey));
    // 2 entries x 50001 distinct qualifiers each.
    assertThat(row.getCells()).hasSize(100002);
  }
}
}
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -172,4 +173,31 @@ public void fromProtoTest() {
.matches(NameUtil.formatTableName(projectId, instanceId, TABLE_ID));
assertThat(overriddenRequest.getAppProfileId()).matches(appProfile);
}

/**
 * Verifies the total mutation limit of a {@code BulkMutation}: entries summing to exactly 100000
 * mutations are accepted, and the next mutation is rejected with an {@link
 * IllegalArgumentException}.
 */
@Test
public void testManyMutations() {
  BulkMutation bulkMutation = BulkMutation.create(TABLE_ID);

  // Two entries of 50000 mutations each reach the limit exactly and must both be accepted.
  // They are added outside the try block so an unexpected early failure is not mistaken for
  // the expected one.
  for (int i = 0; i < 2; i++) {
    String key = "key" + i;
    Mutation mutation = Mutation.create();
    for (int j = 0; j < 50000; j++) {
      mutation.setCell("f", "q" + j, "value");
    }
    bulkMutation.add(key, mutation);
  }

  // One more mutation pushes the total to 100001 and must be rejected.
  try {
    bulkMutation.add("key2", Mutation.create().setCell("f", "q", "value"));
    Assert.fail("Test should fail with IllegalArgumentException");
  } catch (IllegalArgumentException e) {
    assertThat(e.getMessage()).contains("Too many mutations");
  }

  // we should be able to add 100000 mutations in a single entry
  bulkMutation = BulkMutation.create(TABLE_ID);
  Mutation mutation = Mutation.create();
  for (int i = 0; i < 100000; i++) {
    mutation.setCell("f", "q" + i, "value");
  }
  bulkMutation.add("key", mutation);
}
}
Expand Up @@ -439,7 +439,6 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
.reserve(any(Long.class), any(Long.class));
when(flowController.getMaxElementCountLimit()).thenReturn(null);
when(flowController.getMaxRequestBytesLimit()).thenReturn(null);
when(batchingDescriptor.countBytes(any())).thenReturn(1l);
when(batchingDescriptor.newRequestBuilder(any())).thenCallRealMethod();

doAnswer(
Expand Down
Expand Up @@ -19,6 +19,7 @@

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchEntry;
import com.google.api.gax.batching.BatchResource;
import com.google.api.gax.batching.BatchingRequestBuilder;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.DeadlineExceededException;
Expand Down Expand Up @@ -180,4 +181,23 @@ public void splitExceptionWithFailedMutationsTest() {
.hasCauseThat()
.isEqualTo(serverError.getFailedMutations().get(1).getError());
}

/**
 * A resource built from an entry with 100000 mutations does not request a flush on its own, but
 * does once the resource of a single-mutation entry is added to it.
 */
@Test
public void shouldFlushTest() {
  final long maxBytes = 20 * 1000 * 1000;
  MutateRowsBatchingDescriptor descriptor = new MutateRowsBatchingDescriptor();

  RowMutationEntry bigEntry = RowMutationEntry.create("key1");
  int index = 0;
  while (index < 100000) {
    bigEntry.setCell("f", "q", "v" + index);
    index++;
  }
  RowMutationEntry smallEntry = RowMutationEntry.create("key1").setCell("f", "q", "v");

  BatchResource bigResource = descriptor.createResource(bigEntry);
  BatchResource smallResource = descriptor.createResource(smallEntry);

  // Exactly at the mutation limit: no flush requested.
  assertThat(bigResource.shouldFlush(1, maxBytes)).isFalse();

  // One mutation over the limit: flush requested even though the element threshold is not hit.
  BatchResource combined = bigResource.add(smallResource);
  assertThat(combined.shouldFlush(3, maxBytes)).isTrue();
}
}

0 comments on commit ed24d4f

Please sign in to comment.