Skip to content

Commit

Permalink
fix: retry on RST_STREAM internal error (#2111)
Browse files Browse the repository at this point in the history
* fix: retry on RST_STREAM internal error

* Update google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsRetryableInternalErrorTest.java

Co-authored-by: Knut Olav Løite <koloite@gmail.com>

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Knut Olav Løite <koloite@gmail.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Jan 10, 2023
1 parent a40bda9 commit d5372e6
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.2.0')
implementation platform('com.google.cloud:libraries-bom:26.3.0')
implementation 'com.google.cloud:google-cloud-spanner'
```
Expand Down
Expand Up @@ -29,6 +29,8 @@ public class IsRetryableInternalError implements Predicate<Throwable> {
private static final String EOS_ERROR_MESSAGE =
"Received unexpected EOS on DATA frame from server";

private static final String RST_STREAM_ERROR_MESSAGE = "stream terminated by RST_STREAM";

@Override
public boolean apply(Throwable cause) {
if (isInternalError(cause)) {
Expand All @@ -38,6 +40,8 @@ public boolean apply(Throwable cause) {
return true;
} else if (cause.getMessage().contains(EOS_ERROR_MESSAGE)) {
return true;
} else if (cause.getMessage().contains(RST_STREAM_ERROR_MESSAGE)) {
return true;
}
}
return false;
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.InternalException;
Expand Down Expand Up @@ -114,6 +115,18 @@ public void genericInternalStatusRuntimeExceptionIsRetryable() {
assertThat(predicate.apply(e)).isFalse();
}

@Test
public void rstStreamInternalExceptionIsRetryable() {
final InternalException e =
new InternalException(
"INTERNAL: stream terminated by RST_STREAM.",
null,
GrpcStatusCode.of(Code.INTERNAL),
false);

assertTrue(predicate.apply(e));
}

@Test
public void genericInternalExceptionIsNotRetryable() {
final InternalException e =
Expand Down
Expand Up @@ -344,6 +344,45 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() {
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class));
}

@Test
public void testExecuteStreamingPartitionedUpdateRSTstream() {
ResultSetStats stats = ResultSetStats.newBuilder().setRowCountLowerBound(1000L).build();
PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build();
PartialResultSet p2 = PartialResultSet.newBuilder().setStats(stats).build();
ServerStream<PartialResultSet> stream1 = mock(ServerStream.class);
Iterator<PartialResultSet> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenReturn(true, true, false);
when(iterator.next())
.thenReturn(p1)
.thenThrow(
new InternalException(
"INTERNAL: stream terminated by RST_STREAM.",
null,
GrpcStatusCode.of(Code.INTERNAL),
true));
when(stream1.iterator()).thenReturn(iterator);
ServerStream<PartialResultSet> stream2 = mock(ServerStream.class);
when(stream2.iterator()).thenReturn(ImmutableList.of(p1, p2).iterator());
when(rpc.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream1);
when(rpc.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)))
.thenReturn(stream2);

PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker);
long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10));

assertThat(count).isEqualTo(1000L);
verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap());
verify(rpc)
.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class));
verify(rpc)
.executeStreamingPartitionedDml(
Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class));
}

@Test
public void testExecuteStreamingPartitionedUpdateGenericInternalException() {
PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build();
Expand Down

0 comments on commit d5372e6

Please sign in to comment.