From 305f71ee4b274df58388fc3000e9f5da9fc908e1 Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Tue, 31 Jan 2023 11:27:42 -0800 Subject: [PATCH] feat: allow java client to handle schema change during same stream name (#1964) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparision in connection loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: add schema update support to multiplexing * fix: fix windows build bug: windows Instant resolution is different with linux * fix: fix another failing tests for windows build * fix: fix another test failure for Windows build * feat: Change new thread for each retry to be a thread pool to avoid create/tear down too much threads if lots of retries happens * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: add back the background executor provider that's accidentally removed * feat: throw error when use connection pool for explicit stream * fix: Add precision truncation to the passed in value from JSON float and double type. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * modify the bom version * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix deadlockissue in ConnectionWorkerPool * fix: fix deadlock issue during close + append for multiplexing * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: fix one potential root cause of deadlock issue for non-multiplexing case * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add timeout to inflight queue waiting, and also add some extra log * feat: allow java client lib handle switch table schema for the same stream name * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --- README.md | 6 +-- .../clirr-ignored-differences.xml | 2 +- .../bigquery/storage/v1/ConnectionWorker.java | 43 ++++++++++--------- .../storage/v1/ConnectionWorkerTest.java | 18 +++++--- 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index b208a29cdd..712bb3034e 100644 --- a/README.md +++ b/README.md @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.4.0') +implementation platform('com.google.cloud:libraries-bom:26.5.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.3' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.4' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.3" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.4" ``` ## Authentication diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index c55b8a691c..9833dbb1f3 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -115,7 +115,7 @@ 7009 com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool - ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean) + ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings) 7009 diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 28f1f033d2..b3b2c19199 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -222,7 +222,6 @@ public ConnectionWorker( Status.fromCode(Code.INVALID_ARGUMENT) .withDescription("Writer schema must be provided when building this writer.")); } - this.writerSchema = writerSchema; this.maxInflightRequests = maxInflightRequests; this.maxInflightBytes = maxInflightBytes; this.limitExceededBehavior = limitExceededBehavior; @@ -432,7 +431,7 @@ private void appendLoop() { // Indicate whether we are at the first request after switching destination. // True means the schema and other metadata are needed. - boolean firstRequestForDestinationSwitch = true; + boolean firstRequestForTableOrSchemaSwitch = true; // Represent whether we have entered multiplexing. boolean isMultiplexing = false; @@ -483,25 +482,35 @@ private void appendLoop() { resetConnection(); // Set firstRequestInConnection to indicate the next request to be sent should include // metedata. Reset everytime after reconnection. - firstRequestForDestinationSwitch = true; + firstRequestForTableOrSchemaSwitch = true; } while (!localQueue.isEmpty()) { AppendRowsRequest originalRequest = localQueue.pollFirst().message; AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder(); - - // Consider we enter multiplexing if we met a different non empty stream name. - if (!originalRequest.getWriteStream().isEmpty() - && !streamName.isEmpty() - && !originalRequest.getWriteStream().equals(streamName)) { + // Always respect the first writer schema seen by the loop. + if (writerSchema == null) { + writerSchema = originalRequest.getProtoRows().getWriterSchema(); + } + // Consider we enter multiplexing if we met a different non empty stream name or we meet + // a new schema for the same stream name. + // For the schema comparision we don't use message differencer to speed up the comparing + // process. `equals(...)` can bring us false positive, e.g. two repeated field can be + // considered the same but is not considered equals(). However as long as it's never provide + // false negative we will always correctly pass writer schema to backend. + if ((!originalRequest.getWriteStream().isEmpty() + && !streamName.isEmpty() + && !originalRequest.getWriteStream().equals(streamName)) + || (originalRequest.getProtoRows().hasWriterSchema() + && !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) { streamName = originalRequest.getWriteStream(); + writerSchema = originalRequest.getProtoRows().getWriterSchema(); isMultiplexing = true; - firstRequestForDestinationSwitch = true; + firstRequestForTableOrSchemaSwitch = true; } - if (firstRequestForDestinationSwitch) { + if (firstRequestForTableOrSchemaSwitch) { // If we are at the first request for every table switch, including the first request in // the connection, we will attach both stream name and table schema to the request. - // We don't support change of schema change during multiplexing for the saeme stream name. destinationSet.add(streamName); if (this.traceId != null) { originalRequestBuilder.setTraceId(this.traceId); @@ -511,17 +520,11 @@ private void appendLoop() { originalRequestBuilder.clearWriteStream(); } - // We don't use message differencer to speed up the comparing process. - // `equals(...)` can bring us false positive, e.g. two repeated field can be considered the - // same but is not considered equals(). However as long as it's never provide false negative - // we will always correctly pass writer schema to backend. - if (firstRequestForDestinationSwitch - || !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema)) { - writerSchema = originalRequest.getProtoRows().getWriterSchema(); - } else { + // During non table/schema switch requests, clear writer schema. + if (!firstRequestForTableOrSchemaSwitch) { originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema(); } - firstRequestForDestinationSwitch = false; + firstRequestForTableOrSchemaSwitch = false; // Send should only throw an exception if there is a problem with the request. The catch // block will handle this case, and return the exception with the result. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 540269d734..6cc3247279 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -247,10 +247,10 @@ public void testAppendInSameStream_switchSchema() throws Exception { // We will get the request as the pattern of: // (writer_stream: t1, schema: schema1) // (writer_stream: _, schema: _) - // (writer_stream: _, schema: schema3) - // (writer_stream: _, schema: _) - // (writer_stream: _, schema: schema1) - // (writer_stream: _, schema: _) + // (writer_stream: t1, schema: schema3) + // (writer_stream: t1, schema: _) + // (writer_stream: t1, schema: schema1) + // (writer_stream: t1, schema: _) switch (i % 4) { case 0: if (i == 0) { @@ -261,19 +261,23 @@ public void testAppendInSameStream_switchSchema() throws Exception { .isEqualTo("foo"); break; case 1: - assertThat(serverRequest.getWriteStream()).isEmpty(); + if (i == 1) { + assertThat(serverRequest.getWriteStream()).isEmpty(); + } else { + assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); + } // Schema is empty if not at the first request after table switch. assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse(); break; case 2: - assertThat(serverRequest.getWriteStream()).isEmpty(); + assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); // Schema is populated after table switch. assertThat( serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName()) .isEqualTo("bar"); break; case 3: - assertThat(serverRequest.getWriteStream()).isEmpty(); + assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); // Schema is empty if not at the first request after table switch. assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse(); break;