Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use streaming read/query settings for stream retry #2579

Merged
merged 3 commits into from Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 21 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -422,4 +422,25 @@
<className>com/google/cloud/spanner/spi/v1/SpannerRpc$StreamingCall</className>
<method>com.google.api.gax.rpc.ApiCallContext getCallContext()</method>
</difference>
<!-- (Internal change, propagate streaming retry settings) -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.gax.retrying.RetrySettings getReadRetrySettings()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.gax.retrying.RetrySettings getExecuteQueryRetrySettings()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>java.util.Set getReadRetryableCodes()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>java.util.Set getExecuteQueryRetryableCodes()</method>
</difference>
</differences>
Expand Up @@ -664,7 +664,12 @@ ResultSet executeQueryInternalWithOptions(
getExecuteSqlRequestBuilder(
statement, queryMode, options, /* withTransactionSelector = */ false);
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
new ResumableStreamIterator(
MAX_BUFFERED_CHUNKS,
SpannerImpl.QUERY,
span,
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
Expand Down Expand Up @@ -798,7 +803,12 @@ ResultSet readInternalWithOptions(
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.READ, span) {
new ResumableStreamIterator(
MAX_BUFFERED_CHUNKS,
SpannerImpl.READ,
span,
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
Expand Down
Expand Up @@ -24,8 +24,10 @@

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
Expand Down Expand Up @@ -65,6 +67,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -1082,10 +1085,12 @@ public void onError(SpannerException e) {
@VisibleForTesting
abstract static class ResumableStreamIterator extends AbstractIterator<PartialResultSet>
implements CloseableIterator<PartialResultSet> {
private static final RetrySettings STREAMING_RETRY_SETTINGS =
private static final RetrySettings DEFAULT_STREAMING_RETRY_SETTINGS =
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings();
private final RetrySettings streamingRetrySettings;
private final Set<Code> retryableCodes;
private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
private final BackOff backOff = newBackOff();
private final BackOff backOff;
private final LinkedList<PartialResultSet> buffer = new LinkedList<>();
private final int maxBufferSize;
private final Span span;
Expand All @@ -1099,24 +1104,58 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
*/
private boolean safeToRetry = true;

protected ResumableStreamIterator(int maxBufferSize, String streamName, Span parent) {
protected ResumableStreamIterator(
int maxBufferSize,
String streamName,
Span parent,
RetrySettings streamingRetrySettings,
Set<Code> retryableCodes) {
checkArgument(maxBufferSize >= 0);
this.maxBufferSize = maxBufferSize;
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan();
}

private static ExponentialBackOff newBackOff() {
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
this.backOff = newBackOff();
}

private ExponentialBackOff newBackOff() {
if (Objects.equals(streamingRetrySettings, DEFAULT_STREAMING_RETRY_SETTINGS)) {
rajatbhatta marked this conversation as resolved.
Show resolved Hide resolved
return new ExponentialBackOff.Builder()
.setMultiplier(streamingRetrySettings.getRetryDelayMultiplier())
.setInitialIntervalMillis(
Math.max(10, (int) streamingRetrySettings.getInitialRetryDelay().toMillis()))
.setMaxIntervalMillis(
Math.max(1000, (int) streamingRetrySettings.getMaxRetryDelay().toMillis()))
.setMaxElapsedTimeMillis(
Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned.
.build();
}
return new ExponentialBackOff.Builder()
.setMultiplier(STREAMING_RETRY_SETTINGS.getRetryDelayMultiplier())
.setMultiplier(streamingRetrySettings.getRetryDelayMultiplier())
// All of these values must be > 0.
.setInitialIntervalMillis(
Math.max(10, (int) STREAMING_RETRY_SETTINGS.getInitialRetryDelay().toMillis()))
Math.max(
1,
(int)
Math.min(
streamingRetrySettings.getInitialRetryDelay().toMillis(),
Integer.MAX_VALUE)))
.setMaxIntervalMillis(
Math.max(1000, (int) STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis()))
.setMaxElapsedTimeMillis(Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned.
Math.max(
1,
(int)
Math.min(
streamingRetrySettings.getMaxRetryDelay().toMillis(), Integer.MAX_VALUE)))
.setMaxElapsedTimeMillis(
Math.max(
1,
(int)
Math.min(
streamingRetrySettings.getTotalTimeout().toMillis(), Integer.MAX_VALUE)))
.build();
}

private static void backoffSleep(Context context, BackOff backoff) throws SpannerException {
private void backoffSleep(Context context, BackOff backoff) throws SpannerException {
backoffSleep(context, nextBackOffMillis(backoff));
}

Expand All @@ -1128,7 +1167,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
}
}

private static void backoffSleep(Context context, long backoffMillis) throws SpannerException {
private void backoffSleep(Context context, long backoffMillis) throws SpannerException {
tracer
.getCurrentSpan()
.addAnnotation(
Expand All @@ -1145,7 +1184,7 @@ private static void backoffSleep(Context context, long backoffMillis) throws Spa
try {
if (backoffMillis == BackOff.STOP) {
// Highly unlikely but we handle it just in case.
backoffMillis = STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis();
backoffMillis = streamingRetrySettings.getMaxRetryDelay().toMillis();
}
if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) {
// Woken by context cancellation.
Expand Down Expand Up @@ -1233,19 +1272,20 @@ protected PartialResultSet computeNext() {
return null;
}
}
} catch (SpannerException e) {
if (safeToRetry && e.isRetryable()) {
} catch (SpannerException spannerException) {
if (safeToRetry && isRetryable(spannerException)) {
span.addAnnotation(
"Stream broken. Safe to retry", TraceUtil.getExceptionAnnotations(e));
logger.log(Level.FINE, "Retryable exception, will sleep and retry", e);
"Stream broken. Safe to retry",
TraceUtil.getExceptionAnnotations(spannerException));
logger.log(Level.FINE, "Retryable exception, will sleep and retry", spannerException);
// Truncate any items in the buffer before the last retry token.
while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) {
buffer.removeLast();
}
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
stream = null;
try (Scope s = tracer.withSpan(span)) {
long delay = e.getRetryDelayInMillis();
long delay = spannerException.getRetryDelayInMillis();
if (delay != -1) {
backoffSleep(context, delay);
} else {
Expand All @@ -1256,15 +1296,21 @@ protected PartialResultSet computeNext() {
continue;
}
span.addAnnotation("Stream broken. Not safe to retry");
TraceUtil.setWithFailure(span, e);
throw e;
TraceUtil.setWithFailure(span, spannerException);
throw spannerException;
} catch (RuntimeException e) {
span.addAnnotation("Stream broken. Not safe to retry");
TraceUtil.setWithFailure(span, e);
throw e;
}
}
}

boolean isRetryable(SpannerException spannerException) {
return spannerException.isRetryable()
|| retryableCodes.contains(
GrpcStatusCode.of(spannerException.getErrorCode().getGrpcStatusCode()).getCode());
}
}

static double valueProtoToFloat64(com.google.protobuf.Value proto) {
Expand Down
Expand Up @@ -45,6 +45,7 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
Expand Down Expand Up @@ -230,6 +231,10 @@ public class GapicSpannerRpc implements SpannerRpc {

private boolean rpcIsClosed;
private final SpannerStub spannerStub;
private final RetrySettings executeQueryRetrySettings;
private final Set<Code> executeQueryRetryableCodes;
private final RetrySettings readRetrySettings;
private final Set<Code> readRetryableCodes;
private final SpannerStub partitionedDmlStub;
private final RetrySettings partitionedDmlRetrySettings;
private final InstanceAdminStub instanceAdminStub;
Expand Down Expand Up @@ -368,6 +373,14 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.build());
this.readRetrySettings =
options.getSpannerStubSettings().streamingReadSettings().getRetrySettings();
this.readRetryableCodes =
options.getSpannerStubSettings().streamingReadSettings().getRetryableCodes();
this.executeQueryRetrySettings =
options.getSpannerStubSettings().executeStreamingSqlSettings().getRetrySettings();
this.executeQueryRetryableCodes =
options.getSpannerStubSettings().executeStreamingSqlSettings().getRetryableCodes();
partitionedDmlRetrySettings =
options
.getSpannerStubSettings()
Expand Down Expand Up @@ -472,6 +485,10 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
this.databaseAdminStub = null;
this.instanceAdminStub = null;
this.spannerStub = null;
this.readRetrySettings = null;
this.readRetryableCodes = null;
this.executeQueryRetrySettings = null;
this.executeQueryRetryableCodes = null;
this.partitionedDmlStub = null;
this.databaseAdminStubSettings = null;
this.spannerWatchdog = null;
Expand Down Expand Up @@ -1585,6 +1602,16 @@ public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Opt
return spannerStub.deleteSessionCallable().futureCall(request, context);
}

@Override
public RetrySettings getReadRetrySettings() {
return readRetrySettings;
}

@Override
public Set<Code> getReadRetryableCodes() {
return readRetryableCodes;
}

@Override
public StreamingCall read(
ReadRequest request,
Expand All @@ -1599,6 +1626,16 @@ public StreamingCall read(
return new GrpcStreamingCall(context, responseObserver.getController());
}

@Override
public RetrySettings getExecuteQueryRetrySettings() {
return executeQueryRetrySettings;
}

@Override
public Set<Code> getExecuteQueryRetryableCodes() {
return executeQueryRetryableCodes;
}

@Override
public ResultSet executeQuery(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader) {
Expand Down
Expand Up @@ -22,12 +22,14 @@
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.BackupId;
import com.google.cloud.spanner.Restore;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.collect.ImmutableList;
import com.google.iam.v1.GetPolicyOptions;
import com.google.iam.v1.Policy;
Expand All @@ -53,6 +55,7 @@
import com.google.spanner.v1.*;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

Expand Down Expand Up @@ -337,6 +340,16 @@ Session createSession(
ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException;

/** Returns the retry settings for streaming read operations. */
default RetrySettings getReadRetrySettings() {
return SpannerStubSettings.newBuilder().streamingReadSettings().getRetrySettings();
}

/** Returns the retryable codes for streaming read operations. */
default Set<Code> getReadRetryableCodes() {
return SpannerStubSettings.newBuilder().streamingReadSettings().getRetryableCodes();
}

/**
* Performs a streaming read.
*
Expand All @@ -351,6 +364,16 @@ StreamingCall read(
@Nullable Map<Option, ?> options,
boolean routeToLeader);

/** Returns the retry settings for streaming query operations. */
default RetrySettings getExecuteQueryRetrySettings() {
return SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings();
}

/** Returns the retryable codes for streaming query operations. */
default Set<Code> getExecuteQueryRetryableCodes() {
return SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes();
}

/**
* Executes a query.
*
Expand Down