From 9695acee9195b50e525d87700e86d701b1d9eed2 Mon Sep 17 00:00:00 2001 From: Yifan Zhou <119901292+yifanzyifanz@users.noreply.github.com> Date: Fri, 28 Apr 2023 00:21:56 -0700 Subject: [PATCH] feat: Leader Aware Routing (#2214) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Add `x-goog-spanner-route-to-leader` header to Spanner RPC contexts for RW/PDML transactions. The header is added to support leader-aware-routing feature, which aims at reducing cross-regional latency for RW/PDML transactions in a multi-region instance. * feat: Add knob in SpannerOptions to allow users to opt out leader aware routing feature * fix: fix broken tests due to the merge * fix: resolve comments. * fix: resolve comments and add new tests to verify that the route-to-leader header exists for RW transactions and does not exist for RO transactions or when the leader aware routing feature is disabled. * fix: Update comments for SpannerOptions.disableLeaderAwareRouting * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: Disable leader aware routing by default for public preview --------- Co-authored-by: Owl Bot Co-authored-by: Rajat Bhatta <93644539+rajatbhatta@users.noreply.github.com> --- .../clirr-ignored-differences.xml | 99 ++++++ .../cloud/spanner/AbstractReadContext.java | 23 +- .../spanner/PartitionedDmlTransaction.java | 2 +- .../com/google/cloud/spanner/SessionImpl.java | 14 +- .../google/cloud/spanner/SpannerOptions.java | 25 ++ .../cloud/spanner/TransactionRunnerImpl.java | 11 +- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 86 +++-- .../spi/v1/SpannerMetadataProvider.java | 9 +- .../cloud/spanner/spi/v1/SpannerRpc.java | 68 +++- .../cloud/spanner/BatchClientImplTest.java | 3 +- .../PartitionedDmlTransactionTest.java | 23 +- .../google/cloud/spanner/SessionImplTest.java | 18 +- .../google/cloud/spanner/SessionPoolTest.java | 11 +- .../cloud/spanner/SpannerOptionsTest.java | 316 +++++++++--------- .../spanner/TransactionManagerImplTest.java | 17 +- .../spanner/TransactionRunnerImplTest.java | 16 +- .../spanner/spi/v1/GapicSpannerRpcTest.java | 182 ++++++++-- .../spi/v1/SpannerMetadataProviderTest.java | 20 +- 18 files changed, 674 insertions(+), 269 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 67e9663747..f158f62ea8 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -222,6 +222,105 @@ com/google/cloud/spanner/connection/Connection com.google.cloud.spanner.ResultSet analyzeUpdateStatement(com.google.cloud.spanner.Statement, com.google.cloud.spanner.ReadContext$QueryAnalyzeMode, com.google.cloud.spanner.Options$UpdateOption[]) + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.spanner.v1.Transaction beginTransaction(com.google.spanner.v1.BeginTransactionRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map, boolean) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall read(com.google.spanner.v1.ReadRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + + + 7005 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean) + + + 7006 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + com.google.spanner.v1.ResultSet + + + 7006 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall + + + 7004 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.spanner.v1.Transaction beginTransaction(com.google.spanner.v1.BeginTransactionRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.BeginTransactionRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + + + 7004 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall read(com.google.spanner.v1.ReadRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + + + 7005 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, boolean) + + + 7006 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall executeQuery(com.google.spanner.v1.ExecuteSqlRequest, com.google.cloud.spanner.spi.v1.SpannerRpc$ResultStreamConsumer, java.util.Map) + com.google.spanner.v1.ResultSet + + + 7006 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.spanner.v1.ResultSet executeQuery(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map) + com.google.cloud.spanner.spi.v1.SpannerRpc$StreamingCall + + 7012 diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 7facd19c82..e19ace944a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -163,6 +163,11 @@ private SingleReadContext(Builder builder) { this.bound = builder.bound; } + @Override + protected boolean isRouteToLeader() { + return false; + } + @GuardedBy("lock") @Override void beforeReadOrQueryLocked() { @@ -293,6 +298,11 @@ static Builder newBuilder() { } } + @Override + protected boolean isRouteToLeader() { + return false; + } + @Override void beforeReadOrQuery() { super.beforeReadOrQuery(); @@ -347,7 +357,8 @@ void initTransaction() { .setSession(session.getName()) .setOptions(options) .build(); - Transaction transaction = rpc.beginTransaction(request, session.getOptions()); + Transaction transaction = + rpc.beginTransaction(request, session.getOptions(), isRouteToLeader()); if (!transaction.hasReadTimestamp()) { throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field"); @@ -416,6 +427,10 @@ long getSeqNo() { return seqNo.incrementAndGet(); } + protected boolean isRouteToLeader() { + return false; + } + @Override public final ResultSet read( String table, KeySet keys, Iterable columns, ReadOption... options) { @@ -667,7 +682,8 @@ CloseableIterator startStream(@Nullable ByteString resumeToken request.setTransaction(selector); } SpannerRpc.StreamingCall call = - rpc.executeQuery(request.build(), stream.consumer(), session.getOptions()); + rpc.executeQuery( + request.build(), stream.consumer(), session.getOptions(), isRouteToLeader()); call.request(prefetchChunks); stream.setCall(call, request.getTransaction().hasBegin()); return stream; @@ -798,7 +814,8 @@ CloseableIterator startStream(@Nullable ByteString resumeToken } builder.setRequestOptions(buildRequestOptions(readOptions)); SpannerRpc.StreamingCall call = - rpc.read(builder.build(), stream.consumer(), session.getOptions()); + rpc.read( + builder.build(), stream.consumer(), session.getOptions(), isRouteToLeader()); call.request(prefetchChunks); stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin()); return stream; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index 976f6136db..36991b18c3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -202,7 +202,7 @@ private ByteString initTransaction() { TransactionOptions.newBuilder() .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())) .build(); - Transaction tx = rpc.beginTransaction(request, session.getOptions()); + Transaction tx = rpc.beginTransaction(request, session.getOptions(), true); if (tx.getId().isEmpty()) { throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index df6163e93e..2bef8e3ada 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -275,7 +275,7 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... @Override public void prepareReadWriteTransaction() { setActive(null); - readyTransactionId = beginTransaction(); + readyTransactionId = beginTransaction(true); } @Override @@ -296,9 +296,9 @@ public void close() { } } - ByteString beginTransaction() { + ByteString beginTransaction(boolean routeToLeader) { try { - return beginTransactionAsync().get(); + return beginTransactionAsync(routeToLeader).get(); } catch (ExecutionException e) { throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause()); } catch (InterruptedException e) { @@ -306,11 +306,11 @@ ByteString beginTransaction() { } } - ApiFuture beginTransactionAsync() { - return beginTransactionAsync(Options.fromTransactionOptions()); + ApiFuture beginTransactionAsync(boolean routeToLeader) { + return beginTransactionAsync(Options.fromTransactionOptions(), routeToLeader); } - ApiFuture beginTransactionAsync(Options transactionOptions) { + ApiFuture beginTransactionAsync(Options transactionOptions, boolean routeToLeader) { final SettableApiFuture res = SettableApiFuture.create(); final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan(); final BeginTransactionRequest request = @@ -319,7 +319,7 @@ ApiFuture beginTransactionAsync(Options transactionOptions) { .setOptions(createReadWriteTransactionOptions(transactionOptions)) .build(); final ApiFuture requestFuture = - spanner.getRpc().beginTransactionAsync(request, options); + spanner.getRpc().beginTransactionAsync(request, options, routeToLeader); requestFuture.addListener( tracer.withSpan( span, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index d6243f5959..6663d3f8e6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -132,6 +132,7 @@ public class SpannerOptions extends ServiceOptions { private final CallCredentialsProvider callCredentialsProvider; private final CloseableExecutorProvider asyncExecutorProvider; private final String compressorName; + private final boolean leaderAwareRoutingEnabled; /** * Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to @@ -600,6 +601,7 @@ private SpannerOptions(Builder builder) { callCredentialsProvider = builder.callCredentialsProvider; asyncExecutorProvider = builder.asyncExecutorProvider; compressorName = builder.compressorName; + leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled; } /** @@ -700,6 +702,7 @@ public static class Builder private CloseableExecutorProvider asyncExecutorProvider; private String compressorName; private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST"); + private boolean leaderAwareRoutingEnabled = false; private Builder() { // Manually set retry and polling settings that work. @@ -1155,6 +1158,24 @@ public Builder setEmulatorHost(String emulatorHost) { return this; } + /** + * Enable leader aware routing. Leader aware routing would route all requests in RW/PDML + * transactions to the leader region. + */ + public Builder enableLeaderAwareRouting() { + this.leaderAwareRoutingEnabled = true; + return this; + } + + /** + * Disable leader aware routing. Disabling leader aware routing would route all requests in + * RW/PDML transactions to any region. + */ + public Builder disableLeaderAwareRouting() { + this.leaderAwareRoutingEnabled = false; + return this; + } + @SuppressWarnings("rawtypes") @Override public SpannerOptions build() { @@ -1291,6 +1312,10 @@ public String getCompressorName() { return compressorName; } + public boolean isLeaderAwareRoutingEnabled() { + return leaderAwareRoutingEnabled; + } + /** Returns the default query options to use for the specific database. */ public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) { // Use the specific query options for the database if any have been specified. These have diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 3d3b34c4c3..ef937e993b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -198,6 +198,11 @@ private TransactionContextImpl(Builder builder) { this.finishedAsyncOperations.set(null); } + @Override + protected boolean isRouteToLeader() { + return true; + } + private void increaseAsyncOperations() { synchronized (lock) { if (runningAsyncOperations == 0) { @@ -255,7 +260,7 @@ ApiFuture ensureTxnAsync() { private void createTxnAsync(final SettableApiFuture res) { span.addAnnotation("Creating Transaction"); - final ApiFuture fut = session.beginTransactionAsync(options); + final ApiFuture fut = session.beginTransactionAsync(options, isRouteToLeader()); fut.addListener( () -> { try { @@ -717,7 +722,7 @@ private ResultSet internalExecuteUpdate( /* withTransactionSelector = */ true); try { com.google.spanner.v1.ResultSet resultSet = - rpc.executeQuery(builder.build(), session.getOptions()); + rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader()); if (resultSet.getMetadata().hasTransaction()) { onTransactionMetadata( resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin()); @@ -747,7 +752,7 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... o // Register the update as an async operation that must finish before the transaction may // commit. increaseAsyncOperations(); - resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions()); + resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), isRouteToLeader()); } catch (Throwable t) { decreaseAsyncOperations(); throw t; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index eb8633d044..4d70a26866 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -252,6 +252,7 @@ public class GapicSpannerRpc implements SpannerRpc { private static final double ADMINISTRATIVE_REQUESTS_RATE_LIMIT = 1.0D; private static final ConcurrentMap ADMINISTRATIVE_REQUESTS_RATE_LIMITERS = new ConcurrentHashMap<>(); + private final boolean leaderAwareRoutingEnabled; public static GapicSpannerRpc create(SpannerOptions options) { return new GapicSpannerRpc(options); @@ -302,6 +303,7 @@ public GapicSpannerRpc(final SpannerOptions options) { internalHeaderProviderBuilder.getResourceHeaderKey()); this.callCredentialsProvider = options.getCallCredentialsProvider(); this.compressorName = options.getCompressorName(); + this.leaderAwareRoutingEnabled = options.isLeaderAwareRoutingEnabled(); if (initializeStubs) { // First check if SpannerOptions provides a TransportChannelProvider. Create one @@ -1527,7 +1529,8 @@ public List batchCreateSessions( requestBuilder.setSessionTemplate(sessionBuilder); BatchCreateSessionsRequest request = requestBuilder.build(); GrpcCallContext context = - newCallContext(options, databaseName, request, SpannerGrpc.getBatchCreateSessionsMethod()); + newCallContext( + options, databaseName, request, SpannerGrpc.getBatchCreateSessionsMethod(), true); return get(spannerStub.batchCreateSessionsCallable().futureCall(request, context)) .getSessionList(); } @@ -1551,7 +1554,7 @@ public Session createSession( requestBuilder.setSession(sessionBuilder); CreateSessionRequest request = requestBuilder.build(); GrpcCallContext context = - newCallContext(options, databaseName, request, SpannerGrpc.getCreateSessionMethod()); + newCallContext(options, databaseName, request, SpannerGrpc.getCreateSessionMethod(), true); return get(spannerStub.createSessionCallable().futureCall(request, context)); } @@ -1571,9 +1574,13 @@ public ApiFuture asyncDeleteSession(String sessionName, @Nullable Map options) { + ReadRequest request, + ResultStreamConsumer consumer, + @Nullable Map options, + boolean routeToLeader) { GrpcCallContext context = - newCallContext(options, request.getSession(), request, SpannerGrpc.getReadMethod()); + newCallContext( + options, request.getSession(), request, SpannerGrpc.getReadMethod(), routeToLeader); SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer); spannerStub.streamingReadCallable().call(request, responseObserver, context); final StreamController controller = responseObserver.getController(); @@ -1593,15 +1600,21 @@ public void cancel(String message) { } @Override - public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map options) { - return get(executeQueryAsync(request, options)); + public ResultSet executeQuery( + ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader) { + return get(executeQueryAsync(request, options, routeToLeader)); } @Override public ApiFuture executeQueryAsync( - ExecuteSqlRequest request, @Nullable Map options) { + ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader) { GrpcCallContext context = - newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod()); + newCallContext( + options, + request.getSession(), + request, + SpannerGrpc.getExecuteSqlMethod(), + routeToLeader); return spannerStub.executeSqlCallable().futureCall(request, context); } @@ -1609,7 +1622,8 @@ public ApiFuture executeQueryAsync( public ResultSet executePartitionedDml( ExecuteSqlRequest request, @Nullable Map options) { GrpcCallContext context = - newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod()); + newCallContext( + options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod(), true); return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context)); } @@ -1623,7 +1637,11 @@ public ServerStream executeStreamingPartitionedDml( ExecuteSqlRequest request, Map options, Duration timeout) { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod()); + options, + request.getSession(), + request, + SpannerGrpc.getExecuteStreamingSqlMethod(), + true); // Override any timeout settings that might have been set on the call context. context = context.withTimeout(timeout).withStreamWaitTimeout(timeout); return partitionedDmlStub.executeStreamingSqlCallable().call(request, context); @@ -1631,10 +1649,17 @@ public ServerStream executeStreamingPartitionedDml( @Override public StreamingCall executeQuery( - ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options) { + ExecuteSqlRequest request, + ResultStreamConsumer consumer, + @Nullable Map options, + boolean routeToLeader) { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod()); + options, + request.getSession(), + request, + SpannerGrpc.getExecuteStreamingSqlMethod(), + routeToLeader); SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer); spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context); final StreamController controller = responseObserver.getController(); @@ -1664,30 +1689,35 @@ public ApiFuture executeBatchDmlAsync( ExecuteBatchDmlRequest request, @Nullable Map options) { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getExecuteBatchDmlMethod()); + options, request.getSession(), request, SpannerGrpc.getExecuteBatchDmlMethod(), true); return spannerStub.executeBatchDmlCallable().futureCall(request, context); } @Override public ApiFuture beginTransactionAsync( - BeginTransactionRequest request, @Nullable Map options) { + BeginTransactionRequest request, @Nullable Map options, boolean routeToLeader) { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getBeginTransactionMethod()); + options, + request.getSession(), + request, + SpannerGrpc.getBeginTransactionMethod(), + routeToLeader); return spannerStub.beginTransactionCallable().futureCall(request, context); } @Override public Transaction beginTransaction( - BeginTransactionRequest request, @Nullable Map options) throws SpannerException { - return get(beginTransactionAsync(request, options)); + BeginTransactionRequest request, @Nullable Map options, boolean routeToLeader) + throws SpannerException { + return get(beginTransactionAsync(request, options, routeToLeader)); } @Override public ApiFuture commitAsync( CommitRequest request, @Nullable Map options) { GrpcCallContext context = - newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod()); + newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod(), true); return spannerStub.commitCallable().futureCall(request, context); } @@ -1700,7 +1730,8 @@ public CommitResponse commit(CommitRequest commitRequest, @Nullable Map rollbackAsync(RollbackRequest request, @Nullable Map options) { GrpcCallContext context = - newCallContext(options, request.getSession(), request, SpannerGrpc.getRollbackMethod()); + newCallContext( + options, request.getSession(), request, SpannerGrpc.getRollbackMethod(), true); return spannerStub.rollbackCallable().futureCall(request, context); } @@ -1715,7 +1746,7 @@ public PartitionResponse partitionQuery( PartitionQueryRequest request, @Nullable Map options) throws SpannerException { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getPartitionQueryMethod()); + options, request.getSession(), request, SpannerGrpc.getPartitionQueryMethod(), true); return get(spannerStub.partitionQueryCallable().futureCall(request, context)); } @@ -1724,7 +1755,7 @@ public PartitionResponse partitionRead( PartitionReadRequest request, @Nullable Map options) throws SpannerException { GrpcCallContext context = newCallContext( - options, request.getSession(), request, SpannerGrpc.getPartitionReadMethod()); + options, request.getSession(), request, SpannerGrpc.getPartitionReadMethod(), true); return get(spannerStub.partitionReadCallable().futureCall(request, context)); } @@ -1833,6 +1864,16 @@ GrpcCallContext newCallContext( String resource, ReqT request, MethodDescriptor method) { + return newCallContext(options, resource, request, method, false); + } + + @VisibleForTesting + GrpcCallContext newCallContext( + @Nullable Map options, + String resource, + ReqT request, + MethodDescriptor method, + boolean routeToLeader) { GrpcCallContext context = GrpcCallContext.createDefault(); if (options != null) { context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); @@ -1842,6 +1883,9 @@ GrpcCallContext newCallContext( context = context.withCallOptions(context.getCallOptions().withCompression(compressorName)); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); + if (routeToLeader && leaderAwareRoutingEnabled) { + context = context.withExtraHeaders(metadataProvider.newRouteToLeaderHeader()); + } if (callCredentialsProvider != null) { CallCredentials callCredentials = callCredentialsProvider.getCallCredentials(); if (callCredentials != null) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java index 7f9a32765e..77406a5399 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java @@ -28,12 +28,15 @@ class SpannerMetadataProvider { private final Map, String> headers; private final String resourceHeaderKey; - + private static final String ROUTE_TO_LEADER_HEADER_KEY = "x-goog-spanner-route-to-leader"; private static final Pattern[] RESOURCE_TOKEN_PATTERNS = { Pattern.compile("^(?projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"), Pattern.compile("^(?projects/[^/]*/instances/[^/]*)(.*)?") }; + private static final Map> ROUTE_TO_LEADER_HEADER_MAP = + ImmutableMap.of(ROUTE_TO_LEADER_HEADER_KEY, Collections.singletonList("true")); + private SpannerMetadataProvider(Map headers, String resourceHeaderKey) { this.resourceHeaderKey = resourceHeaderKey; this.headers = constructHeadersAsMetadata(headers); @@ -66,6 +69,10 @@ Map> newExtraHeaders( .build(); } + Map> newRouteToLeaderHeader() { + return ROUTE_TO_LEADER_HEADER_MAP; + } + private Map, String> constructHeadersAsMetadata( Map headers) { ImmutableMap.Builder, String> headersAsMetadataBuilder = diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index 2f68b9c1df..552c53e85e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -319,13 +319,41 @@ Session createSession( ApiFuture asyncDeleteSession(String sessionName, @Nullable Map options) throws SpannerException; + /** + * Performs a streaming read. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ StreamingCall read( - ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options); + ReadRequest request, + ResultStreamConsumer consumer, + @Nullable Map options, + boolean routeToLeader); - ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map options); + /** + * Executes a query. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ + ResultSet executeQuery( + ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader); + /** + * Executes a query asynchronously. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ ApiFuture executeQueryAsync( - ExecuteSqlRequest request, @Nullable Map options); + ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader); ResultSet executePartitionedDml(ExecuteSqlRequest request, @Nullable Map options); @@ -334,19 +362,47 @@ ApiFuture executeQueryAsync( ServerStream executeStreamingPartitionedDml( ExecuteSqlRequest request, @Nullable Map options, Duration timeout); + /** + * Executes a query with streaming result. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ StreamingCall executeQuery( - ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options); + ExecuteSqlRequest request, + ResultStreamConsumer consumer, + @Nullable Map options, + boolean routeToLeader); ExecuteBatchDmlResponse executeBatchDml(ExecuteBatchDmlRequest build, Map options); ApiFuture executeBatchDmlAsync( ExecuteBatchDmlRequest build, Map options); - Transaction beginTransaction(BeginTransactionRequest request, @Nullable Map options) + /** + * Begins a transaction. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ + Transaction beginTransaction( + BeginTransactionRequest request, @Nullable Map options, boolean routeToLeader) throws SpannerException; + /** + * Begins a transaction asynchronously. + * + * @param routeToLeader Set to true to route the request to the leader region, and false to route + * the request to any region. When leader aware routing is enabled, RW/PDML requests are + * preferred to be routed to the leader region, and RO requests (except for + * PartitionRead/PartitionQuery) are preferred to be routed to any region for optimal latency. + */ ApiFuture beginTransactionAsync( - BeginTransactionRequest request, @Nullable Map options); + BeginTransactionRequest request, @Nullable Map options, boolean routeToLeader); CommitResponse commit(CommitRequest commitRequest, @Nullable Map options) throws SpannerException; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java index 2c29b87f86..18ae8a07b3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java @@ -91,7 +91,8 @@ public void testBatchReadOnlyTxnWithBound() throws Exception { com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP); Transaction txnMetadata = Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build(); - when(gapicRpc.beginTransaction(Mockito.any(), optionsCaptor.capture())).thenReturn(txnMetadata); + when(gapicRpc.beginTransaction(Mockito.any(), optionsCaptor.capture(), eq(false))) + .thenReturn(txnMetadata); BatchReadOnlyTransaction batchTxn = client.batchReadOnlyTransaction(TimestampBound.strong()); assertThat(batchTxn.getBatchTransactionId().getSessionId()).isEqualTo(SESSION_NAME); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index de1ec8fa39..93e0e3eb3d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -97,7 +98,7 @@ public void setup() { MockitoAnnotations.initMocks(this); when(session.getName()).thenReturn(sessionId); when(session.getOptions()).thenReturn(Collections.EMPTY_MAP); - when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap())) + when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true))) .thenReturn(Transaction.newBuilder().setId(txId).build()); tx = new PartitionedDmlTransaction(session, rpc, ticker); @@ -117,7 +118,7 @@ public void testExecuteStreamingPartitionedUpdate() { long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -139,7 +140,7 @@ public void testExecuteStreamingPartitionedUpdateWithUpdateOptions() { Statement.of(sql), Duration.ofMinutes(10), Options.tag(tag)); assertThat(count).isEqualTo(1000L); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithRequestOptions), anyMap(), any(Duration.class)); @@ -168,7 +169,7 @@ public void testExecuteStreamingPartitionedUpdateAborted() { long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); - verify(rpc, times(2)).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc, times(2)).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc, times(2)) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -200,7 +201,7 @@ public void testExecuteStreamingPartitionedUpdateUnavailable() { long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -231,7 +232,7 @@ public void testExecuteStreamingPartitionedUpdateUnavailableAndThenDeadlineExcee SpannerException.class, () -> tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10))); assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode()); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -259,7 +260,7 @@ public void testExecuteStreamingPartitionedUpdateAbortedAndThenDeadlineExceeded( SpannerException.class, () -> tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10))); assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode()); - verify(rpc, times(2)).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc, times(2)).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -297,7 +298,7 @@ public Long answer(InvocationOnMock invocation) { () -> tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10))); assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode()); // It should start a transaction exactly 10 times (10 ticks == 10 minutes). - verify(rpc, times(10)).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc, times(10)).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); // The last transaction should timeout before it starts the actual statement execution, which // means that the execute method is only executed 9 times. verify(rpc, times(9)) @@ -335,7 +336,7 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() { long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -374,7 +375,7 @@ public void testExecuteStreamingPartitionedUpdateRSTstream() { long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); @@ -405,7 +406,7 @@ public void testExecuteStreamingPartitionedUpdateGenericInternalException() { SpannerException.class, () -> tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10))); assertEquals(ErrorCode.INTERNAL, e.getErrorCode()); - verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)); verify(rpc) .executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 1174cbf4eb..90e9a684d9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -100,7 +101,9 @@ public void setUp() { Transaction txn = Transaction.newBuilder().setId(ByteString.copyFromUtf8("TEST")).build(); Mockito.when( rpc.beginTransactionAsync( - Mockito.any(BeginTransactionRequest.class), Mockito.any(Map.class))) + Mockito.any(BeginTransactionRequest.class), + Mockito.any(Map.class), + Mockito.anyBoolean())) .thenReturn(ApiFutures.immediateFuture(txn)); CommitResponse commitResponse = CommitResponse.newBuilder() @@ -350,7 +353,7 @@ public void singleUseContextClosesTransaction() { public void prepareClosesOldSingleUseContext() { ReadContext ctx = session.singleUse(TimestampBound.strong()); - Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options))) + Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) .thenReturn(Transaction.newBuilder().setId(ByteString.copyFromUtf8("t1")).build()); session.prepareReadWriteTransaction(); IllegalStateException e = @@ -414,7 +417,7 @@ public void request(int numMessages) {} private void mockRead(final PartialResultSet myResultSet) { final ArgumentCaptor consumer = ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class); - Mockito.when(rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options))) + Mockito.when(rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options), eq(false))) .then( invocation -> { consumer.getValue().onPartialResultSet(myResultSet); @@ -430,7 +433,8 @@ public void multiUseReadOnlyTransactionReturnsEmptyTransactionMetadata() { PartialResultSet.newBuilder() .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) .build(); - Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options))).thenReturn(txnMetadata); + Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) + .thenReturn(txnMetadata); mockRead(resultSet); ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong()); @@ -448,7 +452,8 @@ public void multiUseReadOnlyTransactionReturnsMissingTimestamp() { PartialResultSet.newBuilder() .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) .build(); - Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options))).thenReturn(txnMetadata); + Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) + .thenReturn(txnMetadata); mockRead(resultSet); ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong()); @@ -467,7 +472,8 @@ public void multiUseReadOnlyTransactionReturnsMissingTransactionId() throws Pars PartialResultSet.newBuilder() .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) .build(); - Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options))).thenReturn(txnMetadata); + Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) + .thenReturn(txnMetadata); mockRead(resultSet); ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 4e5b1e0395..1f7391a60f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -763,9 +763,12 @@ public void testSessionNotFoundReadWriteTransaction() { when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap())) .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(rpc.executeQuery( - any(ExecuteSqlRequest.class), any(ResultStreamConsumer.class), any(Map.class))) + any(ExecuteSqlRequest.class), + any(ResultStreamConsumer.class), + any(Map.class), + eq(true))) .thenReturn(closedStreamingCall); - when(rpc.executeQuery(any(ExecuteSqlRequest.class), any(Map.class))) + when(rpc.executeQuery(any(ExecuteSqlRequest.class), any(Map.class), eq(true))) .thenThrow(sessionNotFound); when(rpc.executeBatchDml(any(ExecuteBatchDmlRequest.class), any(Map.class))) .thenThrow(sessionNotFound); @@ -786,7 +789,7 @@ public void testSessionNotFoundReadWriteTransaction() { .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(closedSession.newTransaction(Options.fromTransactionOptions())) .thenReturn(closedTransactionContext); - when(closedSession.beginTransactionAsync(any())).thenThrow(sessionNotFound); + when(closedSession.beginTransactionAsync(any(), eq(true))).thenThrow(sessionNotFound); TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession); closedTransactionRunner.setSpan(mock(Span.class)); when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner); @@ -799,7 +802,7 @@ public void testSessionNotFoundReadWriteTransaction() { final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class); when(openSession.newTransaction(Options.fromTransactionOptions())) .thenReturn(openTransactionContext); - when(openSession.beginTransactionAsync(any())) + when(openSession.beginTransactionAsync(any(), eq(true))) .thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn"))); TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession); openTransactionRunner.setSpan(mock(Span.class)); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 8819dab462..03844a6b10 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -20,9 +20,13 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import com.google.api.gax.grpc.GrpcCallContext; @@ -82,7 +86,7 @@ public void defaultBuilder() { assertThat(options.getHost()).isEqualTo("http://" + System.getenv("SPANNER_EMULATOR_HOST")); } assertThat(options.getPrefetchChunks()).isEqualTo(4); - assertThat(options.getSessionLabels()).isNull(); + assertNull(options.getSessionLabels()); } @Test @@ -453,7 +457,7 @@ public void testInvalidTransport() { () -> SpannerOptions.newBuilder() .setTransportOptions(Mockito.mock(TransportOptions.class))); - assertThat(e.getMessage()).isNotNull(); + assertNotNull(e.getMessage()); } @Test @@ -463,7 +467,7 @@ public void testInvalidSessionLabels() { NullPointerException e = assertThrows( NullPointerException.class, () -> SpannerOptions.newBuilder().setSessionLabels(labels)); - assertThat(e.getMessage()).isNotNull(); + assertNotNull(e.getMessage()); } @Test @@ -471,7 +475,7 @@ public void testNullSessionLabels() { NullPointerException e = assertThrows( NullPointerException.class, () -> SpannerOptions.newBuilder().setSessionLabels(null)); - assertThat(e.getMessage()).isNotNull(); + assertNotNull(e.getMessage()); } @Test @@ -664,120 +668,119 @@ public void testCompressorName() { .build() .getCompressorName()) .isEqualTo("identity"); - assertThat( - SpannerOptions.newBuilder() - .setProjectId("p") - .setCompressorName(null) - .build() - .getCompressorName()) - .isNull(); + assertNull( + SpannerOptions.newBuilder() + .setProjectId("p") + .setCompressorName(null) + .build() + .getCompressorName()); assertThrows( IllegalArgumentException.class, () -> SpannerOptions.newBuilder().setCompressorName("foo")); } + @Test + public void testLeaderAwareRoutingEnablement() { + assertFalse( + SpannerOptions.newBuilder().setProjectId("p").build().isLeaderAwareRoutingEnabled()); + assertTrue( + SpannerOptions.newBuilder() + .setProjectId("p") + .enableLeaderAwareRouting() + .build() + .isLeaderAwareRoutingEnabled()); + assertFalse( + SpannerOptions.newBuilder() + .setProjectId("p") + .disableLeaderAwareRouting() + .build() + .isLeaderAwareRoutingEnabled()); + } + @Test public void testSpannerCallContextTimeoutConfigurator_NullValues() { SpannerCallContextTimeoutConfigurator configurator = SpannerCallContextTimeoutConfigurator.create(); ApiCallContext inputCallContext = GrpcCallContext.createDefault(); - assertThat( - configurator.configure( - inputCallContext, - BatchCreateSessionsRequest.getDefaultInstance(), - SpannerGrpc.getBatchCreateSessionsMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - CreateSessionRequest.getDefaultInstance(), - SpannerGrpc.getCreateSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - DeleteSessionRequest.getDefaultInstance(), - SpannerGrpc.getDeleteSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - GetSessionRequest.getDefaultInstance(), - SpannerGrpc.getGetSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - DeleteSessionRequest.getDefaultInstance(), - SpannerGrpc.getDeleteSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - ListSessionsRequest.getDefaultInstance(), - SpannerGrpc.getListSessionsMethod())) - .isNull(); - - assertThat( - configurator.configure( - inputCallContext, - BeginTransactionRequest.getDefaultInstance(), - SpannerGrpc.getBeginTransactionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - CommitRequest.getDefaultInstance(), - SpannerGrpc.getCommitMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - RollbackRequest.getDefaultInstance(), - SpannerGrpc.getRollbackMethod())) - .isNull(); - - assertThat( - configurator.configure( - inputCallContext, - ExecuteSqlRequest.getDefaultInstance(), - SpannerGrpc.getExecuteSqlMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - ExecuteSqlRequest.getDefaultInstance(), - SpannerGrpc.getExecuteStreamingSqlMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - ExecuteBatchDmlRequest.getDefaultInstance(), - SpannerGrpc.getExecuteBatchDmlMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, ReadRequest.getDefaultInstance(), SpannerGrpc.getReadMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - ReadRequest.getDefaultInstance(), - SpannerGrpc.getStreamingReadMethod())) - .isNull(); - - assertThat( - configurator.configure( - inputCallContext, - PartitionQueryRequest.getDefaultInstance(), - SpannerGrpc.getPartitionQueryMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - PartitionReadRequest.getDefaultInstance(), - SpannerGrpc.getPartitionReadMethod())) - .isNull(); + assertNull( + configurator.configure( + inputCallContext, + BatchCreateSessionsRequest.getDefaultInstance(), + SpannerGrpc.getBatchCreateSessionsMethod())); + assertNull( + configurator.configure( + inputCallContext, + CreateSessionRequest.getDefaultInstance(), + SpannerGrpc.getCreateSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + DeleteSessionRequest.getDefaultInstance(), + SpannerGrpc.getDeleteSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + GetSessionRequest.getDefaultInstance(), + SpannerGrpc.getGetSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + DeleteSessionRequest.getDefaultInstance(), + SpannerGrpc.getDeleteSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + ListSessionsRequest.getDefaultInstance(), + SpannerGrpc.getListSessionsMethod())); + + assertNull( + configurator.configure( + inputCallContext, + BeginTransactionRequest.getDefaultInstance(), + SpannerGrpc.getBeginTransactionMethod())); + assertNull( + configurator.configure( + inputCallContext, CommitRequest.getDefaultInstance(), SpannerGrpc.getCommitMethod())); + assertNull( + configurator.configure( + inputCallContext, + RollbackRequest.getDefaultInstance(), + SpannerGrpc.getRollbackMethod())); + + assertNull( + configurator.configure( + inputCallContext, + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod())); + assertNull( + configurator.configure( + inputCallContext, + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteStreamingSqlMethod())); + assertNull( + configurator.configure( + inputCallContext, + ExecuteBatchDmlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteBatchDmlMethod())); + assertNull( + configurator.configure( + inputCallContext, ReadRequest.getDefaultInstance(), SpannerGrpc.getReadMethod())); + assertNull( + configurator.configure( + inputCallContext, + ReadRequest.getDefaultInstance(), + SpannerGrpc.getStreamingReadMethod())); + + assertNull( + configurator.configure( + inputCallContext, + PartitionQueryRequest.getDefaultInstance(), + SpannerGrpc.getPartitionQueryMethod())); + assertNull( + configurator.configure( + inputCallContext, + PartitionReadRequest.getDefaultInstance(), + SpannerGrpc.getPartitionReadMethod())); } @Test @@ -795,49 +798,42 @@ public void testSpannerCallContextTimeoutConfigurator_WithTimeouts() { ApiCallContext inputCallContext = GrpcCallContext.createDefault(); - assertThat( - configurator.configure( - inputCallContext, - BatchCreateSessionsRequest.getDefaultInstance(), - SpannerGrpc.getBatchCreateSessionsMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - CreateSessionRequest.getDefaultInstance(), - SpannerGrpc.getCreateSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - DeleteSessionRequest.getDefaultInstance(), - SpannerGrpc.getDeleteSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - GetSessionRequest.getDefaultInstance(), - SpannerGrpc.getGetSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - DeleteSessionRequest.getDefaultInstance(), - SpannerGrpc.getDeleteSessionMethod())) - .isNull(); - assertThat( - configurator.configure( - inputCallContext, - ListSessionsRequest.getDefaultInstance(), - SpannerGrpc.getListSessionsMethod())) - .isNull(); - - assertThat( - configurator.configure( - inputCallContext, - BeginTransactionRequest.getDefaultInstance(), - SpannerGrpc.getBeginTransactionMethod())) - .isNull(); + assertNull( + configurator.configure( + inputCallContext, + BatchCreateSessionsRequest.getDefaultInstance(), + SpannerGrpc.getBatchCreateSessionsMethod())); + assertNull( + configurator.configure( + inputCallContext, + CreateSessionRequest.getDefaultInstance(), + SpannerGrpc.getCreateSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + DeleteSessionRequest.getDefaultInstance(), + SpannerGrpc.getDeleteSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + GetSessionRequest.getDefaultInstance(), + SpannerGrpc.getGetSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + DeleteSessionRequest.getDefaultInstance(), + SpannerGrpc.getDeleteSessionMethod())); + assertNull( + configurator.configure( + inputCallContext, + ListSessionsRequest.getDefaultInstance(), + SpannerGrpc.getListSessionsMethod())); + + assertNull( + configurator.configure( + inputCallContext, + BeginTransactionRequest.getDefaultInstance(), + SpannerGrpc.getBeginTransactionMethod())); assertThat( configurator .configure( @@ -855,12 +851,11 @@ public void testSpannerCallContextTimeoutConfigurator_WithTimeouts() { .getTimeout()) .isEqualTo(Duration.ofSeconds(8L)); - assertThat( - configurator.configure( - inputCallContext, - ExecuteSqlRequest.getDefaultInstance(), - SpannerGrpc.getExecuteSqlMethod())) - .isNull(); + assertNull( + configurator.configure( + inputCallContext, + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod())); assertThat( configurator .configure( @@ -877,10 +872,9 @@ public void testSpannerCallContextTimeoutConfigurator_WithTimeouts() { SpannerGrpc.getExecuteBatchDmlMethod()) .getTimeout()) .isEqualTo(Duration.ofSeconds(1L)); - assertThat( - configurator.configure( - inputCallContext, ReadRequest.getDefaultInstance(), SpannerGrpc.getReadMethod())) - .isNull(); + assertNull( + configurator.configure( + inputCallContext, ReadRequest.getDefaultInstance(), SpannerGrpc.getReadMethod())); assertThat( configurator .configure( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index 55df44a96d..f8462693fe 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -217,7 +218,8 @@ public void usesPreparedTransaction() { com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) .build())); - when(rpc.beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap())) + when(rpc.beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( invocation -> ApiFutures.immediateFuture( @@ -241,7 +243,8 @@ public void usesPreparedTransaction() { mgr.commit(); } verify(rpc, times(1)) - .beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); + .beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true)); } } @@ -278,7 +281,8 @@ public void inlineBegin() { com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) .build())); - when(rpc.beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap())) + when(rpc.beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( invocation -> ApiFutures.immediateFuture( @@ -286,7 +290,7 @@ public void inlineBegin() { .setId(ByteString.copyFromUtf8(UUID.randomUUID().toString())) .build())); final AtomicInteger transactionsStarted = new AtomicInteger(); - when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap())) + when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( invocation -> { ResultSet.Builder builder = @@ -332,9 +336,10 @@ public void inlineBegin() { } // BeginTransaction should not be called, as we are inlining it with the ExecuteSql request. verify(rpc, Mockito.never()) - .beginTransaction(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); + .beginTransaction(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true)); // We should have 2 ExecuteSql requests. - verify(rpc, times(2)).executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap()); + verify(rpc, times(2)) + .executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), eq(true)); // But only 1 with a BeginTransaction. assertThat(transactionsStarted.get()).isEqualTo(1); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 04ac46d887..df8245e6ac 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -100,7 +101,7 @@ public void setUp() { MockitoAnnotations.initMocks(this); firstRun = true; when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); - when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap())) + when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( invocation -> { ResultSet.Builder builder = @@ -160,7 +161,8 @@ public void usesPreparedTransaction() { .setCreateTime( Timestamp.newBuilder().setSeconds(System.currentTimeMillis() * 1000)) .build())); - when(rpc.beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap())) + when(rpc.beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( invocation -> ApiFutures.immediateFuture( @@ -180,7 +182,8 @@ public void usesPreparedTransaction() { DatabaseClient client = spanner.getDatabaseClient(db); client.readWriteTransaction().run(transaction -> null); verify(rpc, times(1)) - .beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); + .beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true)); } } @@ -294,9 +297,10 @@ public void prepareReadWriteTransaction() { return null; }); verify(rpc, Mockito.never()) - .beginTransaction(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); + .beginTransaction(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true)); verify(rpc, Mockito.never()) - .beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); + .beginTransactionAsync( + Mockito.any(BeginTransactionRequest.class), Mockito.anyMap(), eq(true)); assertThat(usedInlinedBegin).isTrue(); } @@ -311,7 +315,7 @@ private long[] batchDmlException(int status) { .setRpc(rpc) .build(); when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(transaction); - when(session.beginTransactionAsync()) + when(session.beginTransactionAsync(true)) .thenReturn( ApiFutures.immediateFuture(ByteString.copyFromUtf8(UUID.randomUUID().toString()))); when(session.getName()).thenReturn(SessionId.of("p", "i", "d", "test").getName()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index 23ec9c682c..d32dc51871 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -19,12 +19,17 @@ import static com.google.common.truth.Truth.assertThat; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; import com.google.api.gax.core.GaxProperties; +import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.HeaderProvider; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.OAuth2Credentials; @@ -42,8 +47,10 @@ import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator; import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.TransactionRunner; import com.google.cloud.spanner.spi.v1.GapicSpannerRpc.AdminRequestsLimitExceededRetryAlgorithm; import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; +import com.google.common.collect.ImmutableList; import com.google.protobuf.ListValue; import com.google.rpc.ErrorInfo; import com.google.spanner.v1.ExecuteSqlRequest; @@ -136,6 +143,7 @@ public class GapicSpannerRpcTest { private static Metadata lastSeenHeaders; private static String defaultUserAgent; private static Spanner spanner; + private static boolean isRouteToLeader; @Parameter public Dialect dialect; @@ -173,6 +181,17 @@ public ServerCall.Listener interceptCall( String auth = headers.get(Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER)); assertThat(auth).isEqualTo("Bearer " + VARIABLE_OAUTH_TOKEN); + if (call.getMethodDescriptor() + .equals(SpannerGrpc.getExecuteStreamingSqlMethod()) + || call.getMethodDescriptor().equals(SpannerGrpc.getExecuteSqlMethod())) { + String routeToLeaderHeader = + headers.get( + Key.of( + "x-goog-spanner-route-to-leader", + Metadata.ASCII_STRING_MARSHALLER)); + isRouteToLeader = + (routeToLeaderHeader != null && routeToLeaderHeader.equals("true")); + } return Contexts.interceptCall(Context.current(), call, headers, next); } }) @@ -194,6 +213,7 @@ public void reset() throws InterruptedException { server.shutdown(); server.awaitTermination(); } + isRouteToLeader = false; } @Test @@ -207,15 +227,14 @@ public void testCallCredentialsProviderPreferenceAboveCredentials() { GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); // GoogleAuthLibraryCallCredentials doesn't implement equals, so we can only check for the // existence. - assertThat( - rpc.newCallContext( - optionsMap, - "/some/resource", - GetSessionRequest.getDefaultInstance(), - SpannerGrpc.getGetSessionMethod()) - .getCallOptions() - .getCredentials()) - .isNotNull(); + assertNotNull( + rpc.newCallContext( + optionsMap, + "/some/resource", + GetSessionRequest.getDefaultInstance(), + SpannerGrpc.getGetSessionMethod()) + .getCallOptions() + .getCredentials()); rpc.shutdown(); } @@ -228,15 +247,14 @@ public void testCallCredentialsProviderReturnsNull() { .setCallCredentialsProvider(() -> null) .build(); GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); - assertThat( - rpc.newCallContext( - optionsMap, - "/some/resource", - GetSessionRequest.getDefaultInstance(), - SpannerGrpc.getGetSessionMethod()) - .getCallOptions() - .getCredentials()) - .isNull(); + assertNull( + rpc.newCallContext( + optionsMap, + "/some/resource", + GetSessionRequest.getDefaultInstance(), + SpannerGrpc.getGetSessionMethod()) + .getCallOptions() + .getCredentials()); rpc.shutdown(); } @@ -248,15 +266,14 @@ public void testNoCallCredentials() { .setCredentials(STATIC_CREDENTIALS) .build(); GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); - assertThat( - rpc.newCallContext( - optionsMap, - "/some/resource", - GetSessionRequest.getDefaultInstance(), - SpannerGrpc.getGetSessionMethod()) - .getCallOptions() - .getCredentials()) - .isNull(); + assertNull( + rpc.newCallContext( + optionsMap, + "/some/resource", + GetSessionRequest.getDefaultInstance(), + SpannerGrpc.getGetSessionMethod()) + .getCallOptions() + .getCredentials()); rpc.shutdown(); } @@ -374,7 +391,66 @@ public ApiCallContext configure( public void testNewCallContextWithNullRequestAndNullMethod() { SpannerOptions options = SpannerOptions.newBuilder().setProjectId("some-project").build(); GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); - assertThat(rpc.newCallContext(optionsMap, "/some/resource", null, null)).isNotNull(); + assertNotNull(rpc.newCallContext(optionsMap, "/some/resource", null, null)); + rpc.shutdown(); + } + + @Test + public void testNewCallContextWithRouteToLeaderHeader() { + SpannerOptions options = + SpannerOptions.newBuilder().setProjectId("some-project").enableLeaderAwareRouting().build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + GrpcCallContext callContext = + rpc.newCallContext( + optionsMap, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod(), + true); + assertNotNull(callContext); + assertEquals( + ImmutableList.of("true"), + callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader")); + assertEquals( + ImmutableList.of("projects/some-project"), + callContext.getExtraHeaders().get(ApiClientHeaderProvider.getDefaultResourceHeaderKey())); + rpc.shutdown(); + } + + @Test + public void testNewCallContextWithoutRouteToLeaderHeader() { + SpannerOptions options = + SpannerOptions.newBuilder().enableLeaderAwareRouting().setProjectId("some-project").build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + GrpcCallContext callContext = + rpc.newCallContext( + optionsMap, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod(), + false); + assertNotNull(callContext); + assertNull(callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader")); + rpc.shutdown(); + } + + @Test + public void testNewCallContextWithRouteToLeaderHeaderAndLarDisabled() { + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("some-project") + .disableLeaderAwareRouting() + .build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + GrpcCallContext callContext = + rpc.newCallContext( + optionsMap, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod(), + true); + assertNotNull(callContext); + assertNull(callContext.getExtraHeaders().get("x-goog-spanner-route-to-leader")); rpc.shutdown(); } @@ -449,6 +525,56 @@ public void testCustomUserAgent() { } } + @Test + public void testRouteToLeaderHeaderForReadOnly() { + final SpannerOptions options = + createSpannerOptions().toBuilder().enableLeaderAwareRouting().build(); + try (Spanner spanner = options.getService()) { + final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + + try (final ResultSet rs = databaseClient.singleUse().executeQuery(SELECT1AND2)) { + rs.next(); + } + + assertFalse(isRouteToLeader); + } + } + + @Test + public void testRouteToLeaderHeaderForReadWrite() { + final SpannerOptions options = + createSpannerOptions().toBuilder().enableLeaderAwareRouting().build(); + try (Spanner spanner = options.getService()) { + final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + TransactionRunner runner = databaseClient.readWriteTransaction(); + runner.run( + transaction -> { + transaction.executeUpdate(UPDATE_FOO_STATEMENT); + return null; + }); + } + assertTrue(isRouteToLeader); + } + + @Test + public void testRouteToLeaderHeaderWithLeaderAwareRoutingDisabled() { + final SpannerOptions options = + createSpannerOptions().toBuilder().disableLeaderAwareRouting().build(); + try (Spanner spanner = options.getService()) { + final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + TransactionRunner runner = databaseClient.readWriteTransaction(); + runner.run( + transaction -> { + transaction.executeUpdate(UPDATE_FOO_STATEMENT); + return null; + }); + } + assertFalse(isRouteToLeader); + } + private SpannerOptions createSpannerOptions() { String endpoint = address.getHostString() + ":" + server.getPort(); return SpannerOptions.newBuilder() diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java index 07e13626c1..cc43e2dc33 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java @@ -15,11 +15,12 @@ */ package com.google.cloud.spanner.spi.v1; -import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import io.grpc.Metadata; import io.grpc.Metadata.Key; import java.util.List; @@ -77,9 +78,20 @@ public void testNewExtraHeaders() { SpannerMetadataProvider metadataProvider = SpannerMetadataProvider.create(ImmutableMap.of(), "header1"); Map> extraHeaders = metadataProvider.newExtraHeaders(null, "value1"); - assertThat(extraHeaders) - .containsExactlyEntriesIn( - ImmutableMap.>of("header1", ImmutableList.of("value1"))); + Map> expectedHeaders = + ImmutableMap.>of("header1", ImmutableList.of("value1")); + assertTrue(Maps.difference(extraHeaders, expectedHeaders).areEqual()); + } + + @Test + public void testNewRouteToLeaderHeader() { + SpannerMetadataProvider metadataProvider = + SpannerMetadataProvider.create(ImmutableMap.of(), "header1"); + Map> extraHeaders = metadataProvider.newRouteToLeaderHeader(); + Map> expectedHeaders = + ImmutableMap.>of( + "x-goog-spanner-route-to-leader", ImmutableList.of("true")); + assertTrue(Maps.difference(extraHeaders, expectedHeaders).areEqual()); } private String getResourceHeaderValue(