diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index 9833dbb1f3..b0d2b7c898 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -142,4 +142,9 @@ com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool long getInflightWaitSeconds(com.google.cloud.bigquery.storage.v1.StreamWriter) + + 7009 + com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool + ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings) + diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 0060ad7314..8ca9304fe1 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -19,7 +19,6 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; import com.google.auto.value.AutoValue; -import com.google.cloud.bigquery.storage.util.Errors; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; @@ -724,14 +723,14 @@ private void requestCallback(AppendRowsResponse response) { }); } - private boolean isRetriableError(Throwable t) { + private boolean isConnectionErrorRetriable(Throwable t) { Status status = Status.fromThrowable(t); - if (Errors.isRetryableInternalStatus(status)) { - return true; - } return status.getCode() == Code.ABORTED || status.getCode() == Code.UNAVAILABLE - || status.getCode() == Code.CANCELLED; + || status.getCode() == Code.CANCELLED + || status.getCode() == Code.INTERNAL + || status.getCode() == Code.FAILED_PRECONDITION + || status.getCode() == Code.DEADLINE_EXCEEDED; } private void doneCallback(Throwable finalStatus) { @@ -748,7 +747,7 @@ private void doneCallback(Throwable finalStatus) { connectionRetryStartTime = System.currentTimeMillis(); } // If the error can be retried, don't set it here, let it try to retry later on. - if (isRetriableError(finalStatus) + if (isConnectionErrorRetriable(finalStatus) && !userClosed && (maxRetryDuration.toMillis() == 0f || System.currentTimeMillis() - connectionRetryStartTime diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 6426bc6ca1..ce7f233af6 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -462,9 +462,17 @@ public void testShortenStreamNameAllowed() throws Exception { @Test public void testAppendSuccessAndConnectionError() throws Exception { - StreamWriter writer = getTestStreamWriter(); + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + // Retry expire immediately. + .setMaxRetryDuration(java.time.Duration.ofMillis(1L)) + .build(); testBigQueryWrite.addResponse(createAppendResponse(0)); testBigQueryWrite.addException(Status.INTERNAL.asException()); + testBigQueryWrite.addException(Status.INTERNAL.asException()); + testBigQueryWrite.addException(Status.INTERNAL.asException()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); @@ -582,11 +590,11 @@ public void testAppendAfterUserClose() throws Exception { @Test public void testAppendAfterServerClose() throws Exception { StreamWriter writer = getTestStreamWriter(); - testBigQueryWrite.addException(Status.INTERNAL.asException()); + testBigQueryWrite.addException(Status.INVALID_ARGUMENT.asException()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiException error1 = assertFutureException(ApiException.class, appendFuture1); - assertEquals(Code.INTERNAL, error1.getStatusCode().getCode()); + assertEquals(Code.INVALID_ARGUMENT, error1.getStatusCode().getCode()); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); assertTrue(appendFuture2.isDone()); @@ -638,7 +646,7 @@ public void serverCloseWhileRequestsInflight() throws Exception { StreamWriter writer = getTestStreamWriter(); // Server will sleep 2 seconds before closing the connection. testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2)); - testBigQueryWrite.addException(Status.INTERNAL.asException()); + testBigQueryWrite.addException(Status.INVALID_ARGUMENT.asException()); // Send 10 requests, so that there are 10 inflight requests. int appendCount = 10; @@ -650,7 +658,7 @@ public void serverCloseWhileRequestsInflight() throws Exception { // Server close should properly handle all inflight requests. for (int i = 0; i < appendCount; i++) { ApiException actualError = assertFutureException(ApiException.class, futures.get(i)); - assertEquals(Code.INTERNAL, actualError.getStatusCode().getCode()); + assertEquals(Code.INVALID_ARGUMENT, actualError.getStatusCode().getCode()); } writer.close();