diff --git a/README.md b/README.md
index b208a29cdd..712bb3034e 100644
--- a/README.md
+++ b/README.md
@@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
 
 If you are using Gradle 5.x or later, add this to your dependencies:
 
 ```Groovy
-implementation platform('com.google.cloud:libraries-bom:26.4.0')
+implementation platform('com.google.cloud:libraries-bom:26.5.0')
 
 implementation 'com.google.cloud:google-cloud-bigquerystorage'
 ```
 If you are using Gradle without BOM, add this to your dependencies:
 
 ```Groovy
-implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.3'
+implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.4'
 ```
 If you are using SBT, add this to your dependencies:
 
 ```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.3"
+libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.4"
 ```
 
 ## Authentication
diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index c55b8a691c..9833dbb1f3 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -115,7 +115,7 @@
   <difference>
     <differenceType>7009</differenceType>
    <className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
-    <method>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
+    <method>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)</method>
   </difference>
   <difference>
     <differenceType>7009</differenceType>
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
index 28f1f033d2..b3b2c19199 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
@@ -222,7 +222,6 @@ public ConnectionWorker(
           Status.fromCode(Code.INVALID_ARGUMENT)
               .withDescription("Writer schema must be provided when building this writer."));
     }
-    this.writerSchema = writerSchema;
     this.maxInflightRequests = maxInflightRequests;
     this.maxInflightBytes = maxInflightBytes;
     this.limitExceededBehavior = limitExceededBehavior;
@@ -432,7 +431,7 @@ private void appendLoop() {
 
     // Indicate whether we are at the first request after switching destination.
     // True means the schema and other metadata are needed.
-    boolean firstRequestForDestinationSwitch = true;
+    boolean firstRequestForTableOrSchemaSwitch = true;
     // Represent whether we have entered multiplexing.
     boolean isMultiplexing = false;
 
@@ -483,25 +482,35 @@ private void appendLoop() {
         resetConnection();
         // Set firstRequestInConnection to indicate the next request to be sent should include
         // metedata. Reset everytime after reconnection.
-        firstRequestForDestinationSwitch = true;
+        firstRequestForTableOrSchemaSwitch = true;
       }
 
       while (!localQueue.isEmpty()) {
         AppendRowsRequest originalRequest = localQueue.pollFirst().message;
         AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();
-
-        // Consider we enter multiplexing if we met a different non empty stream name.
-        if (!originalRequest.getWriteStream().isEmpty()
-            && !streamName.isEmpty()
-            && !originalRequest.getWriteStream().equals(streamName)) {
+        // Always respect the first writer schema seen by the loop.
+        if (writerSchema == null) {
+          writerSchema = originalRequest.getProtoRows().getWriterSchema();
+        }
+        // Consider that we have entered multiplexing if we meet a different non-empty stream name
+        // or a new schema for the same stream name.
+        // For the schema comparison we don't use a message differencer, to keep the comparison
+        // fast. `equals(...)` can give a false positive, e.g. two repeated fields can be
+        // considered the same yet not be equal under equals(). However, as long as it never gives
+        // a false negative, we will always correctly pass the writer schema to the backend.
+        if ((!originalRequest.getWriteStream().isEmpty()
+                && !streamName.isEmpty()
+                && !originalRequest.getWriteStream().equals(streamName))
+            || (originalRequest.getProtoRows().hasWriterSchema()
+                && !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) {
           streamName = originalRequest.getWriteStream();
+          writerSchema = originalRequest.getProtoRows().getWriterSchema();
           isMultiplexing = true;
-          firstRequestForDestinationSwitch = true;
+          firstRequestForTableOrSchemaSwitch = true;
         }
-        if (firstRequestForDestinationSwitch) {
+        if (firstRequestForTableOrSchemaSwitch) {
           // If we are at the first request for every table switch, including the first request in
           // the connection, we will attach both stream name and table schema to the request.
-          // We don't support change of schema change during multiplexing for the saeme stream name.
           destinationSet.add(streamName);
           if (this.traceId != null) {
             originalRequestBuilder.setTraceId(this.traceId);
@@ -511,17 +520,11 @@ private void appendLoop() {
           originalRequestBuilder.clearWriteStream();
         }
 
-        // We don't use message differencer to speed up the comparing process.
-        // `equals(...)` can bring us false positive, e.g. two repeated field can be considered the
-        // same but is not considered equals(). However as long as it's never provide false negative
-        // we will always correctly pass writer schema to backend.
-        if (firstRequestForDestinationSwitch
-            || !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema)) {
-          writerSchema = originalRequest.getProtoRows().getWriterSchema();
-        } else {
+        // For requests that do not follow a table/schema switch, clear the writer schema.
+        if (!firstRequestForTableOrSchemaSwitch) {
           originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
         }
-        firstRequestForDestinationSwitch = false;
+        firstRequestForTableOrSchemaSwitch = false;
 
         // Send should only throw an exception if there is a problem with the request. The catch
         // block will handle this case, and return the exception with the result.
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
index 540269d734..6cc3247279 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
@@ -247,10 +247,10 @@ public void testAppendInSameStream_switchSchema() throws Exception {
       // We will get the request as the pattern of:
       // (writer_stream: t1, schema: schema1)
       // (writer_stream: _, schema: _)
-      // (writer_stream: _, schema: schema3)
-      // (writer_stream: _, schema: _)
-      // (writer_stream: _, schema: schema1)
-      // (writer_stream: _, schema: _)
+      // (writer_stream: t1, schema: schema3)
+      // (writer_stream: t1, schema: _)
+      // (writer_stream: t1, schema: schema1)
+      // (writer_stream: t1, schema: _)
       switch (i % 4) {
         case 0:
           if (i == 0) {
@@ -261,19 +261,23 @@ public void testAppendInSameStream_switchSchema() throws Exception {
              .isEqualTo("foo");
           break;
         case 1:
-          assertThat(serverRequest.getWriteStream()).isEmpty();
+          if (i == 1) {
+            assertThat(serverRequest.getWriteStream()).isEmpty();
+          } else {
+            assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
+          }
           // Schema is empty if not at the first request after table switch.
           assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
           break;
         case 2:
-          assertThat(serverRequest.getWriteStream()).isEmpty();
+          assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
           // Schema is populated after table switch.
           assertThat(
                   serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
               .isEqualTo("bar");
           break;
         case 3:
-          assertThat(serverRequest.getWriteStream()).isEmpty();
+          assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
           // Schema is empty if not at the first request after table switch.
           assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
           break;
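For context, a minimal usage sketch of the behavior this change touches (not part of the diff above): two writers with different proto schemas target the same stream over a shared connection pool, and the pooled ConnectionWorker now re-attaches the writer schema on the first request after it sees the schema switch. The stream name, descriptors, row payloads, and the "us" location below are placeholder assumptions; `setWriterSchema`, `setEnableConnectionPool`, and `setLocation` are existing `StreamWriter` builder options.

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.protobuf.Descriptors.Descriptor;

public class SchemaSwitchSketch {

  // Appends one batch with the old schema and one with the new schema to the same stream.
  // With the connection pool enabled, both writers can share a ConnectionWorker; the change
  // above makes that worker resend the writer schema on the first request after the switch.
  static void appendAcrossSchemaChange(
      BigQueryWriteClient client,
      String streamName, // placeholder, e.g. the target table's _default stream
      Descriptor oldDescriptor,
      ProtoRows oldRows,
      Descriptor newDescriptor,
      ProtoRows newRows)
      throws Exception {
    ProtoSchema oldSchema = ProtoSchemaConverter.convert(oldDescriptor);
    StreamWriter writerV1 =
        StreamWriter.newBuilder(streamName, client)
            .setWriterSchema(oldSchema)
            .setEnableConnectionPool(true) // multiplex several writers over shared connections
            .setLocation("us") // pooling may require an explicit location; placeholder value
            .build();
    ApiFuture<AppendRowsResponse> first = writerV1.append(oldRows);
    first.get();
    writerV1.close();

    // A second writer with the updated schema targets the same stream. The pooled worker now
    // detects the schema switch and attaches the new writer schema to the next request instead
    // of silently reusing the previous one.
    ProtoSchema newSchema = ProtoSchemaConverter.convert(newDescriptor);
    StreamWriter writerV2 =
        StreamWriter.newBuilder(streamName, client)
            .setWriterSchema(newSchema)
            .setEnableConnectionPool(true)
            .setLocation("us")
            .build();
    ApiFuture<AppendRowsResponse> second = writerV2.append(newRows);
    second.get();
    writerV2.close();
  }
}
```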