Skip to content

Commit

Permalink
SpannerIO: support max commit delay
Browse files Browse the repository at this point in the history
  • Loading branch information
kberezin-nshl committed Apr 22, 2024
1 parent 8dcbf96 commit 74f014e
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public abstract class SpannerConfig implements Serializable {

public abstract @Nullable ValueProvider<RpcPriority> getRpcPriority();

public abstract @Nullable ValueProvider<Duration> getMaxCommitDelay();

public abstract @Nullable ValueProvider<String> getDatabaseRole();

public abstract @Nullable ValueProvider<Duration> getPartitionQueryTimeout();
Expand Down Expand Up @@ -156,6 +158,8 @@ abstract Builder setExecuteStreamingSqlRetrySettings(

abstract Builder setRpcPriority(ValueProvider<RpcPriority> rpcPriority);

abstract Builder setMaxCommitDelay(ValueProvider<Duration> maxCommitDelay);

abstract Builder setDatabaseRole(ValueProvider<String> databaseRole);

abstract Builder setDataBoostEnabled(ValueProvider<Boolean> dataBoostEnabled);
Expand Down Expand Up @@ -278,6 +282,22 @@ public SpannerConfig withRpcPriority(ValueProvider<RpcPriority> rpcPriority) {
return toBuilder().setRpcPriority(rpcPriority).build();
}

/* Specifies the max commit delay for high throughput writes. */
public SpannerConfig withMaxCommitDelay(long millis) {
return withMaxCommitDelay(Duration.millis(millis));
}

/** Specifies the max commit delay for high throughput writes. */
public SpannerConfig withMaxCommitDelay(Duration maxCommitDelay) {
return withMaxCommitDelay(ValueProvider.StaticValueProvider.of(maxCommitDelay));
}

/** Specifies the max commit delay for high throughput writes. */
public SpannerConfig withMaxCommitDelay(ValueProvider<Duration> maxCommitDelay) {
checkNotNull(maxCommitDelay, "withMaxCommitTimeout(maxCommitDelay) called with null input.");
return toBuilder().setMaxCommitDelay(maxCommitDelay).build();
}

/** Specifies the Cloud Spanner database role. */
public SpannerConfig withDatabaseRole(ValueProvider<String> databaseRole) {
return toBuilder().setDatabaseRole(databaseRole).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
Expand Down Expand Up @@ -1186,6 +1188,18 @@ public Write withCommitDeadline(Duration commitDeadline) {
return withSpannerConfig(config.withCommitDeadline(commitDeadline));
}

/**
* Specifies max commit delay for the Commit API call for throughput optimized writes. If not
* set, Spanner might set a small delay if it thinks that will amortize the cost of the writes.
* For more information about the feature, <a
* href="https://cloud.google.com/spanner/docs/throughput-optimized-writes#default-behavior">see
* documentation</a>
*/
public Write withMaxCommitDelay(long millis) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withMaxCommitDelay(millis));
}

/**
* Specifies the maximum cumulative backoff time when retrying after DEADLINE_EXCEEDED errors.
* Default is 15 mins.
Expand Down Expand Up @@ -2227,15 +2241,9 @@ public void processElement(ProcessContext c) throws Exception {
private void spannerWriteWithRetryIfSchemaChange(List<Mutation> batch) throws SpannerException {
for (int retry = 1; ; retry++) {
try {
if (spannerConfig.getRpcPriority() != null
&& spannerConfig.getRpcPriority().get() != null) {
spannerAccessor
.getDatabaseClient()
.writeAtLeastOnceWithOptions(
batch, Options.priority(spannerConfig.getRpcPriority().get()));
} else {
spannerAccessor.getDatabaseClient().writeAtLeastOnce(batch);
}
spannerAccessor
.getDatabaseClient()
.writeAtLeastOnceWithOptions(batch, getTransactionOptions());
reportServiceCallMetricsForBatch(batch, "ok");
return;
} catch (AbortedException e) {
Expand All @@ -2256,6 +2264,21 @@ private void spannerWriteWithRetryIfSchemaChange(List<Mutation> batch) throws Sp
}
}

private Options.TransactionOption[] getTransactionOptions() {
return Stream.of(
spannerConfig.getRpcPriority() != null && spannerConfig.getRpcPriority().get() != null
? Options.priority(spannerConfig.getRpcPriority().get())
: null,
spannerConfig.getMaxCommitDelay() != null
&& spannerConfig.getMaxCommitDelay().get() != null
? Options.maxCommitDelay(
java.time.Duration.ofMillis(
spannerConfig.getMaxCommitDelay().get().getMillis()))
: null)
.filter(Objects::nonNull)
.toArray(Options.TransactionOption[]::new);
}

private void reportServiceCallMetricsForBatch(List<Mutation> batch, String statusCode) {
// Get names of all tables in batch of mutations.
Set<String> tableNames = batch.stream().map(Mutation::getTable).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner;

import java.time.Duration;

/** Provides access to some package-private methods of Spanner options. */
public class OptionsImposter {
public static Options fromTransactionOptions(Options.TransactionOption... options) {
return Options.fromTransactionOptions(options);
}

public static Duration maxCommitDelay(Options options) {
return options.maxCommitDelay();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@
import com.google.cloud.spanner.KeyRange;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.ReadQueryUpdateTransactionOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.OptionsImposter;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSets;
import com.google.cloud.spanner.SpannerExceptionFactory;
Expand Down Expand Up @@ -279,6 +281,32 @@ public void emptyDatabaseId() throws Exception {
write.expand(null);
}

@Test
public void runBatchQueryTestWithMaxCommitDelay() {
SpannerIO.Write write =
SpannerIO.write()
.withSpannerConfig(SPANNER_CONFIG)
.withServiceFactory(serviceFactory)
.withMaxCommitDelay(100L);
assertEquals(100L, write.getSpannerConfig().getMaxCommitDelay().get().getMillis());

Mutation mutation = buildUpsertMutation(2L);
PCollection<Mutation> mutations = pipeline.apply(Create.of(mutation));
mutations.apply(write);
pipeline.run();

verify(serviceFactory.mockDatabaseClient(), times(1))
.writeAtLeastOnceWithOptions(
mutationsInNoOrder(buildMutationBatch(mutation)),
any(ReadQueryUpdateTransactionOption.class),
argThat(
opts -> {
Options options = OptionsImposter.fromTransactionOptions(opts);
return java.time.Duration.ofMillis(100L)
.equals(OptionsImposter.maxCommitDelay(options));
}));
}

@Test
public void singleMutationPipeline() throws Exception {
Mutation mutation = buildUpsertMutation(2L);
Expand Down

0 comments on commit 74f014e

Please sign in to comment.