diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index ac445e1e35..c0ddc050ae 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -422,4 +422,25 @@ com/google/cloud/spanner/spi/v1/SpannerRpc$StreamingCall com.google.api.gax.rpc.ApiCallContext getCallContext() + + + 7012 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.api.gax.retrying.RetrySettings getReadRetrySettings() + + + 7012 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.api.gax.retrying.RetrySettings getExecuteQueryRetrySettings() + + + 7012 + com/google/cloud/spanner/spi/v1/SpannerRpc + java.util.Set getReadRetryableCodes() + + + 7012 + com/google/cloud/spanner/spi/v1/SpannerRpc + java.util.Set getExecuteQueryRetryableCodes() + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index e19ace944a..d5b1abe0b5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -664,7 +664,12 @@ ResultSet executeQueryInternalWithOptions( getExecuteSqlRequestBuilder( statement, queryMode, options, /* withTransactionSelector = */ false); ResumableStreamIterator stream = - new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) { + new ResumableStreamIterator( + MAX_BUFFERED_CHUNKS, + SpannerImpl.QUERY, + span, + rpc.getExecuteQueryRetrySettings(), + rpc.getExecuteQueryRetryableCodes()) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks); @@ -798,7 +803,12 @@ ResultSet readInternalWithOptions( final int prefetchChunks = readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks; ResumableStreamIterator stream = - new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.READ, span) { + new ResumableStreamIterator( + MAX_BUFFERED_CHUNKS, + SpannerImpl.READ, + span, + rpc.getReadRetrySettings(), + rpc.getReadRetryableCodes()) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 37024bd267..b0d5ab2bba 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -24,8 +24,10 @@ import com.google.api.client.util.BackOff; import com.google.api.client.util.ExponentialBackOff; +import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.ByteArray; import com.google.cloud.Date; import com.google.cloud.Timestamp; @@ -65,6 +67,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -1082,10 +1085,12 @@ public void onError(SpannerException e) { @VisibleForTesting abstract static class ResumableStreamIterator extends AbstractIterator implements CloseableIterator { - private static final RetrySettings STREAMING_RETRY_SETTINGS = + private static final RetrySettings DEFAULT_STREAMING_RETRY_SETTINGS = SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings(); + private final RetrySettings streamingRetrySettings; + private final Set retryableCodes; private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName()); - private final BackOff backOff = newBackOff(); + private final BackOff backOff; private final LinkedList buffer = new LinkedList<>(); private final int maxBufferSize; private final Span span; @@ -1099,24 +1104,58 @@ abstract static class ResumableStreamIterator extends AbstractIterator retryableCodes) { checkArgument(maxBufferSize >= 0); this.maxBufferSize = maxBufferSize; this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan(); - } - - private static ExponentialBackOff newBackOff() { + this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings); + this.retryableCodes = Preconditions.checkNotNull(retryableCodes); + this.backOff = newBackOff(); + } + + private ExponentialBackOff newBackOff() { + if (Objects.equals(streamingRetrySettings, DEFAULT_STREAMING_RETRY_SETTINGS)) { + return new ExponentialBackOff.Builder() + .setMultiplier(streamingRetrySettings.getRetryDelayMultiplier()) + .setInitialIntervalMillis( + Math.max(10, (int) streamingRetrySettings.getInitialRetryDelay().toMillis())) + .setMaxIntervalMillis( + Math.max(1000, (int) streamingRetrySettings.getMaxRetryDelay().toMillis())) + .setMaxElapsedTimeMillis( + Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned. + .build(); + } return new ExponentialBackOff.Builder() - .setMultiplier(STREAMING_RETRY_SETTINGS.getRetryDelayMultiplier()) + .setMultiplier(streamingRetrySettings.getRetryDelayMultiplier()) + // All of these values must be > 0. .setInitialIntervalMillis( - Math.max(10, (int) STREAMING_RETRY_SETTINGS.getInitialRetryDelay().toMillis())) + Math.max( + 1, + (int) + Math.min( + streamingRetrySettings.getInitialRetryDelay().toMillis(), + Integer.MAX_VALUE))) .setMaxIntervalMillis( - Math.max(1000, (int) STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis())) - .setMaxElapsedTimeMillis(Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned. + Math.max( + 1, + (int) + Math.min( + streamingRetrySettings.getMaxRetryDelay().toMillis(), Integer.MAX_VALUE))) + .setMaxElapsedTimeMillis( + Math.max( + 1, + (int) + Math.min( + streamingRetrySettings.getTotalTimeout().toMillis(), Integer.MAX_VALUE))) .build(); } - private static void backoffSleep(Context context, BackOff backoff) throws SpannerException { + private void backoffSleep(Context context, BackOff backoff) throws SpannerException { backoffSleep(context, nextBackOffMillis(backoff)); } @@ -1128,7 +1167,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException { } } - private static void backoffSleep(Context context, long backoffMillis) throws SpannerException { + private void backoffSleep(Context context, long backoffMillis) throws SpannerException { tracer .getCurrentSpan() .addAnnotation( @@ -1145,7 +1184,7 @@ private static void backoffSleep(Context context, long backoffMillis) throws Spa try { if (backoffMillis == BackOff.STOP) { // Highly unlikely but we handle it just in case. - backoffMillis = STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis(); + backoffMillis = streamingRetrySettings.getMaxRetryDelay().toMillis(); } if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) { // Woken by context cancellation. @@ -1233,11 +1272,12 @@ protected PartialResultSet computeNext() { return null; } } - } catch (SpannerException e) { - if (safeToRetry && e.isRetryable()) { + } catch (SpannerException spannerException) { + if (safeToRetry && isRetryable(spannerException)) { span.addAnnotation( - "Stream broken. Safe to retry", TraceUtil.getExceptionAnnotations(e)); - logger.log(Level.FINE, "Retryable exception, will sleep and retry", e); + "Stream broken. Safe to retry", + TraceUtil.getExceptionAnnotations(spannerException)); + logger.log(Level.FINE, "Retryable exception, will sleep and retry", spannerException); // Truncate any items in the buffer before the last retry token. while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) { buffer.removeLast(); @@ -1245,7 +1285,7 @@ protected PartialResultSet computeNext() { assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken); stream = null; try (Scope s = tracer.withSpan(span)) { - long delay = e.getRetryDelayInMillis(); + long delay = spannerException.getRetryDelayInMillis(); if (delay != -1) { backoffSleep(context, delay); } else { @@ -1256,8 +1296,8 @@ protected PartialResultSet computeNext() { continue; } span.addAnnotation("Stream broken. Not safe to retry"); - TraceUtil.setWithFailure(span, e); - throw e; + TraceUtil.setWithFailure(span, spannerException); + throw spannerException; } catch (RuntimeException e) { span.addAnnotation("Stream broken. Not safe to retry"); TraceUtil.setWithFailure(span, e); @@ -1265,6 +1305,12 @@ protected PartialResultSet computeNext() { } } } + + boolean isRetryable(SpannerException spannerException) { + return spannerException.isRetryable() + || retryableCodes.contains( + GrpcStatusCode.of(spannerException.getErrorCode().getGrpcStatusCode()).getCode()); + } } static double valueProtoToFloat64(com.google.protobuf.Value proto) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 1afea7676d..3e6fc5fbcb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -45,6 +45,7 @@ import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.StatusCode.Code; import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; @@ -230,6 +231,10 @@ public class GapicSpannerRpc implements SpannerRpc { private boolean rpcIsClosed; private final SpannerStub spannerStub; + private final RetrySettings executeQueryRetrySettings; + private final Set executeQueryRetryableCodes; + private final RetrySettings readRetrySettings; + private final Set readRetryableCodes; private final SpannerStub partitionedDmlStub; private final RetrySettings partitionedDmlRetrySettings; private final InstanceAdminStub instanceAdminStub; @@ -368,6 +373,14 @@ public GapicSpannerRpc(final SpannerOptions options) { .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) .build()); + this.readRetrySettings = + options.getSpannerStubSettings().streamingReadSettings().getRetrySettings(); + this.readRetryableCodes = + options.getSpannerStubSettings().streamingReadSettings().getRetryableCodes(); + this.executeQueryRetrySettings = + options.getSpannerStubSettings().executeStreamingSqlSettings().getRetrySettings(); + this.executeQueryRetryableCodes = + options.getSpannerStubSettings().executeStreamingSqlSettings().getRetryableCodes(); partitionedDmlRetrySettings = options .getSpannerStubSettings() @@ -472,6 +485,10 @@ public UnaryCallable createUnaryCalla this.databaseAdminStub = null; this.instanceAdminStub = null; this.spannerStub = null; + this.readRetrySettings = null; + this.readRetryableCodes = null; + this.executeQueryRetrySettings = null; + this.executeQueryRetryableCodes = null; this.partitionedDmlStub = null; this.databaseAdminStubSettings = null; this.spannerWatchdog = null; @@ -1585,6 +1602,16 @@ public ApiFuture asyncDeleteSession(String sessionName, @Nullable Map getReadRetryableCodes() { + return readRetryableCodes; + } + @Override public StreamingCall read( ReadRequest request, @@ -1599,6 +1626,16 @@ public StreamingCall read( return new GrpcStreamingCall(context, responseObserver.getController()); } + @Override + public RetrySettings getExecuteQueryRetrySettings() { + return executeQueryRetrySettings; + } + + @Override + public Set getExecuteQueryRetryableCodes() { + return executeQueryRetryableCodes; + } + @Override public ResultSet executeQuery( ExecuteSqlRequest request, @Nullable Map options, boolean routeToLeader) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index 62c34a58a1..53b4b97764 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -22,12 +22,14 @@ import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.ServiceRpc; import com.google.cloud.spanner.BackupId; import com.google.cloud.spanner.Restore; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub; import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub; +import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.collect.ImmutableList; import com.google.iam.v1.GetPolicyOptions; import com.google.iam.v1.Policy; @@ -53,6 +55,7 @@ import com.google.spanner.v1.*; import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; import org.threeten.bp.Duration; @@ -337,6 +340,16 @@ Session createSession( ApiFuture asyncDeleteSession(String sessionName, @Nullable Map options) throws SpannerException; + /** Returns the retry settings for streaming read operations. */ + default RetrySettings getReadRetrySettings() { + return SpannerStubSettings.newBuilder().streamingReadSettings().getRetrySettings(); + } + + /** Returns the retryable codes for streaming read operations. */ + default Set getReadRetryableCodes() { + return SpannerStubSettings.newBuilder().streamingReadSettings().getRetryableCodes(); + } + /** * Performs a streaming read. * @@ -351,6 +364,16 @@ StreamingCall read( @Nullable Map options, boolean routeToLeader); + /** Returns the retry settings for streaming query operations. */ + default RetrySettings getExecuteQueryRetrySettings() { + return SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings(); + } + + /** Returns the retryable codes for streaming query operations. */ + default Set getExecuteQueryRetryableCodes() { + return SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes(); + } + /** * Executes a query. * diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 99e40daa53..250864f0ba 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -40,6 +40,7 @@ import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.StatusCode; import com.google.cloud.ByteArray; import com.google.cloud.NoCredentials; import com.google.cloud.Timestamp; @@ -66,8 +67,10 @@ import com.google.common.io.BaseEncoding; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.ByteString; import com.google.protobuf.ListValue; import com.google.protobuf.NullValue; +import com.google.rpc.RetryInfo; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.DeleteSessionRequest; import com.google.spanner.v1.ExecuteBatchDmlRequest; @@ -84,11 +87,13 @@ import com.google.spanner.v1.TypeAnnotationCode; import com.google.spanner.v1.TypeCode; import io.grpc.Context; +import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.protobuf.lite.ProtoLiteUtils; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -3328,6 +3333,92 @@ public ApiCallContext configure( }); } + @Test + public void testRetryOnResourceExhausted() { + final RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofSeconds(60L)) + .setMaxRpcTimeout(Duration.ofSeconds(60L)) + .setTotalTimeout(Duration.ofSeconds(60L)) + .setRpcTimeoutMultiplier(1.0d) + .setInitialRetryDelay(Duration.ZERO) + .setMaxRetryDelay(Duration.ZERO) + .setMaxAttempts(100) + .build(); + SpannerOptions.Builder builder = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()); + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay( + com.google.protobuf.Duration.newBuilder() + .setNanos((int) Duration.ofMillis(1).toNanos()) + .build()) + .build(); + Metadata.Key key = + Metadata.Key.of( + retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX, + ProtoLiteUtils.metadataMarshaller(retryInfo)); + Metadata trailers = new Metadata(); + trailers.put(key, retryInfo); + builder + .getSpannerStubSettingsBuilder() + .executeStreamingSqlSettings() + .setRetryableCodes(StatusCode.Code.UNAVAILABLE, StatusCode.Code.RESOURCE_EXHAUSTED) + .setRetrySettings(retrySettings); + + try (Spanner spanner = builder.build().getService()) { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + final int expectedRowCount = 5; + RandomResultSetGenerator generator = new RandomResultSetGenerator(expectedRowCount); + Statement statement = Statement.of("select * from random_table"); + mockSpanner.putStatementResult(StatementResult.query(statement, generator.generate())); + + for (int errorIndex = 0; errorIndex < expectedRowCount - 1; errorIndex++) { + for (boolean withRetryInfo : new boolean[] {false, true}) { + // RESOURCE_EXHAUSTED errors with and without retry-info should be retried. + StatusRuntimeException exception = + Status.RESOURCE_EXHAUSTED.asRuntimeException(withRetryInfo ? trailers : null); + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofStreamException(exception, errorIndex)); + try (ResultSet resultSet = client.singleUse().executeQuery(statement)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) {} + } + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + if (errorIndex == 0) { + // We should only have two requests without a resume token, as the error occurred before + // any resume token could be returned. + assertEquals( + 2, + mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).stream() + .filter(request -> request.getResumeToken().isEmpty()) + .count()); + } else { + final int expectedResumeToken = errorIndex; + // Check that we have one request with a resume token that corresponds with the place in + // the stream where the error happened. + assertEquals( + 1, + mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).stream() + .filter( + request -> + request + .getResumeToken() + .equals( + ByteString.copyFromUtf8( + String.format("%09d", expectedResumeToken)))) + .count()); + } + mockSpanner.clearRequests(); + } + } + } + } + static void assertAsString(String expected, ResultSet resultSet, int col) { assertEquals(expected, resultSet.getValue(col).getAsString()); assertEquals(ImmutableList.of(expected), resultSet.getValue(col).getAsStringList()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java index b3e94433c1..06c1725d76 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java @@ -24,6 +24,7 @@ import com.google.api.client.util.BackOff; import com.google.cloud.spanner.AbstractResultSet.ResumableStreamIterator; +import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; @@ -136,7 +137,12 @@ public void setUp() { private void initWithLimit(int maxBufferSize) { resumableStreamIterator = - new AbstractResultSet.ResumableStreamIterator(maxBufferSize, "", null) { + new AbstractResultSet.ResumableStreamIterator( + maxBufferSize, + "", + null, + SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings(), + SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()) { @Override AbstractResultSet.CloseableIterator startStream( @Nullable ByteString resumeToken) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index aa3fe53046..a920fdb67d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -34,6 +34,7 @@ import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.protobuf.ListValue; @@ -115,6 +116,16 @@ public void setUp() { .thenReturn(ApiFutures.immediateFuture(commitResponse)); Mockito.when(rpc.rollbackAsync(Mockito.any(RollbackRequest.class), Mockito.anyMap())) .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); + when(rpc.getReadRetrySettings()) + .thenReturn(SpannerStubSettings.newBuilder().streamingReadSettings().getRetrySettings()); + when(rpc.getReadRetryableCodes()) + .thenReturn(SpannerStubSettings.newBuilder().streamingReadSettings().getRetryableCodes()); + when(rpc.getExecuteQueryRetrySettings()) + .thenReturn( + SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings()); + when(rpc.getExecuteQueryRetryableCodes()) + .thenReturn( + SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()); session = spanner.getSessionClient(db).createSession(); ((SessionImpl) session).setCurrentSpan(mock(Span.class)); // We expect the same options, "options", on all calls on "session". diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index bc4757f11d..b20d5dd652 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -58,6 +58,7 @@ import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc.ResultStreamConsumer; +import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; @@ -1131,6 +1132,16 @@ public void testSessionNotFoundReadWriteTransaction() { .thenReturn(ApiFutures.immediateFailedFuture(sessionNotFound)); when(rpc.rollbackAsync(any(RollbackRequest.class), any(Map.class))) .thenReturn(ApiFutures.immediateFailedFuture(sessionNotFound)); + when(rpc.getReadRetrySettings()) + .thenReturn(SpannerStubSettings.newBuilder().streamingReadSettings().getRetrySettings()); + when(rpc.getReadRetryableCodes()) + .thenReturn(SpannerStubSettings.newBuilder().streamingReadSettings().getRetryableCodes()); + when(rpc.getExecuteQueryRetrySettings()) + .thenReturn( + SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings()); + when(rpc.getExecuteQueryRetryableCodes()) + .thenReturn( + SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()); final SessionImpl closedSession = mock(SessionImpl.class); when(closedSession.getName()) .thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-closed");