Skip to content

Commit

Permalink
feat: add support for BatchWriteAtLeastOnce (googleapis#2520)
Browse files Browse the repository at this point in the history
* feat: add support for BatchWriteAtleastOnce

* test: add batchwrite() support to MockSpannerServiceImpl

* test: add commit timestamp to proto

* test: add commit timestamp to proto

* test: add commit timestamp to proto

* consume the stream in tests

* refactor tests

* refactor tests

* test if mutations are correctly applied

* null check

* skip for emulator

* add method documentation

* add method documentation

* add method documentation

* remove autogenerated code

* remove autogenerated tests

* batchWriteAtleastOnce -> batchWriteAtLeastOnce

* batchWriteAtleastOnceWithOptions -> batchWriteAtLeastOnceWithOptions

* changes based on updated batch write API

* add copyright and doc

* address review comments

* address review comments

* add more documentation

---------

Co-authored-by: Arpan Mishra <akmish3@gmail.com>
  • Loading branch information
2 people authored and surbhigarg92 committed Oct 5, 2023
1 parent a1767b4 commit 27c064a
Show file tree
Hide file tree
Showing 12 changed files with 589 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package com.google.cloud.spanner;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
Expand Down Expand Up @@ -191,6 +193,56 @@ CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption.
CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException;

/**
* Applies batch of mutation groups in a collection of efficient transactions. The mutation groups
* are applied non-atomically in an unspecified order and thus, they must be independent of each
* other. Partial failure is possible, i.e., some mutation groups may have been applied
* successfully, while some may have failed. The results of individual batches are streamed into
* the response as and when the batches are applied.
*
* <p>One BatchWriteResponse can contain the results for multiple MutationGroups. Inspect the
* indexes field to determine the MutationGroups that the BatchWriteResponse is for.
*
* <p>The mutation groups may be applied more than once. This can lead to failures if the mutation
* groups are non-idempotent. For example, an insert that is replayed can return an {@link
* ErrorCode#ALREADY_EXISTS} error. For this reason, users of the library may prefer to use {@link
* #write(Iterable)} instead. However, {@code batchWriteAtLeastOnce()} method may be appropriate
* for non-atomically committing multiple mutation groups in a single RPC with low latency.
*
* <p>Example of BatchWriteAtLeastOnce
*
* <pre>{@code
* Iterable<MutationGroup> mutationGroups =
* ImmutableList.of(
* MutationGroup.of(
* Mutation.newInsertBuilder("FOO1").set("ID").to(1L).set("NAME").to("Bar1").build(),
* Mutation.newInsertBuilder("FOO2").set("ID").to(2L).set("NAME").to("Bar2").build()),
* MutationGroup.of(
* Mutation.newInsertBuilder("FOO3").set("ID").to(3L).set("NAME").to("Bar3").build(),
* Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build()),
* MutationGroup.of(
* Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build(),
* Mutation.newInsertBuilder("FOO5").set("ID").to(5L).set("NAME").to("Bar5").build()),
* MutationGroup.of(
* Mutation.newInsertBuilder("FOO6").set("ID").to(6L).set("NAME").to("Bar6").build()));
* ServerStream<BatchWriteResponse> responses =
* dbClient.batchWriteAtLeastOnce(mutationGroups, Options.tag("batch-write-tag"));
* for (BatchWriteResponse response : responses) {
* // Do something when a response is received.
* }
* }</pre>
*
* Options for a transaction can include:
*
* <ul>
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
* RpcPriority} to use for the batch write request.
* <li>{@link Options#tag(String)}: The transaction tag to use for the batch write request.
* </ul>
*/
ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups, TransactionOption... options) throws SpannerException;

/**
* Returns a context in which a single read can be performed using {@link TimestampBound#strong()}
* concurrency. This method will return a {@link ReadContext} that will not return the read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
Expand All @@ -27,6 +28,10 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.BatchWriteResponse;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -105,6 +110,21 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
} finally {
span.end();
}
}

@Override
public ReadContext singleUse() {
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.spanner.v1.BatchWriteRequest;
import java.util.ArrayList;
import java.util.List;

/** Represents a group of Cloud Spanner mutations to be committed together. */
public class MutationGroup {
private final ImmutableList<Mutation> mutations;

private MutationGroup(ImmutableList<Mutation> mutations) {
this.mutations = mutations;
}

/** Creates a {@code MutationGroup} given a vararg of mutations. */
public static MutationGroup of(Mutation... mutations) {
Preconditions.checkArgument(mutations.length > 0, "Should pass in at least one mutation.");
return new MutationGroup(ImmutableList.copyOf(mutations));
}

/** Creates a {@code MutationGroup} given an iterable of mutations. */
public static MutationGroup of(Iterable<Mutation> mutations) {
return new MutationGroup(ImmutableList.copyOf(mutations));
}

/** Returns corresponding mutations for this MutationGroup. */
public ImmutableList<Mutation> getMutations() {
return mutations;
}

static BatchWriteRequest.MutationGroup toProto(final MutationGroup mutationGroup) {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutationGroup.getMutations(), mutationsProto);
return BatchWriteRequest.MutationGroup.newBuilder().addAllMutations(mutationsProto).build();
}

