Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for BatchWriteAtLeastOnce #2520

Merged
merged 28 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7ff7cb2
feat: add support for BatchWriteAtleastOnce
rajatbhatta Jun 5, 2023
43c4f63
test: add batchwrite() support to MockSpannerServiceImpl
rajatbhatta Jun 5, 2023
abedd15
test: add commit timestamp to proto
rajatbhatta Jun 5, 2023
21d8f67
test: add commit timestamp to proto
rajatbhatta Jun 5, 2023
4858965
test: add commit timestamp to proto
rajatbhatta Jun 12, 2023
81743a7
consume the stream in tests
rajatbhatta Jun 12, 2023
65abc96
refactor tests
rajatbhatta Jun 12, 2023
8d76346
refactor tests
rajatbhatta Jun 14, 2023
a3569ae
test if mutations are correctly applied
rajatbhatta Jun 20, 2023
4cb0ada
null check
rajatbhatta Jun 20, 2023
5401b7d
skip for emulator
rajatbhatta Jun 20, 2023
5103b5c
Merge branch 'googleapis:main' into batch-write
rajatbhatta Jun 20, 2023
95c3326
add method documentation
rajatbhatta Jun 20, 2023
5678207
add method documentation
rajatbhatta Jun 20, 2023
347e9c3
add method documentation
rajatbhatta Jun 20, 2023
4c7251f
Merge branch 'googleapis:main' into batch-write
rajatbhatta Jul 6, 2023
1deaa29
remove autogenerated code
rajatbhatta Jul 6, 2023
acece36
remove autogenerated tests
rajatbhatta Jul 6, 2023
5f04154
batchWriteAtleastOnce -> batchWriteAtLeastOnce
rajatbhatta Jul 6, 2023
8e15c5c
batchWriteAtleastOnceWithOptions -> batchWriteAtLeastOnceWithOptions
rajatbhatta Jul 6, 2023
56f3929
changes based on updated batch write API
rajatbhatta Aug 17, 2023
356849e
add copyright and doc
rajatbhatta Aug 17, 2023
07f3bfb
Merge branch 'main' into batch-write-review
rajatbhatta Aug 17, 2023
1ca0d31
address review comments
rajatbhatta Aug 22, 2023
c3e3d7b
address review comments
rajatbhatta Aug 22, 2023
a036ced
add more documentation
rajatbhatta Aug 25, 2023
d8ef791
Merge branch 'googleapis:main' into batch-write-review
rajatbhatta Sep 18, 2023
7c4c73d
Merge branch 'googleapis:main' into batch-write-review
arpan14 Sep 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package com.google.cloud.spanner;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
Expand Down Expand Up @@ -191,6 +193,55 @@ CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption.
CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException;

/**
* Applies batch of mutation groups in a collection of efficient transactions. The mutation groups
* are applied non-atomically in an unspecified order and thus, they must be independent of each
* other. Partial failure is possible, i.e., some mutation groups may have been applied
* successfully, while some may have failed. The results of individual batches are streamed into
rajatbhatta marked this conversation as resolved.
Show resolved Hide resolved
* the response as and when the batches are applied.
*
* <p>Since this method does not feature replay protection, it may attempt to apply {@code
* mutation groups} more than once; if the mutation groups are not idempotent, this may lead to a
* failure being reported when the mutation group was applied once. For example, an insert may
rajatbhatta marked this conversation as resolved.
Show resolved Hide resolved
* fail with {@link ErrorCode#ALREADY_EXISTS} even though the row did not exist before this method
* was called. For this reason, most users of the library will prefer to use {@link
* #write(Iterable)} instead. However, {@code batchWriteAtLeastOnce()} method may be appropriate
* for non-atomically committing multiple mutation groups in a single RPC with low latency.
*
* <p>Example of BatchWriteAtLeastOnce
*
* <pre>{@code
* Iterable<MutationGroup> mutationGroups =
* ImmutableList.of(
* MutationGroup.of(
* Mutation.newInsertBuilder("FOO1").set("ID").to(1L).set("NAME").to("Bar1").build(),
* Mutation.newInsertBuilder("FOO2").set("ID").to(2L).set("NAME").to("Bar2").build()),
* MutationGroup.of(
* Mutation.newInsertBuilder("FOO3").set("ID").to(3L).set("NAME").to("Bar3").build(),
* Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build()),
* MutationGroup.of(
* Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build(),
* Mutation.newInsertBuilder("FOO5").set("ID").to(5L).set("NAME").to("Bar5").build()),
* MutationGroup.of(
* Mutation.newInsertBuilder("FOO6").set("ID").to(6L).set("NAME").to("Bar6").build()));
* ServerStream<BatchWriteResponse> responses =
* dbClient.batchWriteAtLeastOnce(mutationGroups, Options.tag("batch-write-tag"));
* for (BatchWriteResponse response : responses) {
* // Do something when a response is received.
* }
* }</pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the batch write request.
* <li>{@link Options#tag(String)}: The transaction tag to use for the batch write request.
* </ul>
*/
ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups, TransactionOption... options) throws SpannerException;

