diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 1f0e1e1989..82f2439904 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -53,6 +53,7 @@ import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; /** @@ -119,6 +120,10 @@ class ConnectionWorker implements AutoCloseable { */ private final String traceId; + /* + * Enables compression on the wire. + */ + private String compressorName = null; /* * Tracks current inflight requests in the stream. */ @@ -253,6 +258,7 @@ public ConnectionWorker( Duration maxRetryDuration, FlowController.LimitExceededBehavior limitExceededBehavior, String traceId, + @Nullable String compressorName, BigQueryWriteSettings clientSettings) throws IOException { this.lock = new ReentrantLock(); @@ -274,6 +280,7 @@ public ConnectionWorker( this.traceId = traceId; this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); + this.compressorName = compressorName; // Always recreate a client for connection worker. HashMap newHeaders = new HashMap<>(); newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); @@ -343,7 +350,8 @@ public void run(AppendRowsResponse response) { public void run(Throwable finalStatus) { doneCallback(finalStatus); } - }); + }, + this.compressorName); log.info("Finish connecting stream: " + streamName + " id: " + writerId); } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 83be8ce52a..1530d48afc 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -41,6 +41,7 @@ import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; /** Pool of connections to accept appends and distirbute to different connections. */ @@ -91,6 +92,10 @@ public class ConnectionWorkerPool { * TraceId for debugging purpose. */ private final String traceId; + /* + * Sets the compression to use for the calls + */ + private String compressorName; /** Used for test on the number of times createWorker is called. */ private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger(0); @@ -199,12 +204,14 @@ public abstract static class Builder { java.time.Duration maxRetryDuration, FlowController.LimitExceededBehavior limitExceededBehavior, String traceId, + @Nullable String comperssorName, BigQueryWriteSettings clientSettings) { this.maxInflightRequests = maxInflightRequests; this.maxInflightBytes = maxInflightBytes; this.maxRetryDuration = maxRetryDuration; this.limitExceededBehavior = limitExceededBehavior; this.traceId = traceId; + this.compressorName = comperssorName; this.clientSettings = clientSettings; this.currentMaxConnectionCount = settings.minConnectionsPerRegion(); } @@ -379,6 +386,7 @@ private ConnectionWorker createConnectionWorker( maxRetryDuration, limitExceededBehavior, traceId, + compressorName, clientSettings); connectionWorkerPool.add(connectionWorker); log.info( diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 79d2582a89..29b0362ef5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -336,6 +336,17 @@ public Builder setLocation(String location) { return this; } + /** + * Sets the compression to use for the calls. The compressor must be of type gzip. + * + * @param compressorName + * @return Builder + */ + public Builder setCompressorName(String compressorName) { + this.schemaAwareStreamWriterBuilder.setCompressorName(compressorName); + return this; + } + /** * Builds JsonStreamWriter * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java index f91309be35..db617d2013 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java @@ -62,6 +62,7 @@ public class SchemaAwareStreamWriter implements AutoCloseable { private Descriptor descriptor; private TableSchema tableSchema; private ProtoSchema protoSchema; + private String CompressorName; // During some sitaution we want to skip stream writer refresh for updated schema. e.g. when // the user provides the table schema, we should always use that schema. @@ -92,7 +93,8 @@ private SchemaAwareStreamWriter(Builder builder) builder.endpoint, builder.flowControlSettings, builder.traceIdBase, - builder.traceId); + builder.traceId, + builder.compressorName); streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool); streamWriterBuilder.setLocation(builder.location); this.streamWriter = streamWriterBuilder.build(); @@ -276,7 +278,8 @@ private void setStreamWriterSettings( @Nullable String endpoint, @Nullable FlowControlSettings flowControlSettings, @Nullable String traceIdBase, - @Nullable String traceId) { + @Nullable String traceId, + @Nullable String compressorName) { if (channelProvider != null) { streamWriterBuilder.setChannelProvider(channelProvider); } @@ -316,6 +319,9 @@ private void setStreamWriterSettings( flowControlSettings.getLimitExceededBehavior()); } } + if (compressorName != null) { + streamWriterBuilder.setCompressorName(compressorName); + } } /** @@ -425,6 +431,7 @@ public static final class Builder { // Indicates whether multiplexing mode is enabled. private boolean enableConnectionPool = false; private String location; + private String compressorName; private static final String streamPatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; @@ -609,6 +616,17 @@ public Builder setLocation(String location) { return this; } + /** + * Sets the compression to use for the calls. The compressor must be of type gzip. + * + * @param compressorName + * @return Builder + */ + public Builder setCompressorName(String compressorName) { + this.compressorName = compressorName; + return this; + } + /** * Builds SchemaAwareStreamWriter * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamConnection.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamConnection.java index 006f9f3e8a..1039b0f177 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamConnection.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamConnection.java @@ -15,13 +15,18 @@ */ package com.google.cloud.bigquery.storage.v1; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.BidiStreamingCallable; import com.google.api.gax.rpc.ClientStream; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.StreamController; +import io.grpc.CallOptions; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; +import java.util.logging.Logger; +import javax.annotation.Nullable; /** * StreamConnection is responsible for writing requests to a GRPC bidirecional connection. @@ -43,11 +48,24 @@ class StreamConnection { private RequestCallback requestCallback; private DoneCallback doneCallback; + private static final Logger log = Logger.getLogger(StreamConnection.class.getName()); + public StreamConnection( - BigQueryWriteClient client, RequestCallback requestCallback, DoneCallback doneCallback) { + BigQueryWriteClient client, + RequestCallback requestCallback, + DoneCallback doneCallback, + @Nullable String compressorName) { this.requestCallback = requestCallback; this.doneCallback = doneCallback; + ApiCallContext apiCallContext = null; + if (compressorName != null) { + apiCallContext = + GrpcCallContext.createDefault() + .withCallOptions(CallOptions.DEFAULT.withCompression(compressorName)); + log.info("gRPC compression is enabled with " + compressorName + " compression"); + } + bidiStreamingCallable = client.appendRowsCallable(); clientStream = bidiStreamingCallable.splitCall( @@ -75,7 +93,8 @@ public void onComplete() { Status.fromCode(Code.CANCELLED) .withDescription("Stream is closed by user."))); } - }); + }, + apiCallContext); } /** diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index d6f0f99ca9..2e395df519 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -215,6 +215,7 @@ private StreamWriter(Builder builder) throws IOException { builder.maxRetryDuration, builder.limitExceededBehavior, builder.traceId, + builder.compressorName, clientSettings)); } else { if (!isDefaultStream(streamName)) { @@ -276,6 +277,7 @@ private StreamWriter(Builder builder) throws IOException { builder.maxRetryDuration, builder.limitExceededBehavior, builder.traceId, + builder.compressorName, client.getSettings()); })); validateFetchedConnectonPool(builder); @@ -598,6 +600,8 @@ public static final class Builder { private java.time.Duration maxRetryDuration = Duration.ofMinutes(5); + private String compressorName = null; + private Builder(String streamName) { this.streamName = Preconditions.checkNotNull(streamName); this.client = null; @@ -716,6 +720,16 @@ public Builder setMaxRetryDuration(java.time.Duration maxRetryDuration) { return this; } + public Builder setCompressorName(String compressorName) { + Preconditions.checkNotNull(compressorName); + Preconditions.checkArgument( + compressorName.equals("gzip"), + "Compression of type \"%s\" isn't supported, only \"gzip\" compression is supported.", + compressorName); + this.compressorName = compressorName; + return this; + } + /** Builds the {@code StreamWriterV2}. */ public StreamWriter build() throws IOException { return new StreamWriter(this); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index e558d567c8..0724f33546 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -477,6 +477,7 @@ ConnectionWorkerPool createConnectionWorkerPool( maxRetryDuration, FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, + null, clientSettings); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 049e884aee..e548c2b1b8 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -333,6 +333,7 @@ public void testAppendButInflightQueueFull() throws Exception { Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, + null, client.getSettings()); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -388,6 +389,7 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception { Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, + null, client.getSettings()); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -451,6 +453,7 @@ public void testLocationMismatch() throws Exception { Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, + null, client.getSettings()); StatusRuntimeException ex = assertThrows( @@ -481,6 +484,7 @@ public void testStreamNameMismatch() throws Exception { Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, + null, client.getSettings()); StatusRuntimeException ex = assertThrows( @@ -532,6 +536,7 @@ private ConnectionWorker createConnectionWorker( maxRetryDuration, FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, + null, client.getSettings()); } @@ -625,6 +630,7 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, + null, client.getSettings()); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3)); @@ -681,6 +687,7 @@ public void testLongTimeIdleWontFail() throws Exception { Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, + null, client.getSettings()); long appendCount = 10; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index a6ad2df000..da73d60499 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -1452,4 +1452,20 @@ public void testAppendWithMissingValueMap() throws Exception { missingValueMap); } } + + @Test + public void testWrongCompressionType() throws Exception { + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + () -> { + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) + .setCompressorName("not-gzip") + .build(); + }); + assertTrue( + ex.getMessage() + .contains( + "Compression of type \"not-gzip\" isn't supported, only \"gzip\" compression is supported.")); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 42b7067d1d..1b059dde20 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -949,6 +949,20 @@ public void testMessageTooLarge() throws Exception { writer.close(); } + @Test + public void testWrongCompressionType() throws Exception { + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + () -> { + StreamWriter.newBuilder(TEST_STREAM_1, client).setCompressorName("not-gzip").build(); + }); + assertTrue( + ex.getMessage() + .contains( + "Compression of type \"not-gzip\" isn't supported, only \"gzip\" compression is supported.")); + } + @Test public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception { ProtoSchema schema1 = createProtoSchema("foo");