diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 573bc0c055..28f1f033d2 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -61,6 +61,9 @@ class ConnectionWorker implements AutoCloseable { private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); + // Maximum wait time on inflight quota before error out. + private static long INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = 300000; + private Lock lock; private Condition hasMessageInWaitingQueue; private Condition inflightReduced; @@ -322,7 +325,14 @@ private ApiFuture appendInternal(AppendRowsRequest message) this.inflightBytes += requestWrapper.messageSize; waitingRequestQueue.addLast(requestWrapper); hasMessageInWaitingQueue.signal(); - maybeWaitForInflightQuota(); + try { + maybeWaitForInflightQuota(); + } catch (StatusRuntimeException ex) { + --this.inflightRequests; + waitingRequestQueue.pollLast(); + this.inflightBytes -= requestWrapper.messageSize; + throw ex; + } return requestWrapper.appendResult; } finally { this.lock.unlock(); @@ -347,6 +357,15 @@ private void maybeWaitForInflightQuota() { .withCause(e) .withDescription("Interrupted while waiting for quota.")); } + long current_wait_time = System.currentTimeMillis() - start_time; + if (current_wait_time > INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI) { + throw new StatusRuntimeException( + Status.fromCode(Code.CANCELLED) + .withDescription( + String.format( + "Interrupted while waiting for quota due to long waiting time %sms", + current_wait_time))); + } } inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); } @@ -373,7 +392,6 @@ public void close() { log.fine("Waiting for append thread to finish. Stream: " + streamName); try { appendThread.join(); - log.info("User close complete. Stream: " + streamName); } catch (InterruptedException e) { // Unexpected. Just swallow the exception with logging. log.warning( @@ -387,6 +405,7 @@ public void close() { } try { + log.fine("Begin shutting down user callback thread pool for stream " + streamName); threadPool.shutdown(); threadPool.awaitTermination(3, TimeUnit.MINUTES); } catch (InterruptedException e) { @@ -396,7 +415,10 @@ public void close() { + streamName + " is interrupted with exception: " + e.toString()); + throw new IllegalStateException( + "Thread pool shutdown is interrupted for stream " + streamName); } + log.info("User close finishes for stream " + streamName); } /* @@ -858,6 +880,11 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) { } } + @VisibleForTesting + static void setMaxInflightQueueWaitTime(long waitTime) { + INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = waitTime; + } + @AutoValue abstract static class TableSchemaAndTimestamp { // Shows the timestamp updated schema is reported from response diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 3d3d3f5a7c..540269d734 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -16,6 +16,9 @@ package com.google.cloud.bigquery.storage.v1; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowController; @@ -28,7 +31,9 @@ import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Int64Value; +import io.grpc.StatusRuntimeException; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -52,6 +57,7 @@ public class ConnectionWorkerTest { @Before public void setUp() throws Exception { testBigQueryWrite = new FakeBigQueryWrite(); + ConnectionWorker.setMaxInflightQueueWaitTime(300000); serviceHelper = new MockServiceHelper( UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite)); @@ -281,6 +287,63 @@ public void testAppendInSameStream_switchSchema() throws Exception { } } + @Test + public void testAppendButInflightQueueFull() throws Exception { + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + createProtoSchema("foo"), + 6, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + client.getSettings()); + testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); + ConnectionWorker.setMaxInflightQueueWaitTime(500); + ProtoSchema schema1 = createProtoSchema("foo"); + + long appendCount = 6; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + // In total insert 6 requests, since the max queue size is 5 we will stuck at the 6th request. + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + long startTime = System.currentTimeMillis(); + // At the last request we wait more than 500 millisecond for inflight quota. + if (i == 5) { + assertThrows( + StatusRuntimeException.class, + () -> { + sendTestMessage( + connectionWorker, + TEST_STREAM_1, + schema1, + createFooProtoRows(new String[] {String.valueOf(5)}), + 5); + }); + long timeDiff = System.currentTimeMillis() - startTime; + assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), 5); + assertTrue(timeDiff > 500); + } else { + futures.add( + sendTestMessage( + connectionWorker, + TEST_STREAM_1, + schema1, + createFooProtoRows(new String[] {String.valueOf(i)}), + i)); + assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1); + } + } + + for (int i = 0; i < appendCount - 1; i++) { + assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); + } + } + private AppendRowsResponse createAppendResponse(long offset) { return AppendRowsResponse.newBuilder() .setAppendResult( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index d271fd99d5..eacfdcb40f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -105,6 +105,7 @@ public StreamWriterTest() throws DescriptorValidationException {} @Before public void setUp() throws Exception { testBigQueryWrite = new FakeBigQueryWrite(); + ConnectionWorker.setMaxInflightQueueWaitTime(300000); serviceHelper = new MockServiceHelper( UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite));