Skip to content

Commit

Permalink
feat: Add x-goog-spanner-route-to-leader header to Spanner RPC cont…
Browse files Browse the repository at this point in the history
…exts for RW/PDML transactions.

The header is added to support leader-aware-routing feature, which aims at reducing cross-regional latency for RW/PDML transactions in a multi-region instance.
  • Loading branch information
yifanzyifanz committed Dec 30, 2022
1 parent a40bda9 commit 2cd2fbd
Show file tree
Hide file tree
Showing 15 changed files with 198 additions and 73 deletions.
Expand Up @@ -161,6 +161,7 @@ static Builder newBuilder() {
private SingleReadContext(Builder builder) {
super(builder);
this.bound = builder.bound;
this.routeToLeader = false;
}

@GuardedBy("lock")
Expand Down Expand Up @@ -291,6 +292,7 @@ static Builder newBuilder() {
this.timestamp = builder.timestamp;
this.transactionId = builder.transactionId;
}
this.routeToLeader = false;
}

@Override
Expand Down Expand Up @@ -347,7 +349,8 @@ void initTransaction() {
.setSession(session.getName())
.setOptions(options)
.build();
Transaction transaction = rpc.beginTransaction(request, session.getOptions());
Transaction transaction =
rpc.beginTransaction(request, session.getOptions(), routeToLeader);
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
Expand Down Expand Up @@ -380,6 +383,7 @@ void initTransaction() {
Span span;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;
protected boolean routeToLeader = false;

@GuardedBy("lock")
private boolean isValid = true;
Expand Down Expand Up @@ -664,7 +668,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
request.setTransaction(selector);
}
SpannerRpc.StreamingCall call =
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
rpc.executeQuery(
request.build(), stream.consumer(), session.getOptions(), routeToLeader);
call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
return stream;
Expand Down Expand Up @@ -792,7 +797,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
}
builder.setRequestOptions(buildRequestOptions(readOptions));
SpannerRpc.StreamingCall call =
rpc.read(builder.build(), stream.consumer(), session.getOptions());
rpc.read(builder.build(), stream.consumer(), session.getOptions(), routeToLeader);
call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
return stream;
Expand Down
Expand Up @@ -202,7 +202,7 @@ private ByteString initTransaction() {
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions());
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
if (tx.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
Expand Down
Expand Up @@ -267,7 +267,7 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...
@Override
public void prepareReadWriteTransaction() {
setActive(null);
readyTransactionId = beginTransaction();
readyTransactionId = beginTransaction(true);
}

@Override
Expand All @@ -288,17 +288,17 @@ public void close() {
}
}

