Skip to content

Commit

Permalink
feat: Leader Aware Routing (#2214)
Browse files Browse the repository at this point in the history
* feat: Add `x-goog-spanner-route-to-leader` header to Spanner RPC contexts for RW/PDML transactions.

The header is added to support leader-aware-routing feature, which aims at reducing cross-regional latency for RW/PDML transactions in a multi-region instance.

* feat: Add knob in SpannerOptions to allow users to opt out leader aware routing feature

* fix: fix broken tests due to the merge

* fix: resolve comments.

* fix: resolve comments and add new tests to verify that the route-to-leader header exists for RW transactions and does not exist for RO transactions or when the leader aware routing feature is disabled.

* fix: Update comments for SpannerOptions.disableLeaderAwareRouting

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: Disable leader aware routing by default for public preview

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Rajat Bhatta <93644539+rajatbhatta@users.noreply.github.com>
  • Loading branch information
3 people committed Apr 28, 2023
1 parent 85213c8 commit 9695ace
Show file tree
Hide file tree
Showing 18 changed files with 674 additions and 269 deletions.
99 changes: 99 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -222,6 +222,105 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.ResultSet analyzeUpdateStatement(com.google.cloud.spanner.Statement, com.google.cloud.spanner.ReadContext$QueryAnalyzeMode, com.google.cloud.spanner.Options$UpdateOption[])</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.spanner.v1.Transaction beginTransaction(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
<to>com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean)</to>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
<to>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map, boolean)</to>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall read(com.google.spanner.v1.ReadRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
<to>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean)</to>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
<to>com.google.spanner.v1.ResultSet</to>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
<to>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall</to>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.spanner.v1.Transaction beginTransaction(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall read(com.google.spanner.v1.ReadRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
<to>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean)</to>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map)</method>
<to>com.google.spanner.v1.ResultSet</to>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
<to>com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall</to>
</difference>

<!-- Savepoints -->
<difference>
<differenceType>7012</differenceType>
Expand Down
Expand Up @@ -163,6 +163,11 @@ private SingleReadContext(Builder builder) {
this.bound = builder.bound;
}

@Override
protected boolean isRouteToLeader() {
return false;
}

@GuardedBy("lock")
@Override
void beforeReadOrQueryLocked() {
Expand Down Expand Up @@ -293,6 +298,11 @@ static Builder newBuilder() {
}
}

@Override
protected boolean isRouteToLeader() {
return false;
}

