Skip to content

Commit

Permalink
fix: at connection level, retry for internal errors (#1965)
Browse files Browse the repository at this point in the history
* fix: at connection level, retry for internal errors

* .

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* .

* .

* .

* .

* .

* feat: allow java client to handle schema change during same stream name  (#1964)

* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting thre
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparision in connection loop to ensure schema update for
the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix windows build bug: windows Instant resolution is different with
linux

* fix: fix another failing tests for windows build

* fix: fix another test failure for Windows build

* feat: Change new thread for each retry to be a thread pool to avoid
create/tear down too much threads if lots of retries happens

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add back the background executor provider that's accidentally
removed

* feat: throw error when use connection pool for explicit stream

* fix: Add precision truncation to the passed in value from JSON float and
double type.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* modify the bom version

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix deadlockissue in ConnectionWorkerPool

* fix: fix deadlock issue during close + append for multiplexing

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: fix one potential root cause of deadlock issue for non-multiplexing
case

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add timeout to inflight queue waiting, and also add some extra log

* feat: allow java client lib handle switch table schema for the same stream
name

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>

* fix: remove unrecoverable connection from connection pool during multiplexing  (#1967)

* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting thre
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparision in connection loop to ensure schema update for
the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix windows build bug: windows Instant resolution is different with
linux

* fix: fix another failing tests for windows build

* fix: fix another test failure for Windows build

* feat: Change new thread for each retry to be a thread pool to avoid
create/tear down too much threads if lots of retries happens

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add back the background executor provider that's accidentally
removed

* feat: throw error when use connection pool for explicit stream

* fix: Add precision truncation to the passed in value from JSON float and
double type.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* modify the bom version

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix deadlockissue in ConnectionWorkerPool

* fix: fix deadlock issue during close + append for multiplexing

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: fix one potential root cause of deadlock issue for non-multiplexing
case

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add timeout to inflight queue waiting, and also add some extra log

* feat: allow java client lib handle switch table schema for the same stream
name

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Gaole Meng <gaole@google.com>
  • Loading branch information
3 people committed Jan 31, 2023
1 parent 091dddb commit 9c01bc1
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
5 changes: 5 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -142,4 +142,9 @@
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
<method>long getInflightWaitSeconds(com.google.cloud.bigquery.storage.v1.StreamWriter)</method>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
<method>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)</method>
</difference>
</differences>
Expand Up @@ -19,7 +19,6 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
Expand Down Expand Up @@ -724,14 +723,14 @@ private void requestCallback(AppendRowsResponse response) {
});
}

private boolean isRetriableError(Throwable t) {
private boolean isConnectionErrorRetriable(Throwable t) {
Status status = Status.fromThrowable(t);
if (Errors.isRetryableInternalStatus(status)) {
return true;
}
return status.getCode() == Code.ABORTED
|| status.getCode() == Code.UNAVAILABLE
|| status.getCode() == Code.CANCELLED;
|| status.getCode() == Code.CANCELLED
|| status.getCode() == Code.INTERNAL
|| status.getCode() == Code.FAILED_PRECONDITION
|| status.getCode() == Code.DEADLINE_EXCEEDED;
}

private void doneCallback(Throwable finalStatus) {
Expand All @@ -748,7 +747,7 @@ private void doneCallback(Throwable finalStatus) {
connectionRetryStartTime = System.currentTimeMillis();
}
// If the error can be retried, don't set it here, let it try to retry later on.
if (isRetriableError(finalStatus)
if (isConnectionErrorRetriable(finalStatus)
&& !userClosed
&& (maxRetryDuration.toMillis() == 0f
|| System.currentTimeMillis() - connectionRetryStartTime
Expand Down
Expand Up @@ -462,9 +462,17 @@ public void testShortenStreamNameAllowed() throws Exception {

@Test
public void testAppendSuccessAndConnectionError() throws Exception {
StreamWriter writer = getTestStreamWriter();
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
// Retry expire immediately.
.setMaxRetryDuration(java.time.Duration.ofMillis(1L))
.build();
testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addException(Status.INTERNAL.asException());
testBigQueryWrite.addException(Status.INTERNAL.asException());
testBigQueryWrite.addException(Status.INTERNAL.asException());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
Expand Down Expand Up @@ -582,11 +590,11 @@ public void testAppendAfterUserClose() throws Exception {
@Test
public void testAppendAfterServerClose() throws Exception {
StreamWriter writer = getTestStreamWriter();
testBigQueryWrite.addException(Status.INTERNAL.asException());
testBigQueryWrite.addException(Status.INVALID_ARGUMENT.asException());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiException error1 = assertFutureException(ApiException.class, appendFuture1);
assertEquals(Code.INTERNAL, error1.getStatusCode().getCode());
assertEquals(Code.INVALID_ARGUMENT, error1.getStatusCode().getCode());

ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
assertTrue(appendFuture2.isDone());
Expand Down Expand Up @@ -638,7 +646,7 @@ public void serverCloseWhileRequestsInflight() throws Exception {
StreamWriter writer = getTestStreamWriter();
// Server will sleep 2 seconds before closing the connection.
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2));
testBigQueryWrite.addException(Status.INTERNAL.asException());
testBigQueryWrite.addException(Status.INVALID_ARGUMENT.asException());

// Send 10 requests, so that there are 10 inflight requests.
int appendCount = 10;
Expand All @@ -650,7 +658,7 @@ public void serverCloseWhileRequestsInflight() throws Exception {
// Server close should properly handle all inflight requests.
for (int i = 0; i < appendCount; i++) {
ApiException actualError = assertFutureException(ApiException.class, futures.get(i));
assertEquals(Code.INTERNAL, actualError.getStatusCode().getCode());
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatusCode().getCode());
}

writer.close();
Expand Down

0 comments on commit 9c01bc1

Please sign in to comment.