static List<BatchWriteRequest.MutationGroup> toListProto(
final Iterable<MutationGroup> mutationGroups) {
List<BatchWriteRequest.MutationGroup> mutationGroupsProto = new ArrayList<>();
for (MutationGroup group : mutationGroups) {
mutationGroupsProto.add(toProto(group));
}
return mutationGroupsProto;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
Expand All @@ -38,6 +39,8 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BatchWriteRequest;
import com.google.spanner.v1.BatchWriteResponse;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.RequestOptions;
Expand Down Expand Up @@ -163,7 +166,6 @@ public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... transactionOptions)
throws SpannerException {
setActive(null);
Options commitRequestOptions = Options.fromTransactionOptions(transactionOptions);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
final CommitRequest.Builder requestBuilder =
Expand All @@ -175,15 +177,9 @@ public CommitResponse writeAtLeastOnceWithOptions(
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
if (commitRequestOptions.hasPriority() || commitRequestOptions.hasTag()) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (commitRequestOptions.hasPriority()) {
requestOptionsBuilder.setPriority(commitRequestOptions.priority());
}
if (commitRequestOptions.hasTag()) {
requestOptionsBuilder.setTransactionTag(commitRequestOptions.tag());
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
RequestOptions commitRequestOptions = getRequestOptions(transactionOptions);
if (commitRequestOptions != null) {
requestBuilder.setRequestOptions(commitRequestOptions);
}
CommitRequest request = requestBuilder.build();
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
Expand All @@ -198,6 +194,45 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

private RequestOptions getRequestOptions(TransactionOption... transactionOptions) {
Options requestOptions = Options.fromTransactionOptions(transactionOptions);
if (requestOptions.hasPriority() || requestOptions.hasTag()) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (requestOptions.hasPriority()) {
requestOptionsBuilder.setPriority(requestOptions.priority());
}
if (requestOptions.hasTag()) {
requestOptionsBuilder.setTransactionTag(requestOptions.tag());
}
return requestOptionsBuilder.build();
}
return null;
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups, TransactionOption... transactionOptions)
throws SpannerException {
setActive(null);
List<BatchWriteRequest.MutationGroup> mutationGroupsProto =
MutationGroup.toListProto(mutationGroups);
final BatchWriteRequest.Builder requestBuilder =
BatchWriteRequest.newBuilder().setSession(name).addAllMutationGroups(mutationGroupsProto);
RequestOptions batchWriteRequestOptions = getRequestOptions(transactionOptions);
if (batchWriteRequestOptions != null) {
requestBuilder.setRequestOptions(batchWriteRequestOptions);
}
Span span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE).startSpan();
try (Scope s = tracer.withSpan(span)) {
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options);
} catch (Throwable e) {
TraceUtil.setWithFailure(span, e);
throw SpannerExceptionFactory.newSpannerException(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}
}

@Override
public ReadContext singleUse() {
return singleUse(TimestampBound.strong());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand Down Expand Up @@ -82,6 +83,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BatchWriteResponse;
import com.google.spanner.v1.ResultSetStats;
import io.opencensus.metrics.DerivedLongCumulative;
import io.opencensus.metrics.DerivedLongGauge;
Expand Down Expand Up @@ -1185,6 +1187,17 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
throws SpannerException {
try {
return get().batchWriteAtLeastOnce(mutationGroups, options);
} finally {
close();
}
}

@Override
public ReadContext singleUse() {
try {
Expand Down Expand Up @@ -1478,6 +1491,18 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
throws SpannerException {
try {
markUsed();
return delegate.batchWriteAtLeastOnce(mutationGroups, options);
} catch (SpannerException e) {
throw lastException = e;
}
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options)
throws SpannerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
static final String COMMIT = "CloudSpannerOperation.Commit";
static final String QUERY = "CloudSpannerOperation.ExecuteStreamingQuery";
static final String READ = "CloudSpannerOperation.ExecuteStreamingRead";
static final String BATCH_WRITE = "CloudSpannerOperation.BatchWrite";

private static final Object CLIENT_ID_LOCK = new Object();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@
import com.google.spanner.admin.instance.v1.UpdateInstanceMetadata;
import com.google.spanner.admin.instance.v1.UpdateInstanceRequest;
import com.google.spanner.v1.BatchCreateSessionsRequest;
import com.google.spanner.v1.BatchWriteRequest;
import com.google.spanner.v1.BatchWriteResponse;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
Expand Down Expand Up @@ -1684,6 +1686,14 @@ public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
return partitionedDmlStub.executeStreamingSqlCallable().call(request, context);
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
BatchWriteRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context =
newCallContext(options, request.getSession(), request, SpannerGrpc.getBatchWriteMethod());
return spannerStub.batchWriteCallable().call(request, context);
}

@Override
public StreamingCall executeQuery(
ExecuteSqlRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ ApiFuture<ResultSet> executeQueryAsync(
ServerStream<PartialResultSet> executeStreamingPartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);

ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
BatchWriteRequest request, @Nullable Map<Option, ?> options);

/**
* Executes a query with streaming result.
*
Expand Down

0 comments on commit 27c064a

Please sign in to comment.