From 3159b120e5cd388cf9776a1fa928a3e6ae105d9d Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Wed, 25 Jan 2023 12:54:18 -0800 Subject: [PATCH] feat: add timeout to inflight queue waiting (#1957) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destination * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting the response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparison in connection 
loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: add schema update support to multiplexing * fix: fix windows build bug: windows Instant resolution is different from Linux * fix: fix another failing test for windows build * fix: fix another test failure for Windows build * feat: Change new thread for each retry to be a thread pool to avoid creating/tearing down too many threads if lots of retries happen * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: add back the background executor provider that's accidentally removed * feat: throw error when using connection pool for explicit stream * fix: Add precision truncation to the passed in value from JSON float and double type. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * modify the bom version * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix deadlock issue in ConnectionWorkerPool * fix: fix deadlock issue during close + append for multiplexing * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: fix one potential root cause of deadlock issue for non-multiplexing case * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add timeout to inflight queue waiting, and also add some extra log Co-authored-by: Owl Bot --- .../bigquery/storage/v1/ConnectionWorker.java | 31 ++++++++- .../storage/v1/ConnectionWorkerTest.java | 63 
+++++++++++++++++++ .../bigquery/storage/v1/StreamWriterTest.java | 1 + 3 files changed, 93 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 573bc0c055..28f1f033d2 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -61,6 +61,9 @@ class ConnectionWorker implements AutoCloseable { private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); + // Maximum wait time on inflight quota before error out. + private static long INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = 300000; + private Lock lock; private Condition hasMessageInWaitingQueue; private Condition inflightReduced; @@ -322,7 +325,14 @@ private ApiFuture appendInternal(AppendRowsRequest message) this.inflightBytes += requestWrapper.messageSize; waitingRequestQueue.addLast(requestWrapper); hasMessageInWaitingQueue.signal(); - maybeWaitForInflightQuota(); + try { + maybeWaitForInflightQuota(); + } catch (StatusRuntimeException ex) { + --this.inflightRequests; + waitingRequestQueue.pollLast(); + this.inflightBytes -= requestWrapper.messageSize; + throw ex; + } return requestWrapper.appendResult; } finally { this.lock.unlock(); @@ -347,6 +357,15 @@ private void maybeWaitForInflightQuota() { .withCause(e) .withDescription("Interrupted while waiting for quota.")); } + long current_wait_time = System.currentTimeMillis() - start_time; + if (current_wait_time > INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI) { + throw new StatusRuntimeException( + Status.fromCode(Code.CANCELLED) + .withDescription( + String.format( + "Interrupted while waiting for quota due to long waiting time %sms", + current_wait_time))); + } } 
inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); } @@ -373,7 +392,6 @@ public void close() { log.fine("Waiting for append thread to finish. Stream: " + streamName); try { appendThread.join(); - log.info("User close complete. Stream: " + streamName); } catch (InterruptedException e) { // Unexpected. Just swallow the exception with logging. log.warning( @@ -387,6 +405,7 @@ public void close() { } try { + log.fine("Begin shutting down user callback thread pool for stream " + streamName); threadPool.shutdown(); threadPool.awaitTermination(3, TimeUnit.MINUTES); } catch (InterruptedException e) { @@ -396,7 +415,10 @@ public void close() { + streamName + " is interrupted with exception: " + e.toString()); + throw new IllegalStateException( + "Thread pool shutdown is interrupted for stream " + streamName); } + log.info("User close finishes for stream " + streamName); } /* @@ -858,6 +880,11 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) { } } + @VisibleForTesting + static void setMaxInflightQueueWaitTime(long waitTime) { + INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = waitTime; + } + @AutoValue abstract static class TableSchemaAndTimestamp { // Shows the timestamp updated schema is reported from response diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 3d3d3f5a7c..540269d734 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -16,6 +16,9 @@ package com.google.cloud.bigquery.storage.v1; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; 
import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowController; @@ -28,7 +31,9 @@ import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Int64Value; +import io.grpc.StatusRuntimeException; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -52,6 +57,7 @@ public class ConnectionWorkerTest { @Before public void setUp() throws Exception { testBigQueryWrite = new FakeBigQueryWrite(); + ConnectionWorker.setMaxInflightQueueWaitTime(300000); serviceHelper = new MockServiceHelper( UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite)); @@ -281,6 +287,63 @@ public void testAppendInSameStream_switchSchema() throws Exception { } } + @Test + public void testAppendButInflightQueueFull() throws Exception { + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + createProtoSchema("foo"), + 6, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + client.getSettings()); + testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); + ConnectionWorker.setMaxInflightQueueWaitTime(500); + ProtoSchema schema1 = createProtoSchema("foo"); + + long appendCount = 6; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + // In total insert 6 requests, since the max queue size is 5 we will stuck at the 6th request. + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + long startTime = System.currentTimeMillis(); + // At the last request we wait more than 500 millisecond for inflight quota. 
+ if (i == 5) { + assertThrows( + StatusRuntimeException.class, + () -> { + sendTestMessage( + connectionWorker, + TEST_STREAM_1, + schema1, + createFooProtoRows(new String[] {String.valueOf(5)}), + 5); + }); + long timeDiff = System.currentTimeMillis() - startTime; + assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), 5); + assertTrue(timeDiff > 500); + } else { + futures.add( + sendTestMessage( + connectionWorker, + TEST_STREAM_1, + schema1, + createFooProtoRows(new String[] {String.valueOf(i)}), + i)); + assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1); + } + } + + for (int i = 0; i < appendCount - 1; i++) { + assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); + } + } + private AppendRowsResponse createAppendResponse(long offset) { return AppendRowsResponse.newBuilder() .setAppendResult( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index d271fd99d5..eacfdcb40f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -105,6 +105,7 @@ public StreamWriterTest() throws DescriptorValidationException {} @Before public void setUp() throws Exception { testBigQueryWrite = new FakeBigQueryWrite(); + ConnectionWorker.setMaxInflightQueueWaitTime(300000); serviceHelper = new MockServiceHelper( UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite));