From 91da88b0ed914bf55111dd9cef2a3fc4b27c3443 Mon Sep 17 00:00:00 2001
From: Gaole Meng
Date: Tue, 28 Feb 2023 16:14:27 -0800
Subject: [PATCH] fix: add client shutdown if request waits in request queue for too long. (#2017)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* feat: Split writer into connection worker and wrapper; this is a prerequisite for the multiplexing client
* feat: add connection worker pool skeleton, used for multiplexing client
* feat: add Load API for connection worker for multiplexing client
* feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destination
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* feat: port the multiplexing client core algorithm and basic tests; also fixed a tiny bug inside fake bigquery write impl for getting the response from offset
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* feat: wire multiplexing connection pool to stream writer
* feat: some fixes for multiplexing client
* feat: fix some todos, and reject the mixed behavior of passed-in client or not
* feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field
* feat: add getInflightWaitSeconds implementation
* feat: Add schema comparison in connection loop to ensure schema updates for the same stream name can be notified
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* feat: add schema update support to multiplexing
* fix: fix windows build bug: Windows Instant resolution differs from Linux
* fix: fix another failing test for Windows build
* fix: fix another test failure for Windows build
* feat: Change the new-thread-per-retry approach to a thread pool to avoid creating and tearing down too many threads when there are many retries
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* fix: add back the background executor provider that was accidentally removed
* feat: throw error when using connection pool for an explicit stream
* fix: Add precision truncation to the passed-in value from JSON float and double type.
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* modify the bom version
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* fix deadlock issue in ConnectionWorkerPool
* fix: fix deadlock issue during close + append for multiplexing
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* fix: fix one potential root cause of deadlock issue for non-multiplexing case
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* Add timeout to inflight queue waiting, and also add some extra logs
* feat: allow the Java client lib to handle table schema switches for the same stream name
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* fix: close before retry connection
* fix: close before retry connection
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* fix: add client-side timeout if an inflight request waits too long

---------

Co-authored-by: Owl Bot
---
 .../bigquery/storage/v1/ConnectionWorker.java |  51 ++++++++-
 .../storage/v1/ConnectionWorkerTest.java      | 103 ++++++++++++++++++
 2 files changed, 150 insertions(+), 4 deletions(-)

diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
index 835d411714..c3ea6f82d7 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
@@ -33,6 +33,7 @@
 import io.grpc.StatusRuntimeException;
 import java.io.IOException;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.HashMap;
@@ -66,6 +67,14 @@ class ConnectionWorker implements AutoCloseable {
   // Maximum wait time on inflight quota before error out.
   private static long INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = 300000;
 
+  /*
+   * Maximum time waiting for a request callback before shutting down the connection.
+   *
+   * We constantly check how long we have been waiting for the next request callback; if we have
+   * waited too long, we start shutting down the connection and cleaning up the queues.
+   */
+  private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);
+
   private Lock lock;
   private Condition hasMessageInWaitingQueue;
   private Condition inflightReduced;
@@ -273,7 +282,6 @@ public void run() {
             log.warning(
                 "Exception thrown from append loop, thus stream writer is shutdown due to exception: "
                     + e.toString());
-            e.printStackTrace();
             lock.lock();
             try {
               connectionFinalStatus = e;
@@ -507,7 +515,7 @@ public void close() {
     } finally {
       this.lock.unlock();
     }
-    log.fine("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId);
+    log.info("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId);
Stream: " + streamName + " id: " + writerId); try { appendThread.join(); } catch (InterruptedException e) { @@ -525,6 +533,7 @@ public void close() { // Backend request has a 2 minute timeout, so wait a little longer than that. this.client.awaitTermination(150, TimeUnit.SECONDS); } catch (InterruptedException ignored) { + log.warning("Client await termination timeout in writer id " + writerId); } try { @@ -569,6 +578,11 @@ private void appendLoop() { this.lock.lock(); try { hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS); + // Check whether we should error out the current append loop. + if (inflightRequestQueue.size() > 0) { + throwIfWaitCallbackTooLong(inflightRequestQueue.getFirst().requestCreationTimeStamp); + } + // Copy the streamConnectionIsConnected guarded by lock to a local variable. // In addition, only reconnect if there is a retriable error. streamNeedsConnecting = !streamConnectionIsConnected && connectionFinalStatus == null; @@ -583,6 +597,7 @@ private void appendLoop() { } while (!this.waitingRequestQueue.isEmpty()) { AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); + requestWrapper.trySetRequestInsertQueueTime(); this.inflightRequestQueue.addLast(requestWrapper); localQueue.addLast(requestWrapper); } @@ -703,6 +718,17 @@ private void appendLoop() { log.info("Append thread is done. Stream: " + streamName + " id: " + writerId); } + private void throwIfWaitCallbackTooLong(Instant timeToCheck) { + Duration milliSinceLastCallback = Duration.between(timeToCheck, Instant.now()); + if (milliSinceLastCallback.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) > 0) { + throw new RuntimeException( + String.format( + "Request has waited in inflight queue for %sms for writer %s, " + + "which is over maximum wait time %s", + milliSinceLastCallback, writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME.toString())); + } + } + /* * Returns true if waiting queue is drain, a.k.a. no more requests in the waiting queue. * @@ -740,6 +766,7 @@ private void waitForDoneCallback(long duration, TimeUnit timeUnit) { } this.lock.lock(); try { + log.warning("Donecallback is not triggered within timeout frame for writer " + writerId); if (connectionFinalStatus == null) { connectionFinalStatus = new StatusRuntimeException( @@ -883,7 +910,7 @@ private boolean isConnectionErrorRetriable(Throwable t) { } private void doneCallback(Throwable finalStatus) { - log.fine( + log.info( "Received done callback. Stream: " + streamName + " worker id: " @@ -923,7 +950,9 @@ private void doneCallback(Throwable finalStatus) { "Connection finished with error " + finalStatus.toString() + " for stream " - + streamName); + + streamName + + " with write id: " + + writerId); } } } finally { @@ -955,12 +984,21 @@ static final class AppendRequestAndResponse { // The writer that issues the call of the request. final StreamWriter streamWriter; + Instant requestCreationTimeStamp; + AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter) { this.appendResult = SettableApiFuture.create(); this.message = message; this.messageSize = message.getProtoRows().getSerializedSize(); this.streamWriter = streamWriter; } + + void trySetRequestInsertQueueTime() { + // Only set the first time the caller tries to set the timestamp. + if (requestCreationTimeStamp == null) { + requestCreationTimeStamp = Instant.now(); + } + } } /** Returns the current workload of this worker. 
@@ -1051,6 +1089,11 @@ static void setMaxInflightQueueWaitTime(long waitTime) {
     INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = waitTime;
   }
 
+  @VisibleForTesting
+  static void setMaxInflightRequestWaitTime(Duration waitTime) {
+    MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime;
+  }
+
   @AutoValue
   abstract static class TableSchemaAndTimestamp {

diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
index 13711bddd0..cb4e05ab20 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
@@ -61,6 +61,7 @@ public class ConnectionWorkerTest {
   public void setUp() throws Exception {
     testBigQueryWrite = new FakeBigQueryWrite();
     ConnectionWorker.setMaxInflightQueueWaitTime(300000);
+    ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofMinutes(10));
     serviceHelper =
         new MockServiceHelper(
             UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite));
@@ -607,4 +608,106 @@ public void testLoadIsOverWhelmed() {
     Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100);
     assertThat(load2.isOverwhelmed()).isFalse();
   }
+
+  @Test
+  public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
+    ProtoSchema schema1 = createProtoSchema("foo");
+    ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofSeconds(1));
+    StreamWriter sw1 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
+    ConnectionWorker connectionWorker =
+        new ConnectionWorker(
+            TEST_STREAM_1,
+            null,
+            createProtoSchema("foo"),
+            100000,
+            100000,
+            Duration.ofSeconds(100),
+            FlowController.LimitExceededBehavior.Block,
+            TEST_TRACE_ID,
+            client.getSettings());
+    testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));
+
+    long appendCount = 10;
+    for (int i = 0; i < appendCount; i++) {
+      testBigQueryWrite.addResponse(createAppendResponse(i));
+    }
+
+    // In total insert 10 requests.
+    List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
+    for (int i = 0; i < appendCount; i++) {
+      futures.add(
+          sendTestMessage(
+              connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
+      assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1);
+    }
+
+    for (int i = 0; i < appendCount; i++) {
+      int finalI = i;
+      ExecutionException ex =
+          assertThrows(
+              ExecutionException.class,
+              () -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
+      assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
+    }
+
+    // Any further append will fail immediately.
+    ExecutionException ex =
+        assertThrows(
+            ExecutionException.class,
+            () ->
+                sendTestMessage(
+                        connectionWorker,
+                        sw1,
+                        createFooProtoRows(new String[] {String.valueOf(100)}),
+                        100)
+                    .get());
+    assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
+  }
+
+  @Test
+  public void testLongTimeIdleWontFail() throws Exception {
+    ProtoSchema schema1 = createProtoSchema("foo");
+    ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofSeconds(1));
+    StreamWriter sw1 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
+    ConnectionWorker connectionWorker =
+        new ConnectionWorker(
+            TEST_STREAM_1,
+            null,
+            createProtoSchema("foo"),
+            100000,
+            100000,
+            Duration.ofSeconds(100),
+            FlowController.LimitExceededBehavior.Block,
+            TEST_TRACE_ID,
+            client.getSettings());
+
+    long appendCount = 10;
+    for (int i = 0; i < appendCount * 2; i++) {
+      testBigQueryWrite.addResponse(createAppendResponse(i));
+    }
+
+    // In total insert 20 requests, in two batches of 10.
+    List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
+    for (int i = 0; i < appendCount; i++) {
+      futures.add(
+          sendTestMessage(
+              connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
+    }
+    // Sleep 2 seconds to make sure the request queue is empty.
+    Thread.sleep(2000);
+    assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), 0);
+    for (int i = 0; i < appendCount; i++) {
+      futures.add(
+          sendTestMessage(
+              connectionWorker,
+              sw1,
+              createFooProtoRows(new String[] {String.valueOf(i)}),
+              i + appendCount));
+    }
+    for (int i = 0; i < appendCount * 2; i++) {
+      assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
+    }
+  }
 }
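
Note on caller-side handling (not part of the patch itself): the sketch below illustrates how a user of StreamWriter might observe the client-side timeout this change introduces. The class and method names (InflightTimeoutExample, appendOrFail) are hypothetical, and matching on the exception message simply mirrors the RuntimeException text added in throwIfWaitCallbackTooLong; it is an assumption about how the failure surfaces through the append future, not a stable API contract.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import java.util.concurrent.ExecutionException;

// Hypothetical caller-side handling of the client-side inflight-wait timeout.
public final class InflightTimeoutExample {

  // Appends one batch and returns the committed offset. If the request sat in the inflight
  // queue longer than the worker's maximum callback wait time, the append future completes
  // exceptionally; in that case we close the writer and rethrow so the caller can rebuild it.
  static long appendOrFail(StreamWriter writer, ProtoRows rows, long offset)
      throws InterruptedException, ExecutionException {
    ApiFuture<AppendRowsResponse> future = writer.append(rows, offset);
    try {
      return future.get().getAppendResult().getOffset().getValue();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      // Message check mirrors the RuntimeException thrown by throwIfWaitCallbackTooLong above;
      // this is an assumption about the surfaced text, not a guaranteed error type.
      if (cause != null
          && cause.getMessage() != null
          && cause.getMessage().contains("Request has waited in inflight queue")) {
        writer.close();
      }
      throw e;
    }
  }
}

Because the worker records the failure in connectionFinalStatus, later appends on the same connection fail immediately (as exercised in testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed), so recreating the StreamWriter is a reasonable recovery path.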