/**
* Returns a context in which a single read can be performed using {@link TimestampBound#strong()}
* concurrency. This method will return a {@link ReadContext} that will not return the read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
Expand All @@ -24,6 +25,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.BatchWriteResponse;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
Expand Down Expand Up @@ -106,6 +108,21 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
} catch (RuntimeException e) {
olavloite marked this conversation as resolved.
Show resolved Hide resolved
TraceUtil.setWithFailure(span, e);
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}
}

@Override
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.spanner.v1.BatchWriteRequest;
import java.util.ArrayList;
import java.util.List;

/** Represents a group of Cloud Spanner mutations to be committed together. */
public class MutationGroup {
private final ImmutableList<Mutation> mutations;

private MutationGroup(ImmutableList<Mutation> mutations) {
this.mutations = mutations;
}

/** Creates a {@code MutationGroup} given a vararg of mutations. */
public static MutationGroup of(Mutation... mutations) {
rajatbhatta marked this conversation as resolved.
Show resolved Hide resolved
rajatbhatta marked this conversation as resolved.
Show resolved Hide resolved
Preconditions.checkArgument(mutations.length > 0, "Should pass in at least one mutation.");
return new MutationGroup(ImmutableList.copyOf(mutations));
}

/** Creates a {@code MutationGroup} given an iterable of mutations. */
public static MutationGroup of(Iterable<Mutation> mutations) {
return new MutationGroup(ImmutableList.copyOf(mutations));
}

/** Returns corresponding mutations for this MutationGroup. */
public ImmutableList<Mutation> getMutations() {
return mutations;
}

static BatchWriteRequest.MutationGroup toProto(final MutationGroup mutationGroup) {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutationGroup.getMutations(), mutationsProto);
return BatchWriteRequest.MutationGroup.newBuilder().addAllMutations(mutationsProto).build();
olavloite marked this conversation as resolved.
Show resolved Hide resolved
}