ByteString beginTransaction() {
ByteString beginTransaction(boolean routeToLeader) {
try {
return beginTransactionAsync().get();
return beginTransactionAsync(routeToLeader).get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
}

ApiFuture<ByteString> beginTransactionAsync() {
ApiFuture<ByteString> beginTransactionAsync(boolean routeToLeader) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan();
final BeginTransactionRequest request =
Expand All @@ -309,7 +309,7 @@ ApiFuture<ByteString> beginTransactionAsync() {
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
.build();
final ApiFuture<Transaction> requestFuture =
spanner.getRpc().beginTransactionAsync(request, options);
spanner.getRpc().beginTransactionAsync(request, options, routeToLeader);
requestFuture.addListener(
tracer.withSpan(
span,
Expand Down
Expand Up @@ -196,6 +196,7 @@ private TransactionContextImpl(Builder builder) {
this.trackTransactionStarter = builder.trackTransactionStarter;
this.options = builder.options;
this.finishedAsyncOperations.set(null);
this.routeToLeader = true;
}

private void increaseAsyncOperations() {
Expand Down Expand Up @@ -255,7 +256,7 @@ ApiFuture<Void> ensureTxnAsync() {

private void createTxnAsync(final SettableApiFuture<Void> res) {
span.addAnnotation("Creating Transaction");
final ApiFuture<ByteString> fut = session.beginTransactionAsync();
final ApiFuture<ByteString> fut = session.beginTransactionAsync(routeToLeader);
fut.addListener(
() -> {
try {
Expand Down Expand Up @@ -719,7 +720,7 @@ private ResultSet internalExecuteUpdate(
/* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions());
rpc.executeQuery(builder.build(), session.getOptions(), routeToLeader);
if (resultSet.getMetadata().hasTransaction()) {
onTransactionMetadata(
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
Expand Down Expand Up @@ -749,7 +750,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
// Register the update as an async operation that must finish before the transaction may
// commit.
increaseAsyncOperations();
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions());
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), routeToLeader);
} catch (Throwable t) {
decreaseAsyncOperations();
throw t;
Expand Down
Expand Up @@ -1592,7 +1592,8 @@ public List<Session> batchCreateSessions(
requestBuilder.setSessionTemplate(sessionBuilder);
BatchCreateSessionsRequest request = requestBuilder.build();
GrpcCallContext context =
newCallContext(options, databaseName, request, SpannerGrpc.getBatchCreateSessionsMethod());
newCallContext(
options, databaseName, request, SpannerGrpc.getBatchCreateSessionsMethod(), true);
return get(spannerStub.batchCreateSessionsCallable().futureCall(request, context))
.getSessionList();
}
Expand All @@ -1616,7 +1617,7 @@ public Session createSession(
requestBuilder.setSession(sessionBuilder);
CreateSessionRequest request = requestBuilder.build();
GrpcCallContext context =
newCallContext(options, databaseName, request, SpannerGrpc.getCreateSessionMethod());
newCallContext(options, databaseName, request, SpannerGrpc.getCreateSessionMethod(), true);
return get(spannerStub.createSessionCallable().futureCall(request, context));
}

Expand All @@ -1630,15 +1631,19 @@ public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options) {
DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build();
GrpcCallContext context =
newCallContext(options, sessionName, request, SpannerGrpc.getDeleteSessionMethod());
newCallContext(options, sessionName, request, SpannerGrpc.getDeleteSessionMethod(), false);
return spannerStub.deleteSessionCallable().futureCall(request, context);
}

@Override
public StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
ReadRequest request,
ResultStreamConsumer consumer,
@Nullable Map<Option, ?> options,
boolean routeToLeader) {
GrpcCallContext context =
newCallContext(options, request.getSession(), request, SpannerGrpc.getReadMethod());
newCallContext(
options, request.getSession(), request, SpannerGrpc.getReadMethod(), routeToLeader);
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
spannerStub.streamingReadCallable().call(request, responseObserver, context);
final StreamController controller = responseObserver.getController();
Expand All @@ -1658,13 +1663,14 @@ public void cancel(String message) {
}

@Override
public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
return get(executeQueryAsync(request, options));
public ResultSet executeQuery(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader) {
return get(executeQueryAsync(request, options, routeToLeader));
}

@Override
public ApiFuture<ResultSet> executeQueryAsync(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader) {
GrpcCallContext context =
newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod());
return spannerStub.executeSqlCallable().futureCall(request, context);
Expand All @@ -1674,7 +1680,8 @@ public ApiFuture<ResultSet> executeQueryAsync(
public ResultSet executePartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context =
newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod());
newCallContext(
options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod(), true);
return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context));
}

Expand All @@ -1688,18 +1695,29 @@ public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
ExecuteSqlRequest request, Map<Option, ?> options, Duration timeout) {
GrpcCallContext context =
newCallContext(
options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod());
options,
request.getSession(),
request,
SpannerGrpc.getExecuteStreamingSqlMethod(),
true);
// Override any timeout settings that might have been set on the call context.
context = context.withTimeout(timeout).withStreamWaitTimeout(timeout);
return partitionedDmlStub.executeStreamingSqlCallable().call(request, context);
}

@Override
public StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
ExecuteSqlRequest request,
ResultStreamConsumer consumer,
@Nullable Map<Option, ?> options,
boolean routeToLeader) {
GrpcCallContext context =
newCallContext(
options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod());
options,
request.getSession(),
request,
SpannerGrpc.getExecuteStreamingSqlMethod(),
routeToLeader);
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context);
final StreamController controller = responseObserver.getController();
Expand Down Expand Up @@ -1729,30 +1747,35 @@ public ApiFuture<ExecuteBatchDmlResponse> executeBatchDmlAsync(
ExecuteBatchDmlRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context =
newCallContext(
options, request.getSession(), request, SpannerGrpc.getExecuteBatchDmlMethod());
options, request.getSession(), request, SpannerGrpc.getExecuteBatchDmlMethod(), true);
return spannerStub.executeBatchDmlCallable().futureCall(request, context);
}

@Override
public ApiFuture<Transaction> beginTransactionAsync(
BeginTransactionRequest request, @Nullable Map<Option, ?> options) {
BeginTransactionRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader) {
GrpcCallContext context =
newCallContext(
options, request.getSession(), request, SpannerGrpc.getBeginTransactionMethod());
options,
request.getSession(),
request,
SpannerGrpc.getBeginTransactionMethod(),
routeToLeader);
return spannerStub.beginTransactionCallable().futureCall(request, context);
}

@Override
public Transaction beginTransaction(
BeginTransactionRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
return get(beginTransactionAsync(request, options));
BeginTransactionRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader)
throws SpannerException {
return get(beginTransactionAsync(request, options, routeToLeader));
}

@Override
public ApiFuture<CommitResponse> commitAsync(
CommitRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context =
newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod());
newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod(), true);
return spannerStub.commitCallable().futureCall(request, context);
}

Expand All @@ -1765,7 +1788,8 @@ public CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option,
@Override
public ApiFuture<Empty> rollbackAsync(RollbackRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context =
newCallContext(options, request.getSession(), request, SpannerGrpc.getRollbackMethod());
newCallContext(
options, request.getSession(), request, SpannerGrpc.getRollbackMethod(), true);
return spannerStub.rollbackCallable().futureCall(request, context);
}

Expand All @@ -1780,7 +1804,7 @@ public PartitionResponse partitionQuery(
PartitionQueryRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
GrpcCallContext context =
newCallContext(
options, request.getSession(), request, SpannerGrpc.getPartitionQueryMethod());
options, request.getSession(), request, SpannerGrpc.getPartitionQueryMethod(), true);
return get(spannerStub.partitionQueryCallable().futureCall(request, context));
}

Expand All @@ -1789,7 +1813,7 @@ public PartitionResponse partitionRead(
PartitionReadRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
GrpcCallContext context =
newCallContext(
options, request.getSession(), request, SpannerGrpc.getPartitionReadMethod());
options, request.getSession(), request, SpannerGrpc.getPartitionReadMethod(), true);
return get(spannerStub.partitionReadCallable().futureCall(request, context));
}

Expand Down Expand Up @@ -1898,6 +1922,16 @@ <ReqT, RespT> GrpcCallContext newCallContext(
String resource,
ReqT request,
MethodDescriptor<ReqT, RespT> method) {
return newCallContext(options, resource, request, method, false);
}

@VisibleForTesting
<ReqT, RespT> GrpcCallContext newCallContext(
@Nullable Map<Option, ?> options,
String resource,
ReqT request,
MethodDescriptor<ReqT, RespT> method,
boolean routeToLeader) {
GrpcCallContext context = GrpcCallContext.createDefault();
if (options != null) {
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
Expand All @@ -1907,6 +1941,9 @@ <ReqT, RespT> GrpcCallContext newCallContext(
context = context.withCallOptions(context.getCallOptions().withCompression(compressorName));
}
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
if (routeToLeader) {
context = context.withExtraHeaders(metadataProvider.newRouteToLeaderHeader());
}
if (callCredentialsProvider != null) {
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();
if (callCredentials != null) {
Expand Down
Expand Up @@ -28,7 +28,7 @@
class SpannerMetadataProvider {
private final Map<Metadata.Key<String>, String> headers;
private final String resourceHeaderKey;

private static final String routeToLeaderHeaderKey = "x-goog-spanner-route-to-leader";
private static final Pattern[] RESOURCE_TOKEN_PATTERNS = {
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"),
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*)(.*)?")
Expand Down Expand Up @@ -66,6 +66,12 @@ Map<String, List<String>> newExtraHeaders(
.build();
}

Map<String, List<String>> newRouteToLeaderHeader() {
return ImmutableMap.<String, List<String>>builder()
.put(routeToLeaderHeaderKey, Collections.singletonList("true"))
.build();
}

private Map<Metadata.Key<String>, String> constructHeadersAsMetadata(
Map<String, String> headers) {
ImmutableMap.Builder<Metadata.Key<String>, String> headersAsMetadataBuilder =
Expand Down

0 comments on commit 2cd2fbd

Please sign in to comment.