Skip to content

Commit

Permalink
fix: apply stream wait timeout (#2544)
Browse files Browse the repository at this point in the history
* fix: apply stream wait timeout

Use the streamWaitTimeout that has been set on the call context when polling
from the gRPC stream. This prevents the stream from blocking forever if for
some reason the stream is no longer delivering data, and also no error is
propagated to the client.

The default stream wait timeout that is set for all call contexts is 30 mins.
This value can be overridden by configuring a custom call context for a specific
query.

Fixes #2494

* test: add a wait time to the mock server to ensure that a timeout occurs

* chore: add clirr ignore

* docs: add test + comment for zero timeout

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
olavloite and gcf-owl-bot[bot] committed Jul 31, 2023
1 parent e456e7b commit 5a12cd2
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 33 deletions.
7 changes: 7 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -359,4 +359,11 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isDelayTransactionStartUntilFirstWrite()</method>
</difference>

<!-- (Internal change, use stream timeout) -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc$StreamingCall</className>
<method>com.google.api.gax.rpc.ApiCallContext getCallContext()</method>
</difference>
</differences>
Expand Up @@ -25,6 +25,7 @@
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
Expand Down Expand Up @@ -74,6 +75,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/** Implementation of {@link ResultSet}. */
abstract class AbstractResultSet<R> extends AbstractStructReader implements ResultSet {
Expand Down Expand Up @@ -944,6 +946,8 @@ static class GrpcStreamIterator extends AbstractIterator<PartialResultSet>

private SpannerRpc.StreamingCall call;
private volatile boolean withBeginTransaction;
private TimeUnit streamWaitTimeoutUnit;
private long streamWaitTimeoutValue;
private SpannerException error;

@VisibleForTesting
Expand All @@ -965,6 +969,22 @@ protected final SpannerRpc.ResultStreamConsumer consumer() {
public void setCall(SpannerRpc.StreamingCall call, boolean withBeginTransaction) {
this.call = call;
this.withBeginTransaction = withBeginTransaction;
ApiCallContext callContext = call.getCallContext();
Duration streamWaitTimeout = callContext == null ? null : callContext.getStreamWaitTimeout();
if (streamWaitTimeout != null) {
// Determine the timeout unit to use. This reduces the precision to seconds if the timeout
// value is more than 1 second, which is lower than the precision that would normally be
// used by the stream watchdog (which uses a precision of 10 seconds by default).
if (streamWaitTimeout.getSeconds() > 0L) {
streamWaitTimeoutValue = streamWaitTimeout.getSeconds();
streamWaitTimeoutUnit = TimeUnit.SECONDS;
} else if (streamWaitTimeout.getNano() > 0) {
streamWaitTimeoutValue = streamWaitTimeout.getNano();
streamWaitTimeoutUnit = TimeUnit.NANOSECONDS;
}
// Note that if the stream-wait-timeout is zero, we won't set a timeout at all.
// That is consistent with ApiCallContext#withStreamWaitTimeout(Duration.ZERO).
}
}

@Override
Expand All @@ -983,11 +1003,15 @@ public boolean isWithBeginTransaction() {
protected final PartialResultSet computeNext() {
PartialResultSet next;
try {
// TODO: Ideally honor io.grpc.Context while blocking here. In practice,
// cancellation/deadline results in an error being delivered to "stream", which
// should mean that we do not block significantly longer afterwards, but it would
// be more robust to use poll() with a timeout.
next = stream.take();
if (streamWaitTimeoutUnit != null) {
next = stream.poll(streamWaitTimeoutValue, streamWaitTimeoutUnit);
if (next == null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED, "stream wait timeout");
}
} else {
next = stream.take();
}
} catch (InterruptedException e) {
// Treat interrupt as a request to cancel the read.
throw SpannerExceptionFactory.propagateInterrupt(e);
Expand Down
Expand Up @@ -1596,20 +1596,7 @@ public StreamingCall read(
options, request.getSession(), request, SpannerGrpc.getReadMethod(), routeToLeader);
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
spannerStub.streamingReadCallable().call(request, responseObserver, context);
final StreamController controller = responseObserver.getController();
return new StreamingCall() {
@Override
public void request(int numMessage) {
controller.request(numMessage);
}

// TODO(hzyi): streamController currently does not support cancel with message. Add
// this in gax and update this method later
@Override
public void cancel(String message) {
controller.cancel();
}
};
return new GrpcStreamingCall(context, responseObserver.getController());
}

@Override
Expand Down Expand Up @@ -1673,22 +1660,10 @@ public StreamingCall executeQuery(
request,
SpannerGrpc.getExecuteStreamingSqlMethod(),
routeToLeader);

SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context);
final StreamController controller = responseObserver.getController();
return new StreamingCall() {
@Override
public void request(int numMessage) {
controller.request(numMessage);
}

// TODO(hzyi): streamController currently does not support cancel with message. Add
// this in gax and update this method later
@Override
public void cancel(String message) {
controller.cancel();
}
};
return new GrpcStreamingCall(context, responseObserver.getController());
}

@Override
Expand Down Expand Up @@ -1957,6 +1932,31 @@ public boolean isClosed() {
return rpcIsClosed;
}

private static final class GrpcStreamingCall implements StreamingCall {
private final ApiCallContext callContext;
private final StreamController controller;

GrpcStreamingCall(ApiCallContext callContext, StreamController controller) {
this.callContext = callContext;
this.controller = controller;
}

@Override
public ApiCallContext getCallContext() {
return callContext;
}

@Override
public void request(int numMessages) {
controller.request(numMessages);
}

@Override
public void cancel(@Nullable String message) {
controller.cancel();
}
}

/**
* A {@code ResponseObserver} that exposes the {@code StreamController} and delegates callbacks to
* the {@link ResultStreamConsumer}.
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.BackupId;
Expand Down Expand Up @@ -150,6 +151,9 @@ interface ResultStreamConsumer {
/** Handle for cancellation of a streaming read or query call. */
interface StreamingCall {

/** Returns the {@link ApiCallContext} that is used for this streaming call. */
ApiCallContext getCallContext();

/**
* Requests more messages from the stream. We disable the auto flow control mechanism in grpc,
* so we need to request messages ourself. This gives us more control over how much buffer we
Expand Down
Expand Up @@ -38,6 +38,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.NoCredentials;
import com.google.cloud.Timestamp;
Expand All @@ -51,6 +52,7 @@
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator;
import com.google.cloud.spanner.Type.Code;
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
Expand All @@ -77,6 +79,7 @@
import com.google.spanner.v1.TypeAnnotationCode;
import com.google.spanner.v1.TypeCode;
import io.grpc.Context;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -2963,6 +2966,63 @@ public void testStatementWithBytesArrayParameter() {
}
}

@Test
public void testStreamWaitTimeout() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
// Add a wait time to the mock server. Note that the test won't actually wait 100ms, as it uses
// a 1ns time out.
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
// Create a custom call configuration that uses a 1 nanosecond stream timeout value. This will
// always time out, as a call to the mock server will always take more than 1 nanosecond.
CallContextConfigurator configurator =
new CallContextConfigurator() {
@Override
public <ReqT, RespT> ApiCallContext configure(
ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
return context.withStreamWaitTimeout(Duration.ofNanos(1L));
}
};
Context context =
Context.current().withValue(SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY, configurator);
context.run(
() -> {
try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) {
SpannerException exception = assertThrows(SpannerException.class, resultSet::next);
assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode());
assertTrue(
exception.getMessage(), exception.getMessage().contains("stream wait timeout"));
}
});
}

@Test
public void testZeroStreamWaitTimeout() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
// Create a custom call configuration that sets the stream timeout to zero.
// This should disable the timeout.
CallContextConfigurator configurator =
new CallContextConfigurator() {
@Override
public <ReqT, RespT> ApiCallContext configure(
ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
return context.withStreamWaitTimeout(Duration.ZERO);
}
};
Context context =
Context.current().withValue(SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY, configurator);
context.run(
() -> {
try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) {
// A zero timeout should not cause a timeout, and instead be ignored.
assertTrue(resultSet.next());
assertFalse(resultSet.next());
}
});
}

static void assertAsString(String expected, ResultSet resultSet, int col) {
assertEquals(expected, resultSet.getValue(col).getAsString());
assertEquals(ImmutableList.of(expected), resultSet.getValue(col).getAsStringList());
Expand Down
Expand Up @@ -22,6 +22,8 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
Expand Down Expand Up @@ -50,6 +52,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

/** Unit tests for {@link com.google.cloud.spanner.AbstractResultSet.GrpcResultSet}. */
@RunWith(JUnit4.class)
Expand All @@ -58,6 +61,7 @@ public class GrpcResultSetTest {
private AbstractResultSet.GrpcResultSet resultSet;
private SpannerRpc.ResultStreamConsumer consumer;
private AbstractResultSet.GrpcStreamIterator stream;
private final Duration streamWaitTimeout = Duration.ofNanos(1L);

private static class NoOpListener implements AbstractResultSet.Listener {
@Override
Expand All @@ -78,6 +82,11 @@ public void setUp() {
stream = new AbstractResultSet.GrpcStreamIterator(10);
stream.setCall(
new SpannerRpc.StreamingCall() {
@Override
public ApiCallContext getCallContext() {
return GrpcCallContext.createDefault().withStreamWaitTimeout(streamWaitTimeout);
}

@Override
public void cancel(@Nullable String message) {}

Expand All @@ -93,6 +102,14 @@ public AbstractResultSet.GrpcResultSet resultSetWithMode(QueryMode queryMode) {
return new AbstractResultSet.GrpcResultSet(stream, new NoOpListener());
}

@Test
public void testStreamTimeout() {
// We don't add any results to the stream. That means that it will time out after 1ns.
SpannerException exception = assertThrows(SpannerException.class, resultSet::next);
assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode());
assertTrue(exception.getMessage(), exception.getMessage().contains("stream wait timeout"));
}

@Test
public void metadata() {
Type rowType = Type.struct(Type.StructField.of("f", Type.string()));
Expand Down
Expand Up @@ -18,6 +18,8 @@

import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.io.Resources;
Expand Down Expand Up @@ -115,6 +117,11 @@ private void run() throws Exception {
stream = new AbstractResultSet.GrpcStreamIterator(10);
stream.setCall(
new SpannerRpc.StreamingCall() {
@Override
public ApiCallContext getCallContext() {
return GrpcCallContext.createDefault();
}

@Override
public void cancel(@Nullable String message) {}

Expand Down
Expand Up @@ -27,7 +27,9 @@

import com.google.api.core.ApiFutures;
import com.google.api.core.NanoClock;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand Down Expand Up @@ -407,6 +409,11 @@ public void singleUseReadOnlyTransactionReturnsEmptyTransactionMetadata() {
}

private static class NoOpStreamingCall implements SpannerRpc.StreamingCall {
@Override
public ApiCallContext getCallContext() {
return GrpcCallContext.createDefault();
}

@Override
public void cancel(@Nullable String message) {}

Expand Down

0 comments on commit 5a12cd2

Please sign in to comment.