@Override
void beforeReadOrQuery() {
super.beforeReadOrQuery();
Expand Down Expand Up @@ -347,7 +357,8 @@ void initTransaction() {
.setSession(session.getName())
.setOptions(options)
.build();
Transaction transaction = rpc.beginTransaction(request, session.getOptions());
Transaction transaction =
rpc.beginTransaction(request, session.getOptions(), isRouteToLeader());
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
Expand Down Expand Up @@ -416,6 +427,10 @@ long getSeqNo() {
return seqNo.incrementAndGet();
}

protected boolean isRouteToLeader() {
return false;
}

@Override
public final ResultSet read(
String table, KeySet keys, Iterable<String> columns, ReadOption... options) {
Expand Down Expand Up @@ -667,7 +682,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
request.setTransaction(selector);
}
SpannerRpc.StreamingCall call =
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
rpc.executeQuery(
request.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
return stream;
Expand Down Expand Up @@ -798,7 +814,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
}
builder.setRequestOptions(buildRequestOptions(readOptions));
SpannerRpc.StreamingCall call =
rpc.read(builder.build(), stream.consumer(), session.getOptions());
rpc.read(
builder.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
return stream;
Expand Down
Expand Up @@ -202,7 +202,7 @@ private ByteString initTransaction() {
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions());
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
if (tx.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
Expand Down
Expand Up @@ -275,7 +275,7 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...
@Override
public void prepareReadWriteTransaction() {
setActive(null);
readyTransactionId = beginTransaction();
readyTransactionId = beginTransaction(true);
}

@Override
Expand All @@ -296,21 +296,21 @@ public void close() {
}
}

ByteString beginTransaction() {
ByteString beginTransaction(boolean routeToLeader) {
try {
return beginTransactionAsync().get();
return beginTransactionAsync(routeToLeader).get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
}

ApiFuture<ByteString> beginTransactionAsync() {
return beginTransactionAsync(Options.fromTransactionOptions());
ApiFuture<ByteString> beginTransactionAsync(boolean routeToLeader) {
return beginTransactionAsync(Options.fromTransactionOptions(), routeToLeader);
}

ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions) {
ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean routeToLeader) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan();
final BeginTransactionRequest request =
Expand All @@ -319,7 +319,7 @@ ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions) {
.setOptions(createReadWriteTransactionOptions(transactionOptions))
.build();
final ApiFuture<Transaction> requestFuture =
spanner.getRpc().beginTransactionAsync(request, options);
spanner.getRpc().beginTransactionAsync(request, options, routeToLeader);
requestFuture.addListener(
tracer.withSpan(
span,
Expand Down
Expand Up @@ -132,6 +132,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final CallCredentialsProvider callCredentialsProvider;
private final CloseableExecutorProvider asyncExecutorProvider;
private final String compressorName;
private final boolean leaderAwareRoutingEnabled;

/**
* Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to
Expand Down Expand Up @@ -600,6 +601,7 @@ private SpannerOptions(Builder builder) {
callCredentialsProvider = builder.callCredentialsProvider;
asyncExecutorProvider = builder.asyncExecutorProvider;
compressorName = builder.compressorName;
leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled;
}

/**
Expand Down Expand Up @@ -700,6 +702,7 @@ public static class Builder
private CloseableExecutorProvider asyncExecutorProvider;
private String compressorName;
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
private boolean leaderAwareRoutingEnabled = false;

private Builder() {
// Manually set retry and polling settings that work.
Expand Down Expand Up @@ -1155,6 +1158,24 @@ public Builder setEmulatorHost(String emulatorHost) {
return this;
}

/**
* Enable leader aware routing. Leader aware routing would route all requests in RW/PDML
* transactions to the leader region.
*/
public Builder enableLeaderAwareRouting() {
this.leaderAwareRoutingEnabled = true;
return this;
}

/**
* Disable leader aware routing. Disabling leader aware routing would route all requests in
* RW/PDML transactions to any region.
*/
public Builder disableLeaderAwareRouting() {
this.leaderAwareRoutingEnabled = false;
return this;
}

@SuppressWarnings("rawtypes")
@Override
public SpannerOptions build() {
Expand Down Expand Up @@ -1291,6 +1312,10 @@ public String getCompressorName() {
return compressorName;
}

public boolean isLeaderAwareRoutingEnabled() {
return leaderAwareRoutingEnabled;
}

/** Returns the default query options to use for the specific database. */
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
// Use the specific query options for the database if any have been specified. These have
Expand Down
Expand Up @@ -198,6 +198,11 @@ private TransactionContextImpl(Builder builder) {
this.finishedAsyncOperations.set(null);
}

@Override
protected boolean isRouteToLeader() {
return true;
}

private void increaseAsyncOperations() {
synchronized (lock) {
if (runningAsyncOperations == 0) {
Expand Down Expand Up @@ -255,7 +260,7 @@ ApiFuture<Void> ensureTxnAsync() {

private void createTxnAsync(final SettableApiFuture<Void> res) {
span.addAnnotation("Creating Transaction");
final ApiFuture<ByteString> fut = session.beginTransactionAsync(options);
final ApiFuture<ByteString> fut = session.beginTransactionAsync(options, isRouteToLeader());
fut.addListener(
() -> {
try {
Expand Down Expand Up @@ -717,7 +722,7 @@ private ResultSet internalExecuteUpdate(
/* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions());
rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader());
if (resultSet.getMetadata().hasTransaction()) {
onTransactionMetadata(
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
Expand Down Expand Up @@ -747,7 +752,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
// Register the update as an async operation that must finish before the transaction may
// commit.
increaseAsyncOperations();
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions());
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), isRouteToLeader());
} catch (Throwable t) {
decreaseAsyncOperations();
throw t;
Expand Down

0 comments on commit 9695ace

Please sign in to comment.