From d5372e662624831abc694d81acecf797d32d86e3 Mon Sep 17 00:00:00 2001 From: rahul2393 Date: Tue, 10 Jan 2023 13:23:28 +0530 Subject: [PATCH] fix: retry on RST_STREAM internal error (#2111) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: retry on RST_STREAM internal error * Update google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsRetryableInternalErrorTest.java Co-authored-by: Knut Olav Løite * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Knut Olav Løite Co-authored-by: Owl Bot --- README.md | 2 +- .../spanner/IsRetryableInternalError.java | 4 ++ .../spanner/IsRetryableInternalErrorTest.java | 13 +++++++ .../PartitionedDmlTransactionTest.java | 39 +++++++++++++++++++ 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index baac29f697..38486eb057 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.2.0') +implementation platform('com.google.cloud:libraries-bom:26.3.0') implementation 'com.google.cloud:google-cloud-spanner' ``` diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsRetryableInternalError.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsRetryableInternalError.java index c62b4a2e06..d250c0ad6c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsRetryableInternalError.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IsRetryableInternalError.java @@ -29,6 +29,8 @@ public class IsRetryableInternalError implements Predicate { private static final String EOS_ERROR_MESSAGE = "Received unexpected EOS on DATA frame from server"; + private static final String RST_STREAM_ERROR_MESSAGE = "stream terminated by RST_STREAM"; + @Override public boolean apply(Throwable cause) { if (isInternalError(cause)) { @@ -38,6 +40,8 @@ public boolean apply(Throwable cause) { return true; } else if (cause.getMessage().contains(EOS_ERROR_MESSAGE)) { return true; + } else if (cause.getMessage().contains(RST_STREAM_ERROR_MESSAGE)) { + return true; } } return false; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsRetryableInternalErrorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsRetryableInternalErrorTest.java index e1c360da74..63039fcd23 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsRetryableInternalErrorTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IsRetryableInternalErrorTest.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertTrue; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.InternalException; @@ -114,6 +115,18 @@ public void genericInternalStatusRuntimeExceptionIsRetryable() { assertThat(predicate.apply(e)).isFalse(); } + @Test + public void rstStreamInternalExceptionIsRetryable() { + final InternalException e = + new InternalException( + "INTERNAL: stream terminated by RST_STREAM.", + null, + GrpcStatusCode.of(Code.INTERNAL), + false); + + assertTrue(predicate.apply(e)); + } + @Test public void genericInternalExceptionIsNotRetryable() { final InternalException e = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index 61dcef2902..de1ec8fa39 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -344,6 +344,45 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() { Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)); } + @Test + public void testExecuteStreamingPartitionedUpdateRSTstream() { + ResultSetStats stats = ResultSetStats.newBuilder().setRowCountLowerBound(1000L).build(); + PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build(); + PartialResultSet p2 = PartialResultSet.newBuilder().setStats(stats).build(); + ServerStream stream1 = mock(ServerStream.class); + Iterator iterator = mock(Iterator.class); + when(iterator.hasNext()).thenReturn(true, true, false); + when(iterator.next()) + .thenReturn(p1) + .thenThrow( + new InternalException( + "INTERNAL: stream terminated by RST_STREAM.", + null, + GrpcStatusCode.of(Code.INTERNAL), + true)); + when(stream1.iterator()).thenReturn(iterator); + ServerStream stream2 = mock(ServerStream.class); + when(stream2.iterator()).thenReturn(ImmutableList.of(p1, p2).iterator()); + when(rpc.executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class))) + .thenReturn(stream1); + when(rpc.executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class))) + .thenReturn(stream2); + + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); + long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); + + assertThat(count).isEqualTo(1000L); + verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); + verify(rpc) + .executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class)); + verify(rpc) + .executeStreamingPartitionedDml( + Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class)); + } + @Test public void testExecuteStreamingPartitionedUpdateGenericInternalException() { PartialResultSet p1 = PartialResultSet.newBuilder().setResumeToken(resumeToken).build();