From da06a4623e89b5f3caf90f85dd87d8538fc7d312 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 10 Feb 2023 20:56:39 +0000 Subject: [PATCH] fix: refactor only, add StreamWriter to AppendRowsRequestResponse (#1981) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: refactor only, add StreamWriter to AppendRequestResponse, so that we could callback on StreamWriter to manage its close * . * . * . * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --- README.md | 7 ++- .../bigquery/storage/v1/ConnectionWorker.java | 25 +++++++--- .../storage/v1/ConnectionWorkerPool.java | 3 +- .../bigquery/storage/v1/StreamWriter.java | 5 +- .../storage/v1/ConnectionWorkerTest.java | 50 ++++++++++--------- 5 files changed, 49 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index 71b696c69d..3159568293 100644 --- a/README.md +++ b/README.md @@ -49,20 +49,19 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.6.0') +implementation platform('com.google.cloud:libraries-bom:26.7.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.30.0' -``` +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.31.0' If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.30.0" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.31.0" ``` ## Authentication diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 32f749c3f1..05390c56aa 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -24,6 +24,7 @@ import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Int64Value; import io.grpc.Status; @@ -267,16 +268,19 @@ public void run(Throwable finalStatus) { } /** Schedules the writing of rows at given offset. */ - ApiFuture append( - String streamName, ProtoSchema writerSchema, ProtoRows rows, long offset) { + ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) { + Preconditions.checkNotNull(streamWriter); AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); requestBuilder.setProtoRows( - ProtoData.newBuilder().setWriterSchema(writerSchema).setRows(rows).build()); + ProtoData.newBuilder() + .setWriterSchema(streamWriter.getProtoSchema()) + .setRows(rows) + .build()); if (offset >= 0) { requestBuilder.setOffset(Int64Value.of(offset)); } - requestBuilder.setWriteStream(streamName); - return appendInternal(requestBuilder.build()); + requestBuilder.setWriteStream(streamWriter.getStreamName()); + return appendInternal(streamWriter, requestBuilder.build()); } Boolean isUserClosed() { @@ -288,8 +292,9 @@ Boolean isUserClosed() { } } - private ApiFuture appendInternal(AppendRowsRequest message) { - AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message); + private ApiFuture appendInternal( + StreamWriter streamWriter, AppendRowsRequest message) { + AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message, streamWriter); if (requestWrapper.messageSize > getApiMaxRequestBytes()) { requestWrapper.appendResult.setException( new StatusRuntimeException( @@ -840,10 +845,14 @@ static final class AppendRequestAndResponse { final AppendRowsRequest message; final long messageSize; - AppendRequestAndResponse(AppendRowsRequest message) { + // The writer that issues the call of the request. + final StreamWriter streamWriter; + + AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter) { this.appendResult = SettableApiFuture.create(); this.message = message; this.messageSize = message.getProtoRows().getSerializedSize(); + this.streamWriter = streamWriter; } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index fa2729aad9..8fcb84165e 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -264,8 +264,7 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, } Stopwatch stopwatch = Stopwatch.createStarted(); ApiFuture responseFuture = - connectionWorker.append( - streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset); + connectionWorker.append(streamWriter, rows, offset); return ApiFutures.transform( responseFuture, // Add callback for update schema diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 0a65c656b4..77bad3eb24 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -145,8 +145,7 @@ public enum Kind { public ApiFuture append( StreamWriter streamWriter, ProtoRows protoRows, long offset) { if (getKind() == Kind.CONNECTION_WORKER) { - return connectionWorker() - .append(streamWriter.getStreamName(), streamWriter.getProtoSchema(), protoRows, offset); + return connectionWorker().append(streamWriter, protoRows, offset); } else { return connectionWorkerPool().append(streamWriter, protoRows, offset); } @@ -376,7 +375,7 @@ public ApiFuture append(ProtoRows rows) { public ApiFuture append(ProtoRows rows, long offset) { if (userClosed.get()) { AppendRequestAndResponse requestWrapper = - new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build()); + new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build(), this); requestWrapper.appendResult.setException( new Exceptions.StreamWriterClosedException( Status.fromCode(Status.Code.FAILED_PRECONDITION) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 6cc3247279..4edf0f3e9d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -80,6 +80,14 @@ public void testMultiplexedAppendSuccess() throws Exception { testBigQueryWrite.addResponse(createAppendResponse(i)); } List> futures = new ArrayList<>(); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(createProtoSchema("foo")) + .build(); + StreamWriter sw2 = + StreamWriter.newBuilder(TEST_STREAM_2, client) + .setWriterSchema(createProtoSchema("complicate")) + .build(); // We do a pattern of: // send to stream1, string1 // send to stream1, string2 @@ -95,8 +103,7 @@ public void testMultiplexedAppendSuccess() throws Exception { futures.add( sendTestMessage( connectionWorker, - TEST_STREAM_1, - createProtoSchema("foo"), + sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); break; @@ -105,8 +112,7 @@ public void testMultiplexedAppendSuccess() throws Exception { futures.add( sendTestMessage( connectionWorker, - TEST_STREAM_2, - createProtoSchema("complicate"), + sw2, createComplicateTypeProtoRows(new String[] {String.valueOf(i)}), i)); break; @@ -197,14 +203,19 @@ public void testAppendInSameStream_switchSchema() throws Exception { // send to stream1, schema3 // send to stream1, schema1 // ... + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + StreamWriter sw2 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema2).build(); + StreamWriter sw3 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema3).build(); for (long i = 0; i < appendCount; i++) { switch ((int) i % 4) { case 0: futures.add( sendTestMessage( connectionWorker, - TEST_STREAM_1, - schema1, + sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); break; @@ -212,8 +223,7 @@ public void testAppendInSameStream_switchSchema() throws Exception { futures.add( sendTestMessage( connectionWorker, - TEST_STREAM_1, - schema2, + sw2, createFooProtoRows(new String[] {String.valueOf(i)}), i)); break; @@ -222,8 +232,7 @@ public void testAppendInSameStream_switchSchema() throws Exception { futures.add( sendTestMessage( connectionWorker, - TEST_STREAM_1, - schema3, + sw3, createFooProtoRows(new String[] {String.valueOf(i)}), i)); break; @@ -293,6 +302,9 @@ public void testAppendInSameStream_switchSchema() throws Exception { @Test public void testAppendButInflightQueueFull() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); ConnectionWorker connectionWorker = new ConnectionWorker( TEST_STREAM_1, @@ -305,7 +317,6 @@ public void testAppendButInflightQueueFull() throws Exception { client.getSettings()); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); - ProtoSchema schema1 = createProtoSchema("foo"); long appendCount = 6; for (int i = 0; i < appendCount; i++) { @@ -322,11 +333,7 @@ public void testAppendButInflightQueueFull() throws Exception { StatusRuntimeException.class, () -> { sendTestMessage( - connectionWorker, - TEST_STREAM_1, - schema1, - createFooProtoRows(new String[] {String.valueOf(5)}), - 5); + connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(5)}), 5); }); long timeDiff = System.currentTimeMillis() - startTime; assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), 5); @@ -334,11 +341,7 @@ public void testAppendButInflightQueueFull() throws Exception { } else { futures.add( sendTestMessage( - connectionWorker, - TEST_STREAM_1, - schema1, - createFooProtoRows(new String[] {String.valueOf(i)}), - i)); + connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1); } } @@ -396,11 +399,10 @@ private ProtoSchema createProtoSchema(String protoName) { private ApiFuture sendTestMessage( ConnectionWorker connectionWorker, - String streamName, - ProtoSchema protoSchema, + StreamWriter streamWriter, ProtoRows protoRows, long offset) { - return connectionWorker.append(streamName, protoSchema, protoRows, offset); + return connectionWorker.append(streamWriter, protoRows, offset); } private ProtoRows createFooProtoRows(String[] messages) {