static List<BatchWriteRequest.MutationGroup> toListProto(
final Iterable<MutationGroup> mutationGroups) {
List<BatchWriteRequest.MutationGroup> mutationGroupsProto = new ArrayList<>();
for (MutationGroup group : mutationGroups) {
mutationGroupsProto.add(toProto(group));
}
return mutationGroupsProto;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
Expand All @@ -35,6 +36,8 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BatchWriteRequest;
import com.google.spanner.v1.BatchWriteResponse;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.RequestOptions;
Expand Down Expand Up @@ -160,7 +163,6 @@ public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... transactionOptions)
throws SpannerException {
setActive(null);
Options commitRequestOptions = Options.fromTransactionOptions(transactionOptions);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
final CommitRequest.Builder requestBuilder =
Expand All @@ -172,15 +174,9 @@ public CommitResponse writeAtLeastOnceWithOptions(
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
if (commitRequestOptions.hasPriority() || commitRequestOptions.hasTag()) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (commitRequestOptions.hasPriority()) {
requestOptionsBuilder.setPriority(commitRequestOptions.priority());
}
if (commitRequestOptions.hasTag()) {
requestOptionsBuilder.setTransactionTag(commitRequestOptions.tag());
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
RequestOptions commitRequestOptions = getRequestOptions(transactionOptions);
if (commitRequestOptions != null) {
requestBuilder.setRequestOptions(commitRequestOptions);
}
Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
try (Scope s = tracer.withSpan(span)) {
Expand All @@ -195,6 +191,45 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

private RequestOptions getRequestOptions(TransactionOption... transactionOptions) {
Options requestOptions = Options.fromTransactionOptions(transactionOptions);
if (requestOptions.hasPriority() || requestOptions.hasTag()) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (requestOptions.hasPriority()) {
requestOptionsBuilder.setPriority(requestOptions.priority());
}
if (requestOptions.hasTag()) {
requestOptionsBuilder.setTransactionTag(requestOptions.tag());
}
return requestOptionsBuilder.build();
}
return null;
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups, TransactionOption... transactionOptions)
throws SpannerException {
setActive(null);
List<BatchWriteRequest.MutationGroup> mutationGroupsProto =
MutationGroup.toListProto(mutationGroups);
final BatchWriteRequest.Builder requestBuilder =
BatchWriteRequest.newBuilder().setSession(name).addAllMutationGroups(mutationGroupsProto);
RequestOptions batchWriteRequestOptions = getRequestOptions(transactionOptions);
if (batchWriteRequestOptions != null) {
requestBuilder.setRequestOptions(batchWriteRequestOptions);
}
Span span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE).startSpan();
try (Scope s = tracer.withSpan(span)) {
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options);
} catch (Throwable e) {
TraceUtil.setWithFailure(span, e);
throw SpannerExceptionFactory.newSpannerException(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}
}

@Override
public ReadContext singleUse() {
return singleUse(TimestampBound.strong());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand All @@ -68,6 +69,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BatchWriteResponse;
import com.google.spanner.v1.ResultSetStats;
import io.opencensus.common.Scope;
import io.opencensus.metrics.DerivedLongCumulative;
Expand Down Expand Up @@ -1171,6 +1173,17 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
throws SpannerException {
try {
return get().batchWriteAtLeastOnce(mutationGroups, options);
} finally {
close();
}
}

@Override
public ReadContext singleUse() {
try {
Expand Down Expand Up @@ -1450,6 +1463,18 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
throws SpannerException {
try {
markUsed();
return delegate.batchWriteAtLeastOnce(mutationGroups, options);
} catch (SpannerException e) {
throw lastException = e;
}
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options)
throws SpannerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
static final String COMMIT = "CloudSpannerOperation.Commit";
static final String QUERY = "CloudSpannerOperation.ExecuteStreamingQuery";
static final String READ = "CloudSpannerOperation.ExecuteStreamingRead";
static final String BATCH_WRITE = "CloudSpannerOperation.BatchWrite";

private static final Object CLIENT_ID_LOCK = new Object();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@
import com.google.spanner.admin.instance.v1.UpdateInstanceMetadata;
import com.google.spanner.admin.instance.v1.UpdateInstanceRequest;
import com.google.spanner.v1.BatchCreateSessionsRequest;
import com.google.spanner.v1.BatchWriteRequest;
import com.google.spanner.v1.BatchWriteResponse;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
Expand Down Expand Up @@ -1684,6 +1686,14 @@ public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
return partitionedDmlStub.executeStreamingSqlCallable().call(request, context);
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
BatchWriteRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context =
newCallContext(options, request.getSession(), request, SpannerGrpc.getBatchWriteMethod());
return spannerStub.batchWriteCallable().call(request, context);
}

@Override
public StreamingCall executeQuery(
ExecuteSqlRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ ApiFuture<ResultSet> executeQueryAsync(
ServerStream<PartialResultSet> executeStreamingPartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);

ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
BatchWriteRequest request, @Nullable Map<Option, ?> options);

/**
* Executes a query with streaming result.
*
Expand Down