Skip to content

Commit

Permalink
SpannerIO: support max commit delay
Browse files Browse the repository at this point in the history
  • Loading branch information
kberezin-nshl committed Apr 17, 2024
1 parent fb6bfc3 commit d0efd2e
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public abstract class SpannerConfig implements Serializable {

public abstract @Nullable ValueProvider<RpcPriority> getRpcPriority();

public abstract @Nullable ValueProvider<Duration> getMaxCommitDelay();

public abstract @Nullable ValueProvider<String> getDatabaseRole();

public abstract @Nullable ValueProvider<Duration> getPartitionQueryTimeout();
Expand Down Expand Up @@ -156,6 +158,8 @@ abstract Builder setExecuteStreamingSqlRetrySettings(

abstract Builder setRpcPriority(ValueProvider<RpcPriority> rpcPriority);

abstract Builder setMaxCommitDelay(ValueProvider<Duration> maxCommitDelay);

abstract Builder setDatabaseRole(ValueProvider<String> databaseRole);

abstract Builder setDataBoostEnabled(ValueProvider<Boolean> dataBoostEnabled);
Expand Down Expand Up @@ -278,6 +282,22 @@ public SpannerConfig withRpcPriority(ValueProvider<RpcPriority> rpcPriority) {
return toBuilder().setRpcPriority(rpcPriority).build();
}

/* Specifies the max commit delay for high throughput writes. */
public SpannerConfig withMaxCommitDelay(long millis) {
return withMaxCommitDelay(Duration.millis(millis));
}

/** Specifies the max commit delay for high throughput writes. */
public SpannerConfig withMaxCommitDelay(Duration maxCommitDelay) {
return withMaxCommitDelay(ValueProvider.StaticValueProvider.of(maxCommitDelay));
}

/** Specifies the max commit delay for high throughput writes. */
public SpannerConfig withMaxCommitDelay(ValueProvider<Duration> maxCommitDelay) {
checkNotNull(maxCommitDelay, "withMaxCommitTimeout(maxCommitDelay) called with null input.");
return toBuilder().setMaxCommitDelay(maxCommitDelay).build();
}

/** Specifies the Cloud Spanner database role. */
public SpannerConfig withDatabaseRole(ValueProvider<String> databaseRole) {
return toBuilder().setDatabaseRole(databaseRole).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
Expand Down Expand Up @@ -874,6 +876,11 @@ public Read withHighPriority() {
return withSpannerConfig(config.withRpcPriority(RpcPriority.HIGH));
}

public Read withMaxCommitDelay(long millis) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withMaxCommitDelay(millis));
}

@Override
public PCollection<Struct> expand(PBegin input) {
getSpannerConfig().validate();
Expand Down Expand Up @@ -2227,15 +2234,9 @@ public void processElement(ProcessContext c) throws Exception {
private void spannerWriteWithRetryIfSchemaChange(List<Mutation> batch) throws SpannerException {
for (int retry = 1; ; retry++) {
try {
if (spannerConfig.getRpcPriority() != null
&& spannerConfig.getRpcPriority().get() != null) {
spannerAccessor
.getDatabaseClient()
.writeAtLeastOnceWithOptions(
batch, Options.priority(spannerConfig.getRpcPriority().get()));
} else {
spannerAccessor.getDatabaseClient().writeAtLeastOnce(batch);
}
spannerAccessor
.getDatabaseClient()
.writeAtLeastOnceWithOptions(batch, getTransactionOptions());
reportServiceCallMetricsForBatch(batch, "ok");
return;
} catch (AbortedException e) {
Expand All @@ -2256,6 +2257,21 @@ private void spannerWriteWithRetryIfSchemaChange(List<Mutation> batch) throws Sp
}
}

private Options.TransactionOption[] getTransactionOptions() {
return Stream.of(
spannerConfig.getRpcPriority() != null && spannerConfig.getRpcPriority().get() != null
? Options.priority(spannerConfig.getRpcPriority().get())
: null,
spannerConfig.getMaxCommitDelay() != null
&& spannerConfig.getMaxCommitDelay().get() != null
? Options.maxCommitDelay(
java.time.Duration.ofMillis(
spannerConfig.getMaxCommitDelay().get().getMillis()))
: null)
.filter(Objects::nonNull)
.toArray(Options.TransactionOption[]::new);
}

private void reportServiceCallMetricsForBatch(List<Mutation> batch, String statusCode) {
// Get names of all tables in batch of mutations.
Set<String> tableNames = batch.stream().map(Mutation::getTable).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,19 @@ public void runBatchQueryTestWithPriority() {
assertEquals(RpcPriority.HIGH, readTransform.getSpannerConfig().getRpcPriority().get());
}

@Test
public void runBatchQueryTestWithMaxCommitDelay() {
SpannerIO.Read readTransform =
SpannerIO.read()
.withSpannerConfig(spannerConfig)
.withQuery(QUERY_STATEMENT)
.withQueryName(QUERY_NAME)
.withTimestampBound(TIMESTAMP_BOUND)
.withMaxCommitDelay(100L);
runBatchQueryTest(readTransform);
assertEquals(100L, readTransform.getSpannerConfig().getMaxCommitDelay().get().getMillis());
}

@Test
public void runBatchQueryTestWithDataBoost() {
SpannerConfig spannerConfig1 = spannerConfig.withDataBoostEnabled(StaticValueProvider.of(true));
Expand Down

0 comments on commit d0efd2e

Please sign in to comment.