From 9c01bc11b51dc1e3e209e4d6b666b9ddd3212cf5 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 31 Jan 2023 22:12:07 +0000 Subject: [PATCH] fix: at connection level, retry for internal errors (#1965) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: at connection level, retry for internal errors * . * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * . * . * . * . * . * feat: allow java client to handle schema change during same stream name (#1964) * feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See 
https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparison in connection loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: add schema update support to multiplexing * fix: fix Windows build bug: Windows Instant resolution is different from Linux * fix: fix another failing test for Windows build * fix: fix another test failure for Windows build * feat: Change new thread for each retry to be a thread pool to avoid creating/tearing down too many threads if lots of retries happen * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: add back the background executor provider that was accidentally removed * feat: throw error when using connection pool for explicit stream * fix: Add precision truncation to the passed in value from JSON float and double type. 
* 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * modify the bom version * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix deadlock issue in ConnectionWorkerPool * fix: fix deadlock issue during close + append for multiplexing * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: fix one potential root cause of deadlock issue for non-multiplexing case * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add timeout to inflight queue waiting, and also add some extra logging * feat: allow java client lib to handle switching table schema for the same stream name * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot * fix: remove unrecoverable connection from connection pool during multiplexing (#1967) * feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. 
We will treat every new stream name as a switch of destination * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting the response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparison in connection loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: add schema update support to multiplexing * fix: fix Windows build bug: Windows Instant resolution is different from Linux * fix: fix another failing test for Windows build * fix: fix another test failure for Windows build * feat: Change new thread for each retry to be a thread pool to avoid creating/tearing down too many threads if lots of retries happen * 🦉 Updates from OwlBot 
post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: add back the background executor provider that's accidentally removed * feat: throw error when use connection pool for explicit stream * fix: Add precision truncation to the passed in value from JSON float and double type. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * modify the bom version * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix deadlockissue in ConnectionWorkerPool * fix: fix deadlock issue during close + append for multiplexing * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: fix one potential root cause of deadlock issue for non-multiplexing case * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add timeout to inflight queue waiting, and also add some extra log * feat: allow java client lib handle switch table schema for the same stream name * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --------- Co-authored-by: Owl Bot Co-authored-by: Gaole Meng --- .../clirr-ignored-differences.xml | 5 +++++ .../bigquery/storage/v1/ConnectionWorker.java | 13 ++++++------- .../bigquery/storage/v1/StreamWriterTest.java | 18 +++++++++++++----- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index 9833dbb1f3..b0d2b7c898 100644 --- 
a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -142,4 +142,9 @@ com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool long getInflightWaitSeconds(com.google.cloud.bigquery.storage.v1.StreamWriter) + + 7009 + com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool + ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings) + diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 0060ad7314..8ca9304fe1 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -19,7 +19,6 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; import com.google.auto.value.AutoValue; -import com.google.cloud.bigquery.storage.util.Errors; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; @@ -724,14 +723,14 @@ private void requestCallback(AppendRowsResponse response) { }); } - private boolean isRetriableError(Throwable t) { + private boolean isConnectionErrorRetriable(Throwable t) { Status status = Status.fromThrowable(t); - if (Errors.isRetryableInternalStatus(status)) { - return true; - } return status.getCode() == Code.ABORTED || status.getCode() == Code.UNAVAILABLE - || status.getCode() == Code.CANCELLED; + || status.getCode() == Code.CANCELLED + || status.getCode() == Code.INTERNAL + || status.getCode() == 
Code.FAILED_PRECONDITION + || status.getCode() == Code.DEADLINE_EXCEEDED; } private void doneCallback(Throwable finalStatus) { @@ -748,7 +747,7 @@ private void doneCallback(Throwable finalStatus) { connectionRetryStartTime = System.currentTimeMillis(); } // If the error can be retried, don't set it here, let it try to retry later on. - if (isRetriableError(finalStatus) + if (isConnectionErrorRetriable(finalStatus) && !userClosed && (maxRetryDuration.toMillis() == 0f || System.currentTimeMillis() - connectionRetryStartTime diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 6426bc6ca1..ce7f233af6 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -462,9 +462,17 @@ public void testShortenStreamNameAllowed() throws Exception { @Test public void testAppendSuccessAndConnectionError() throws Exception { - StreamWriter writer = getTestStreamWriter(); + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + // Retry expire immediately. 
+ .setMaxRetryDuration(java.time.Duration.ofMillis(1L)) + .build(); testBigQueryWrite.addResponse(createAppendResponse(0)); testBigQueryWrite.addException(Status.INTERNAL.asException()); + testBigQueryWrite.addException(Status.INTERNAL.asException()); + testBigQueryWrite.addException(Status.INTERNAL.asException()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); @@ -582,11 +590,11 @@ public void testAppendAfterUserClose() throws Exception { @Test public void testAppendAfterServerClose() throws Exception { StreamWriter writer = getTestStreamWriter(); - testBigQueryWrite.addException(Status.INTERNAL.asException()); + testBigQueryWrite.addException(Status.INVALID_ARGUMENT.asException()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiException error1 = assertFutureException(ApiException.class, appendFuture1); - assertEquals(Code.INTERNAL, error1.getStatusCode().getCode()); + assertEquals(Code.INVALID_ARGUMENT, error1.getStatusCode().getCode()); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); assertTrue(appendFuture2.isDone()); @@ -638,7 +646,7 @@ public void serverCloseWhileRequestsInflight() throws Exception { StreamWriter writer = getTestStreamWriter(); // Server will sleep 2 seconds before closing the connection. testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2)); - testBigQueryWrite.addException(Status.INTERNAL.asException()); + testBigQueryWrite.addException(Status.INVALID_ARGUMENT.asException()); // Send 10 requests, so that there are 10 inflight requests. int appendCount = 10; @@ -650,7 +658,7 @@ public void serverCloseWhileRequestsInflight() throws Exception { // Server close should properly handle all inflight requests. 
for (int i = 0; i < appendCount; i++) { ApiException actualError = assertFutureException(ApiException.class, futures.get(i)); - assertEquals(Code.INTERNAL, actualError.getStatusCode().getCode()); + assertEquals(Code.INVALID_ARGUMENT, actualError.getStatusCode().getCode()); } writer.close();