From 1e9a8cac19c3748515ebff7990d02fd576c7dd23 Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Fri, 31 Mar 2023 13:34:34 -0700 Subject: [PATCH] feat: add public api to stream writer to set the maximum wait time (#2066) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add public api to stream writer to set the maximum wait time * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * modify back the readme change from owl post processor * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --- README.md | 6 ++-- .../bigquery/storage/v1/ConnectionWorker.java | 2 +- .../bigquery/storage/v1/StreamWriter.java | 10 ++++++ .../bigquery/storage/v1/StreamWriterTest.java | 31 +++++++++++++++++++ 4 files changed, 45 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 145a863499..5eaba81994 100644 --- a/README.md +++ b/README.md @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.34.1' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.34.2' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.34.1" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.34.2" ``` @@ -219,7 +219,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquerystorage/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquerystorage.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.34.1 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.34.2 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 1aeb911943..12afbf13e0 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -75,7 +75,7 @@ class ConnectionWorker implements AutoCloseable { * We will constantly checking how much time we have been waiting for the next request callback * if we wait too much time we will start shutting down the connections and clean up the queues. */ - private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15); + static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15); private Lock lock; private Condition hasMessageInWaitingQueue; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index b21a52a63d..bfa30c6141 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -518,6 +518,16 @@ public synchronized TableSchema getUpdatedSchema() { : null; } + /** + * Sets the maximum time a request is allowed to be waiting in request waiting queue. Under very + * low chance, it's possible for append request to be waiting indefintely for request callback + * when Google networking SDK does not detect the networking breakage. The default timeout is 15 + * minutes. We are investigating the root cause for callback not triggered by networking SDK. + */ + public static void setMaxRequestCallbackWaitTime(Duration waitTime) { + ConnectionWorker.MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime; + } + long getCreationTimestamp() { return creationTimestamp; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index af36273102..bc6dd71690 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigquery.storage.v1; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; @@ -113,6 +114,7 @@ public StreamWriterTest() throws DescriptorValidationException {} @Before public void setUp() throws Exception { testBigQueryWrite = new FakeBigQueryWrite(); + StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(10000)); ConnectionWorker.setMaxInflightQueueWaitTime(300000); serviceHelper = new MockServiceHelper( @@ -947,6 +949,35 @@ public void testMessageTooLarge() throws Exception { writer.close(); } + @Test + public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(1)); + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3)); + + long appendCount = 10; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + // In total insert 5 requests, + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i)); + } + + for (int i = 0; i < appendCount; i++) { + int finalI = i; + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> futures.get(finalI).get().getAppendResult().getOffset().getValue()); + assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue"); + } + } + @Test public void testAppendWithResetSuccess() throws Exception { try (StreamWriter writer = getTestStreamWriter()) {