Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry on RST_STREAM internal error #2111

Merged
merged 3 commits into from Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.2.0')
implementation platform('com.google.cloud:libraries-bom:26.3.0')

implementation 'com.google.cloud:google-cloud-spanner'
```
Expand Down
Expand Up @@ -29,6 +29,8 @@ public class IsRetryableInternalError implements Predicate<Throwable> {
private static final String EOS_ERROR_MESSAGE =
"Received unexpected EOS on DATA frame from server";

private static final String RST_STREAM_ERROR_MESSAGE = "stream terminated by RST_STREAM";

@Override
public boolean apply(Throwable cause) {
if (isInternalError(cause)) {
Expand All @@ -38,6 +40,8 @@ public boolean apply(Throwable cause) {
return true;
} else if (cause.getMessage().contains(EOS_ERROR_MESSAGE)) {
return true;
} else if (cause.getMessage().contains(RST_STREAM_ERROR_MESSAGE)) {
return true;
}
}
return false;
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.InternalException;
Expand Down Expand Up @@ -114,6 +115,18 @@ public void genericInternalStatusRuntimeExceptionIsRetryable() {
assertThat(predicate.apply(e)).isFalse();
}

@Test
public void rstStreamInternalExceptionIsRetryable() {
final InternalException e =
new InternalException(
"INTERNAL: stream terminated by RST_STREAM.",
null,
GrpcStatusCode.of(Code.INTERNAL),
false);

assertTrue(predicate.apply(e));
}

@Test
public void genericInternalExceptionIsNotRetryable() {
final InternalException e =
Expand Down
Expand Up @@ -344,6 +344,45 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() {
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class));
}

@Test
public void testExecuteStreamingPartitionedUpdateRSTstream() {
ResultSetStats stats = ResultSetStats.newBuilder().setRowCountLowerBound(1000L).build();
PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build();
PartialResultSet p2 = PartialResultSet.newBuilder().setStats(stats).build();
ServerStream<PartialResultSet> stream1 = mock(ServerStream.class);
Iterator<PartialResultSet> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenReturn(true, true, false);
when(iterator.next())
.thenReturn(p1)
.thenThrow(
new InternalException(
"INTERNAL: stream terminated by RST_STREAM.",
null,
GrpcStatusCode.of(Code.INTERNAL),
true));
when(stream1.iterator()).thenReturn(iterator);
ServerStream<PartialResultSet> stream2 = mock(ServerStream.class);
when(stream2.iterator()).thenReturn(ImmutableList.of(p1, p2).iterator());
when(rpc.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream1);
when(rpc.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream2);

PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10));

assertThat(count).isEqualTo(1000L);
verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap());
verify(rpc)
.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class));
verify(rpc)
.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class));
}

@Test
public void testExecuteStreamingPartitionedUpdateGenericInternalException() {
PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build();
Expand Down