From ad136b9fa25e774a33d02fc3a82a76fb1152b5c5 Mon Sep 17 00:00:00 2001 From: Artur Owczarek Date: Sat, 8 Apr 2023 08:45:17 +0200 Subject: [PATCH] feat: add schema aware stream writer (#2048) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add schema aware stream writer * [squash this commit] Fix clirr errors * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --- README.md | 2 +- .../cloud/bigquery/storage/v1/Exceptions.java | 27 +- .../bigquery/storage/v1/JsonStreamWriter.java | 361 ++-------- .../storage/v1/JsonToProtoMessage.java | 68 +- .../storage/v1/SchemaAwareStreamWriter.java | 630 ++++++++++++++++++ .../bigquery/storage/v1/ToProtoConverter.java | 27 + .../storage/v1/JsonStreamWriterTest.java | 19 +- .../storage/v1/JsonToProtoMessageTest.java | 102 +-- .../it/ITBigQueryWriteManualClientTest.java | 4 +- .../v1beta2/JsonToProtoMessageTest.java | 2 +- 10 files changed, 834 insertions(+), 408 deletions(-) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ToProtoConverter.java diff --git a/README.md b/README.md index 5eaba81994..fd890cd582 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.11.0') +implementation platform('com.google.cloud:libraries-bom:26.12.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index 8e13121414..2f9083e4e9 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -243,8 +243,8 @@ public String getStreamName() { } /** - * This exception is thrown from {@link JsonStreamWriter#append()} when the client side Json to - * Proto serializtion fails. It can also be thrown by the server in case rows contains invalid + * This exception is thrown from {@link SchemaAwareStreamWriter#append()} when the client side + * Proto serialization fails. It can also be thrown by the server in case rows contains invalid * data. The exception contains a Map of indexes of faulty rows and the corresponding error * message. */ @@ -362,16 +362,27 @@ protected InflightBytesLimitExceededException(String writerId, long currentLimit currentLimit); } } + /** - * Input Json data has unknown field to the schema of the JsonStreamWriter. User can either turn - * on IgnoreUnknownFields option on the JsonStreamWriter, or if they don't want the error to be - * ignored, they should recreate the JsonStreamWriter with the updated table schema. + * This class is replaced by a generic one. It will be removed soon. 
Please use {@link + * DataHasUnknownFieldException} */ - public static final class JsonDataHasUnknownFieldException extends IllegalArgumentException { + public static final class JsonDataHasUnknownFieldException extends DataHasUnknownFieldException { + protected JsonDataHasUnknownFieldException(String jsonFieldName) { + super(jsonFieldName); + } + } + /** + * Input data object has unknown field to the schema of the SchemaAwareStreamWriter. User can + * either turn on IgnoreUnknownFields option on the SchemaAwareStreamWriter, or if they don't want + * the error to be ignored, they should recreate the SchemaAwareStreamWriter with the updated + * table schema. + */ + public static class DataHasUnknownFieldException extends IllegalArgumentException { private final String jsonFieldName; - protected JsonDataHasUnknownFieldException(String jsonFieldName) { - super(String.format("JSONObject has fields unknown to BigQuery: %s.", jsonFieldName)); + public DataHasUnknownFieldException(String jsonFieldName) { + super(String.format("The source object has fields unknown to BigQuery: %s.", jsonFieldName)); this.jsonFieldName = jsonFieldName; } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 415830d6ec..e68894bf9e 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -20,22 +20,10 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.rpc.TransportChannelProvider; -import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; -import com.google.common.base.Preconditions; import com.google.protobuf.Descriptors; -import com.google.protobuf.Descriptors.Descriptor; -import com.google.protobuf.Descriptors.DescriptorValidationException; -import com.google.protobuf.Message; -import com.google.rpc.Code; import java.io.IOException; -import java.util.HashMap; import java.util.Map; -import java.util.logging.Logger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.annotation.Nullable; import org.json.JSONArray; -import org.json.JSONObject; /** * A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is @@ -46,59 +34,17 @@ * order of minutes). 
*/ public class JsonStreamWriter implements AutoCloseable { - private static String streamPatternString = - "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"; - private static Pattern streamPattern = Pattern.compile(streamPatternString); - private static final Logger LOG = Logger.getLogger(JsonStreamWriter.class.getName()); - private static final long UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS = 30100L; - - private BigQueryWriteClient client; - private String streamName; - private StreamWriter streamWriter; - private StreamWriter.Builder streamWriterBuilder; - private Descriptor descriptor; - private TableSchema tableSchema; - private boolean ignoreUnknownFields = false; - private boolean reconnectAfter10M = false; - private long totalMessageSize = 0; - private long absTotal = 0; - private ProtoSchema protoSchema; - private boolean enableConnectionPool = false; + private final SchemaAwareStreamWriter schemaAwareStreamWriter; /** * Constructs the JsonStreamWriter * * @param builder The Builder object for the JsonStreamWriter */ - private JsonStreamWriter(Builder builder) + private JsonStreamWriter(SchemaAwareStreamWriter.Builder builder) throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException, InterruptedException { - this.descriptor = - BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema); - - if (builder.client == null) { - streamWriterBuilder = StreamWriter.newBuilder(builder.streamName); - } else { - streamWriterBuilder = StreamWriter.newBuilder(builder.streamName, builder.client); - } - this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); - this.totalMessageSize = protoSchema.getSerializedSize(); - this.client = builder.client; - streamWriterBuilder.setWriterSchema(protoSchema); - setStreamWriterSettings( - builder.channelProvider, - builder.credentialsProvider, - builder.executorProvider, - builder.endpoint, - builder.flowControlSettings, - builder.traceId); - streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool); - streamWriterBuilder.setLocation(builder.location); - this.streamWriter = streamWriterBuilder.build(); - this.streamName = builder.streamName; - this.tableSchema = builder.tableSchema; - this.ignoreUnknownFields = builder.ignoreUnknownFields; - this.reconnectAfter10M = builder.reconnectAfter10M; + this.schemaAwareStreamWriter = builder.build(); } /** @@ -112,64 +58,10 @@ private JsonStreamWriter(Builder builder) * ApiFuture */ public ApiFuture append(JSONArray jsonArr) - throws IOException, DescriptorValidationException { - return append(jsonArr, -1); - } - - private void refreshWriter(TableSchema updatedSchema) - throws DescriptorValidationException, IOException { - Preconditions.checkNotNull(updatedSchema, "updatedSchema is null."); - LOG.info("Refresh internal writer due to schema update, stream: " + this.streamName); - // Close the StreamWriterf - this.streamWriter.close(); - // Update JsonStreamWriter's TableSchema and Descriptor - this.tableSchema = updatedSchema; - this.descriptor = - BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema); - this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); - this.totalMessageSize = protoSchema.getSerializedSize(); - // Create a new underlying StreamWriter with the updated TableSchema and Descriptor - this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build(); + throws IOException, Descriptors.DescriptorValidationException { + return 
this.schemaAwareStreamWriter.append(jsonArr); } - private Message buildMessage(JSONObject json) - throws InterruptedException, DescriptorValidationException, IOException { - try { - return JsonToProtoMessage.convertJsonToProtoMessage( - this.descriptor, this.tableSchema, json, ignoreUnknownFields); - } catch (Exceptions.JsonDataHasUnknownFieldException ex) { - // Backend cache for GetWriteStream schema staleness can be 30 seconds, wait a bit before - // trying to get the table schema to increase the chance of succeed. This is to avoid - // client's invalid datfa caused storm of GetWriteStream. - LOG.warning( - "Saw Json unknown field " - + ex.getFieldName() - + ", try to refresh the writer with updated schema, stream: " - + streamName); - GetWriteStreamRequest writeStreamRequest = - GetWriteStreamRequest.newBuilder() - .setName(this.streamName) - .setView(WriteStreamView.FULL) - .build(); - WriteStream writeStream = client.getWriteStream(writeStreamRequest); - refreshWriter(writeStream.getTableSchema()); - try { - return JsonToProtoMessage.convertJsonToProtoMessage( - this.descriptor, this.tableSchema, json, ignoreUnknownFields); - } catch (Exceptions.JsonDataHasUnknownFieldException exex) { - LOG.warning( - "First attempt failed, waiting for 30 seconds to retry, stream: " + this.streamName); - Thread.sleep(UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS); - writeStream = client.getWriteStream(writeStreamRequest); - // TODO(yiru): We should let TableSchema return a timestamp so that we can simply - // compare the timestamp to see if the table schema is the same. If it is the - // same, we don't need to go refresh the writer again. - refreshWriter(writeStream.getTableSchema()); - return JsonToProtoMessage.convertJsonToProtoMessage( - this.descriptor, this.tableSchema, json, ignoreUnknownFields); - } - } - } /** * Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON * data to protobuf messages, then using StreamWriter's append() to write the data at the @@ -182,69 +74,17 @@ private Message buildMessage(JSONObject json) * ApiFuture */ public ApiFuture append(JSONArray jsonArr, long offset) - throws IOException, DescriptorValidationException { - // Handle schema updates in a Thread-safe way by locking down the operation - synchronized (this) { - // Create a new stream writer internally if a new updated schema is reported from backend. - if (this.streamWriter.getUpdatedSchema() != null) { - refreshWriter(this.streamWriter.getUpdatedSchema()); - } - - ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); - // Any error in convertJsonToProtoMessage will throw an - // IllegalArgumentException/IllegalStateException/NullPointerException. - // IllegalArgumentException will be collected into a Map of row indexes to error messages. - // After the conversion is finished an AppendSerializationError exception that contains all - // the - // conversion errors will be thrown. 
- long currentRequestSize = 0; - Map rowIndexToErrorMessage = new HashMap<>(); - for (int i = 0; i < jsonArr.length(); i++) { - JSONObject json = jsonArr.getJSONObject(i); - try { - Message protoMessage = buildMessage(json); - rowsBuilder.addSerializedRows(protoMessage.toByteString()); - currentRequestSize += protoMessage.getSerializedSize(); - } catch (IllegalArgumentException exception) { - if (exception instanceof Exceptions.FieldParseError) { - Exceptions.FieldParseError ex = (Exceptions.FieldParseError) exception; - rowIndexToErrorMessage.put( - i, - "Field " - + ex.getFieldName() - + " failed to convert to " - + ex.getBqType() - + ". Error: " - + ex.getCause().getMessage()); - } else { - rowIndexToErrorMessage.put(i, exception.getMessage()); - } - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } - - if (!rowIndexToErrorMessage.isEmpty()) { - throw new AppendSerializationError( - Code.INVALID_ARGUMENT.getNumber(), - "Append serialization failed for writer: " + streamName, - streamName, - rowIndexToErrorMessage); - } - final ApiFuture appendResponseFuture = - this.streamWriter.append(rowsBuilder.build(), offset); - return appendResponseFuture; - } + throws IOException, Descriptors.DescriptorValidationException { + return this.schemaAwareStreamWriter.append(jsonArr, offset); } - /** @return The name of the write stream associated with this writer. */ public String getStreamName() { - return this.streamName; + return this.schemaAwareStreamWriter.getStreamName(); } /** @return A unique Id for this writer. */ public String getWriterId() { - return streamWriter.getWriterId(); + return this.schemaAwareStreamWriter.getWriterId(); } /** @@ -252,8 +92,8 @@ public String getWriterId() { * * @return Descriptor */ - public Descriptor getDescriptor() { - return this.descriptor; + public Descriptors.Descriptor getDescriptor() { + return this.schemaAwareStreamWriter.getDescriptor(); } /** @@ -262,7 +102,7 @@ public Descriptor getDescriptor() { * @return Descriptor */ public String getLocation() { - return this.streamWriter.getLocation(); + return this.schemaAwareStreamWriter.getLocation(); } /** @@ -273,7 +113,7 @@ public String getLocation() { * the throughput in exclusive stream case, or create a new Writer in the default stream case. */ public long getInflightWaitSeconds() { - return streamWriter.getInflightWaitSeconds(); + return this.schemaAwareStreamWriter.getInflightWaitSeconds(); } /** @@ -285,54 +125,13 @@ public long getInflightWaitSeconds() { */ public void setMissingValueInterpretationMap( Map missingValueInterpretationMap) { - streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap); + this.schemaAwareStreamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap); } /** @return the missing value interpretation map used for the writer. */ public Map getMissingValueInterpretationMap() { - return streamWriter.getMissingValueInterpretationMap(); - } - - /** Sets all StreamWriter settings. 
*/ - private void setStreamWriterSettings( - @Nullable TransportChannelProvider channelProvider, - @Nullable CredentialsProvider credentialsProvider, - @Nullable ExecutorProvider executorProvider, - @Nullable String endpoint, - @Nullable FlowControlSettings flowControlSettings, - @Nullable String traceId) { - if (channelProvider != null) { - streamWriterBuilder.setChannelProvider(channelProvider); - } - if (credentialsProvider != null) { - streamWriterBuilder.setCredentialsProvider(credentialsProvider); - } - if (executorProvider != null) { - streamWriterBuilder.setExecutorProvider(executorProvider); - } - if (endpoint != null) { - streamWriterBuilder.setEndpoint(endpoint); - } - if (traceId != null) { - streamWriterBuilder.setTraceId("JsonWriter_" + traceId); - } else { - streamWriterBuilder.setTraceId("JsonWriter:null"); - } - if (flowControlSettings != null) { - if (flowControlSettings.getMaxOutstandingRequestBytes() != null) { - streamWriterBuilder.setMaxInflightBytes( - flowControlSettings.getMaxOutstandingRequestBytes()); - } - if (flowControlSettings.getMaxOutstandingElementCount() != null) { - streamWriterBuilder.setMaxInflightRequests( - flowControlSettings.getMaxOutstandingElementCount()); - } - if (flowControlSettings.getLimitExceededBehavior() != null) { - streamWriterBuilder.setLimitExceededBehavior( - flowControlSettings.getLimitExceededBehavior()); - } - } + return this.schemaAwareStreamWriter.getMissingValueInterpretationMap(); } /** @@ -352,9 +151,9 @@ private void setStreamWriterSettings( * @return Builder */ public static Builder newBuilder(String streamOrTableName, TableSchema tableSchema) { - Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null."); - Preconditions.checkNotNull(tableSchema, "TableSchema is null."); - return new Builder(streamOrTableName, tableSchema, null); + return new Builder( + SchemaAwareStreamWriter.newBuilder( + streamOrTableName, tableSchema, JsonToProtoMessage.INSTANCE)); } /** @@ -374,10 +173,9 @@ public static Builder newBuilder(String streamOrTableName, TableSchema tableSche */ public static Builder newBuilder( String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) { - Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null."); - Preconditions.checkNotNull(tableSchema, "TableSchema is null."); - Preconditions.checkNotNull(client, "BigQuery client is null."); - return new Builder(streamOrTableName, tableSchema, client); + return new Builder( + SchemaAwareStreamWriter.newBuilder( + streamOrTableName, tableSchema, client, JsonToProtoMessage.INSTANCE)); } /** @@ -390,15 +188,13 @@ public static Builder newBuilder( * @return Builder */ public static Builder newBuilder(String streamOrTableName, BigQueryWriteClient client) { - Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null."); - Preconditions.checkNotNull(client, "BigQuery client is null."); - return new Builder(streamOrTableName, null, client); + return new Builder( + SchemaAwareStreamWriter.newBuilder(streamOrTableName, client, JsonToProtoMessage.INSTANCE)); } - /** Closes the underlying StreamWriter. */ @Override public void close() { - this.streamWriter.close(); + this.schemaAwareStreamWriter.close(); } /** @@ -407,76 +203,20 @@ public void close() { * connection pool is not used. Client should recreate JsonStreamWriter in this case. */ public boolean isClosed() { - return this.streamWriter.isClosed(); + return this.schemaAwareStreamWriter.isClosed(); } /** @return if user explicitly closed the writer. 
*/ public boolean isUserClosed() { - return this.streamWriter.isUserClosed(); + return this.schemaAwareStreamWriter.isUserClosed(); } public static final class Builder { - private String streamName; - private BigQueryWriteClient client; - private TableSchema tableSchema; - - private TransportChannelProvider channelProvider; - private CredentialsProvider credentialsProvider; - private ExecutorProvider executorProvider; - private FlowControlSettings flowControlSettings; - private String endpoint; - private boolean createDefaultStream = false; - private String traceId; - private boolean ignoreUnknownFields = false; - private boolean reconnectAfter10M = false; - // Indicte whether multiplexing mode is enabled. - private boolean enableConnectionPool = false; - private String location; - - private static String streamPatternString = - "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; - private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)"; - - private static Pattern streamPattern = Pattern.compile(streamPatternString); - private static Pattern tablePattern = Pattern.compile(tablePatternString); + private final SchemaAwareStreamWriter.Builder schemaAwareStreamWriterBuilder; - /** - * Constructor for JsonStreamWriter's Builder - * - * @param streamOrTableName name of the stream that must follow - * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or - * "projects/[^/]+/datasets/[^/]+/tables/[^/]+" - * @param tableSchema schema used to convert Json to proto messages. - * @param client - */ - private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) { - Matcher streamMatcher = streamPattern.matcher(streamOrTableName); - if (!streamMatcher.matches()) { - Matcher tableMatcher = tablePattern.matcher(streamOrTableName); - if (!tableMatcher.matches()) { - throw new IllegalArgumentException("Invalid name: " + streamOrTableName); - } else { - this.streamName = streamOrTableName + "/_default"; - } - } else { - this.streamName = streamOrTableName; - } - this.client = client; - if (tableSchema == null) { - GetWriteStreamRequest writeStreamRequest = - GetWriteStreamRequest.newBuilder() - .setName(this.getStreamName()) - .setView(WriteStreamView.FULL) - .build(); - - WriteStream writeStream = this.client.getWriteStream(writeStreamRequest); - TableSchema writeStreamTableSchema = writeStream.getTableSchema(); - - this.tableSchema = writeStreamTableSchema; - this.location = writeStream.getLocation(); - } else { - this.tableSchema = tableSchema; - } + private Builder(SchemaAwareStreamWriter.Builder schemaAwareStreamWriterBuilder) { + this.schemaAwareStreamWriterBuilder = + schemaAwareStreamWriterBuilder.setTraceIdBase("JsonWriter"); } /** @@ -486,8 +226,7 @@ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWrite * @return Builder */ public Builder setChannelProvider(TransportChannelProvider channelProvider) { - this.channelProvider = - Preconditions.checkNotNull(channelProvider, "ChannelProvider is null."); + this.schemaAwareStreamWriterBuilder.setChannelProvider(channelProvider); return this; } @@ -498,8 +237,7 @@ public Builder setChannelProvider(TransportChannelProvider channelProvider) { * @return Builder */ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { - this.credentialsProvider = - Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null."); + this.schemaAwareStreamWriterBuilder.setCredentialsProvider(credentialsProvider); return this; 
} @@ -510,8 +248,7 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { * @return */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { - this.executorProvider = - Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null."); + this.schemaAwareStreamWriterBuilder.setExecutorProvider(executorProvider); return this; } @@ -522,8 +259,7 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) { * @return Builder */ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { - this.flowControlSettings = - Preconditions.checkNotNull(flowControlSettings, "FlowControlSettings is null."); + this.schemaAwareStreamWriterBuilder.setFlowControlSettings(flowControlSettings); return this; } @@ -533,7 +269,7 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { * @return Builder */ public String getStreamName() { - return streamName; + return this.schemaAwareStreamWriterBuilder.getStreamName(); } /** @@ -543,7 +279,7 @@ public String getStreamName() { * @return Builder */ public Builder setEndpoint(String endpoint) { - this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null."); + this.schemaAwareStreamWriterBuilder.setEndpoint(endpoint); return this; } @@ -554,7 +290,7 @@ public Builder setEndpoint(String endpoint) { * @return Builder */ public Builder setTraceId(String traceId) { - this.traceId = Preconditions.checkNotNull(traceId, "TraceId is null."); + this.schemaAwareStreamWriterBuilder.setTraceId(traceId); return this; } @@ -566,21 +302,12 @@ public Builder setTraceId(String traceId) { * @return Builder */ public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) { - this.ignoreUnknownFields = ignoreUnknownFields; + this.schemaAwareStreamWriterBuilder.setIgnoreUnknownFields(ignoreUnknownFields); return this; } - /** - * @Deprecated Setter for a reconnectAfter10M, temporaily workaround for omg/48020. Fix for the - * omg is supposed to roll out by 2/11/2022 Friday. If you set this to True, your write will be - * slower (0.75MB/s per connection), but your writes will not be stuck as a sympton of - * omg/48020. - * - * @param reconnectAfter10M - * @return Builder - */ + /** This parameter is not used. It will be removed soon. 
*/ public Builder setReconnectAfter10M(boolean reconnectAfter10M) { - this.reconnectAfter10M = false; return this; } @@ -593,7 +320,7 @@ public Builder setReconnectAfter10M(boolean reconnectAfter10M) { * @return Builder */ public Builder setEnableConnectionPool(boolean enableConnectionPool) { - this.enableConnectionPool = enableConnectionPool; + this.schemaAwareStreamWriterBuilder.setEnableConnectionPool(enableConnectionPool); return this; } @@ -605,11 +332,7 @@ public Builder setEnableConnectionPool(boolean enableConnectionPool) { * @return Builder */ public Builder setLocation(String location) { - if (this.location != null && !this.location.equals(location)) { - throw new IllegalArgumentException( - "Specified location " + location + " does not match the system value " + this.location); - } - this.location = location; + this.schemaAwareStreamWriterBuilder.setLocation(location); return this; } @@ -621,7 +344,7 @@ public Builder setLocation(String location) { public JsonStreamWriter build() throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException, InterruptedException { - return new JsonStreamWriter(this); + return new JsonStreamWriter(this.schemaAwareStreamWriterBuilder); } } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java index c402d66f54..156e1c1948 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java @@ -45,10 +45,11 @@ import org.threeten.bp.temporal.TemporalAccessor; /** - * Converts Json data to protocol buffer messages given the protocol buffer descriptor. The protobuf - * descriptor must have all fields lowercased. + * Converts JSON data to Protobuf messages given the Protobuf descriptor and BigQuery table schema. + * The Protobuf descriptor must have all fields lowercased. */ -public class JsonToProtoMessage { +public class JsonToProtoMessage implements ToProtoConverter<JSONObject> { + public static final JsonToProtoMessage INSTANCE = new JsonToProtoMessage(); private static final Logger LOG = Logger.getLogger(JsonToProtoMessage.class.getName()); private static int NUMERIC_SCALE = 9; private static ImmutableMap FieldTypeToDebugMessage = @@ -102,6 +103,42 @@ public class JsonToProtoMessage { .toFormatter() .withZone(ZoneOffset.UTC); + /** You can use {@link JsonToProtoMessage#INSTANCE} instead of constructing new instances. */ + public JsonToProtoMessage() {} + + public static DynamicMessage convertJsonToProtoMessage( + Descriptor protoSchema, + TableSchema tableSchema, + JSONObject json, + boolean ignoreUnknownFields) { + return INSTANCE.convertToProtoMessage(protoSchema, tableSchema, json, ignoreUnknownFields); + } + + public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, JSONObject json) { + return INSTANCE.convertToProtoMessage(protoSchema, json); + } + + public static DynamicMessage convertJsonToProtoMessage( + Descriptor protoSchema, TableSchema tableSchema, JSONObject json) { + return INSTANCE.convertToProtoMessage(protoSchema, tableSchema, json); + } + + /** + * Converts the input message to Protobuf. + * + * @param protoSchema the schema of the output Protobuf message. + * @param tableSchema the underlying table schema for which the Protobuf message is being built. + * @param json the input JSON object to be converted to Protobuf. 
+ * @param ignoreUnknownFields flag indicating that the additional fields not present in the output + * schema should be accepted. + * @return Converted message in Protobuf format. + */ + @Override + public DynamicMessage convertToProtoMessage( + Descriptor protoSchema, TableSchema tableSchema, Object json, boolean ignoreUnknownFields) { + return convertToProtoMessage(protoSchema, tableSchema, (JSONObject) json, ignoreUnknownFields); + } + /** * Converts Json data to protocol buffer messages given the protocol buffer descriptor. * @@ -109,14 +146,13 @@ public class JsonToProtoMessage { * @param json * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. */ - public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, JSONObject json) + public DynamicMessage convertToProtoMessage(Descriptor protoSchema, JSONObject json) throws IllegalArgumentException { Preconditions.checkNotNull(json, "JSONObject is null."); Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null."); Preconditions.checkState(json.length() != 0, "JSONObject is empty."); - return convertJsonToProtoMessageImpl( - protoSchema, null, json, "root", /*topLevel=*/ true, false); + return convertToProtoMessage(protoSchema, null, json, "root", /*topLevel=*/ true, false); } /** @@ -128,7 +164,7 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J * @param json * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. */ - public static DynamicMessage convertJsonToProtoMessage( + public DynamicMessage convertToProtoMessage( Descriptor protoSchema, TableSchema tableSchema, JSONObject json) throws IllegalArgumentException { Preconditions.checkNotNull(json, "JSONObject is null."); @@ -136,7 +172,7 @@ public static DynamicMessage convertJsonToProtoMessage( Preconditions.checkNotNull(tableSchema, "TableSchema is null."); Preconditions.checkState(json.length() != 0, "JSONObject is empty."); - return convertJsonToProtoMessageImpl( + return convertToProtoMessage( protoSchema, tableSchema.getFieldsList(), json, @@ -155,7 +191,7 @@ public static DynamicMessage convertJsonToProtoMessage( * @param ignoreUnknownFields allows unknown fields in JSON input to be ignored. * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. */ - public static DynamicMessage convertJsonToProtoMessage( + public DynamicMessage convertToProtoMessage( Descriptor protoSchema, TableSchema tableSchema, JSONObject json, boolean ignoreUnknownFields) throws IllegalArgumentException { Preconditions.checkNotNull(json, "JSONObject is null."); @@ -163,7 +199,7 @@ public static DynamicMessage convertJsonToProtoMessage( Preconditions.checkNotNull(tableSchema, "TableSchema is null."); Preconditions.checkState(json.length() != 0, "JSONObject is empty."); - return convertJsonToProtoMessageImpl( + return convertToProtoMessage( protoSchema, tableSchema.getFieldsList(), json, @@ -181,7 +217,7 @@ public static DynamicMessage convertJsonToProtoMessage( * @param topLevel checks if root level has any matching fields. * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. */ - private static DynamicMessage convertJsonToProtoMessageImpl( + private DynamicMessage convertToProtoMessage( Descriptor protoSchema, List tableSchema, JSONObject json, @@ -209,7 +245,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl( String currentScope = jsonScope + "." 
+ jsonName; FieldDescriptor field = protoSchema.findFieldByName(jsonFieldLocator); if (field == null && !ignoreUnknownFields) { - throw new Exceptions.JsonDataHasUnknownFieldException(currentScope); + throw new Exceptions.DataHasUnknownFieldException(currentScope); } else if (field == null) { continue; } @@ -274,7 +310,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl( * @param currentScope Debugging purposes * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. */ - private static void fillField( + private void fillField( DynamicMessage.Builder protoMsg, FieldDescriptor fieldDescriptor, TableFieldSchema fieldSchema, @@ -482,7 +518,7 @@ private static void fillField( Message.Builder message = protoMsg.newBuilderForField(fieldDescriptor); protoMsg.setField( fieldDescriptor, - convertJsonToProtoMessageImpl( + convertToProtoMessage( fieldDescriptor.getMessageType(), fieldSchema == null ? null : fieldSchema.getFieldsList(), json.getJSONObject(exactJsonKeyName), @@ -510,7 +546,7 @@ private static void fillField( * @param currentScope Debugging purposes * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. */ - private static void fillRepeatedField( + private void fillRepeatedField( DynamicMessage.Builder protoMsg, FieldDescriptor fieldDescriptor, TableFieldSchema fieldSchema, @@ -747,7 +783,7 @@ private static void fillRepeatedField( Message.Builder message = protoMsg.newBuilderForField(fieldDescriptor); protoMsg.addRepeatedField( fieldDescriptor, - convertJsonToProtoMessageImpl( + convertToProtoMessage( fieldDescriptor.getMessageType(), fieldSchema == null ? null : fieldSchema.getFieldsList(), jsonArray.getJSONObject(i), diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java new file mode 100644 index 0000000000..2e8c5ea2c5 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java @@ -0,0 +1,630 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; +import com.google.common.base.Preconditions; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import com.google.protobuf.Message; +import com.google.rpc.Code; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; + +/** + * A StreamWriter that can write data to BigQuery tables. The SchemaAwareStreamWriter is built on + * top of a StreamWriter, and it converts all data to Protobuf messages using provided converter + * then calls StreamWriter's append() method to write to BigQuery tables. It maintains all + * StreamWriter functions, but also provides an additional feature: schema update support, where if + * the BigQuery table schema is updated, users will be able to ingest data on the new schema after + * some time (in order of minutes). + */ +public class SchemaAwareStreamWriter implements AutoCloseable { + private static final Logger LOG = Logger.getLogger(SchemaAwareStreamWriter.class.getName()); + private static final long UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS = 30100L; + private final BigQueryWriteClient client; + private final String streamName; + private final StreamWriter.Builder streamWriterBuilder; + private final boolean ignoreUnknownFields; + private final ToProtoConverter toProtoConverter; + private StreamWriter streamWriter; + private Descriptor descriptor; + private TableSchema tableSchema; + private ProtoSchema protoSchema; + + /** + * Constructs the SchemaAwareStreamWriter + * + * @param builder The Builder object for the SchemaAwareStreamWriter + */ + private SchemaAwareStreamWriter(Builder builder) + throws DescriptorValidationException, IllegalArgumentException, IOException { + this.descriptor = + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema); + + if (builder.client == null) { + streamWriterBuilder = StreamWriter.newBuilder(builder.streamName); + } else { + streamWriterBuilder = StreamWriter.newBuilder(builder.streamName, builder.client); + } + this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); + this.client = builder.client; + streamWriterBuilder.setWriterSchema(protoSchema); + setStreamWriterSettings( + builder.channelProvider, + builder.credentialsProvider, + builder.executorProvider, + builder.endpoint, + builder.flowControlSettings, + builder.traceIdBase, + builder.traceId); + streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool); + streamWriterBuilder.setLocation(builder.location); + this.streamWriter = streamWriterBuilder.build(); + this.streamName = builder.streamName; + this.tableSchema = builder.tableSchema; + this.toProtoConverter = builder.toProtoConverter; + this.ignoreUnknownFields = builder.ignoreUnknownFields; + } + + /** + * Writes a collection that contains objects to the BigQuery table by first converting the data to + * Protobuf messages, then using StreamWriter's append() to write the data at current end of + * stream. 
If there is a schema update, the current StreamWriter is closed. A new StreamWriter is + * created with the updated TableSchema. + * + * @param items The collection that contains objects to be written + * @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an + * ApiFuture + */ + public ApiFuture<AppendRowsResponse> append(Iterable<T> items) + throws IOException, DescriptorValidationException { + return append(items, -1); + } + + private void refreshWriter(TableSchema updatedSchema) + throws DescriptorValidationException, IOException { + Preconditions.checkNotNull(updatedSchema, "updatedSchema is null."); + LOG.info("Refresh internal writer due to schema update, stream: " + this.streamName); + // Close the current StreamWriter + this.streamWriter.close(); + // Update SchemaAwareStreamWriter's TableSchema and Descriptor + this.tableSchema = updatedSchema; + this.descriptor = + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema); + this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); + // Create a new underlying StreamWriter with the updated TableSchema and Descriptor + this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build(); + } + + private Message buildMessage(T item) + throws InterruptedException, DescriptorValidationException, IOException { + try { + return this.toProtoConverter.convertToProtoMessage( + this.descriptor, this.tableSchema, item, ignoreUnknownFields); + } catch (Exceptions.DataHasUnknownFieldException ex) { + // Backend cache for GetWriteStream schema staleness can be 30 seconds, wait a bit before + // trying to get the table schema to increase the chance of success. This is to avoid + // a client's invalid data causing a storm of GetWriteStream calls. + LOG.warning( + "Saw unknown field " + + ex.getFieldName() + + ", trying to refresh the writer with the updated schema, stream: " + + streamName); + GetWriteStreamRequest writeStreamRequest = + GetWriteStreamRequest.newBuilder() + .setName(this.streamName) + .setView(WriteStreamView.FULL) + .build(); + WriteStream writeStream = client.getWriteStream(writeStreamRequest); + refreshWriter(writeStream.getTableSchema()); + try { + return this.toProtoConverter.convertToProtoMessage( + this.descriptor, this.tableSchema, item, ignoreUnknownFields); + } catch (Exceptions.DataHasUnknownFieldException exex) { + LOG.warning( + "First attempt failed, waiting for 30 seconds to retry, stream: " + this.streamName); + Thread.sleep(UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS); + writeStream = client.getWriteStream(writeStreamRequest); + // TODO(yiru): We should let TableSchema return a timestamp so that we can simply + // compare the timestamp to see if the table schema is the same. If it is the + // same, we don't need to go refresh the writer again. + refreshWriter(writeStream.getTableSchema()); + return this.toProtoConverter.convertToProtoMessage( + this.descriptor, this.tableSchema, item, ignoreUnknownFields); + } + } + } + /** + * Writes a collection that contains objects to the BigQuery table by first converting the data to + * Protobuf messages, then using StreamWriter's append() to write the data at the specified + * offset. If there is a schema update, the current StreamWriter is closed. A new StreamWriter is + * created with the updated TableSchema. 
+ * + * @param items The collection that contains objects to be written + * @param offset Offset for deduplication + * @return ApiFuture returns an AppendRowsResponse message wrapped in an + * ApiFuture + */ + public ApiFuture append(Iterable items, long offset) + throws IOException, DescriptorValidationException { + // Handle schema updates in a Thread-safe way by locking down the operation + synchronized (this) { + // Create a new stream writer internally if a new updated schema is reported from backend. + if (this.streamWriter.getUpdatedSchema() != null) { + refreshWriter(this.streamWriter.getUpdatedSchema()); + } + + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); + // Any error in convertToProtoMessage will throw an + // IllegalArgumentException/IllegalStateException/NullPointerException. + // IllegalArgumentException will be collected into a Map of row indexes to error messages. + // After the conversion is finished an AppendSerializtionError exception that contains all the + // conversion errors will be thrown. + Map rowIndexToErrorMessage = new HashMap<>(); + int i = -1; + for (T item : items) { + i += 1; + try { + Message protoMessage = buildMessage(item); + rowsBuilder.addSerializedRows(protoMessage.toByteString()); + } catch (IllegalArgumentException exception) { + if (exception instanceof Exceptions.FieldParseError) { + Exceptions.FieldParseError ex = (Exceptions.FieldParseError) exception; + rowIndexToErrorMessage.put( + i, + "Field " + + ex.getFieldName() + + " failed to convert to " + + ex.getBqType() + + ". Error: " + + ex.getCause().getMessage()); + } else { + rowIndexToErrorMessage.put(i, exception.getMessage()); + } + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + if (!rowIndexToErrorMessage.isEmpty()) { + throw new AppendSerializationError( + Code.INVALID_ARGUMENT.getNumber(), + "Append serialization failed for writer: " + streamName, + streamName, + rowIndexToErrorMessage); + } + return this.streamWriter.append(rowsBuilder.build(), offset); + } + } + + /** @return The name of the write stream associated with this writer. */ + public String getStreamName() { + return this.streamName; + } + + /** @return A unique Id for this writer. */ + public String getWriterId() { + return streamWriter.getWriterId(); + } + + /** + * Gets current descriptor + * + * @return Descriptor + */ + public Descriptor getDescriptor() { + return this.descriptor; + } + + /** + * Gets the location of the destination + * + * @return Descriptor + */ + public String getLocation() { + return this.streamWriter.getLocation(); + } + + /** + * Returns the wait of a request in Client side before sending to the Server. Request could wait + * in Client because it reached the client side inflight request limit (adjustable when + * constructing the Writer). The value is the wait time for the last sent request. A constant high + * wait value indicates a need for more throughput, you can create a new Stream for to increase + * the throughput in exclusive stream case, or create a new Writer in the default stream case. + */ + public long getInflightWaitSeconds() { + return streamWriter.getInflightWaitSeconds(); + } + + /** + * Sets the missing value interpretation map for the SchemaAwareStreamWriter. The input + * missingValueInterpretationMap is used for all append requests unless otherwise changed. + * + * @param missingValueInterpretationMap the missing value interpretation map used by the + * SchemaAwareStreamWriter. 
+ */ + public void setMissingValueInterpretationMap( + Map missingValueInterpretationMap) { + streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap); + } + + /** @return the missing value interpretation map used for the writer. */ + public Map + getMissingValueInterpretationMap() { + return streamWriter.getMissingValueInterpretationMap(); + } + + /** Sets all StreamWriter settings. */ + private void setStreamWriterSettings( + @Nullable TransportChannelProvider channelProvider, + @Nullable CredentialsProvider credentialsProvider, + @Nullable ExecutorProvider executorProvider, + @Nullable String endpoint, + @Nullable FlowControlSettings flowControlSettings, + @Nullable String traceIdBase, + @Nullable String traceId) { + if (channelProvider != null) { + streamWriterBuilder.setChannelProvider(channelProvider); + } + if (credentialsProvider != null) { + streamWriterBuilder.setCredentialsProvider(credentialsProvider); + } + if (executorProvider != null) { + streamWriterBuilder.setExecutorProvider(executorProvider); + } + if (endpoint != null) { + streamWriterBuilder.setEndpoint(endpoint); + } + if (traceIdBase != null) { + if (traceId != null) { + streamWriterBuilder.setTraceId(traceIdBase + "_" + traceId); + } else { + streamWriterBuilder.setTraceId(traceIdBase + ":null"); + } + } else { + if (traceId != null) { + streamWriterBuilder.setTraceId("SchemaAwareStreamWriter_" + traceId); + } else { + streamWriterBuilder.setTraceId("SchemaAwareStreamWriter:null"); + } + } + if (flowControlSettings != null) { + if (flowControlSettings.getMaxOutstandingRequestBytes() != null) { + streamWriterBuilder.setMaxInflightBytes( + flowControlSettings.getMaxOutstandingRequestBytes()); + } + if (flowControlSettings.getMaxOutstandingElementCount() != null) { + streamWriterBuilder.setMaxInflightRequests( + flowControlSettings.getMaxOutstandingElementCount()); + } + if (flowControlSettings.getLimitExceededBehavior() != null) { + streamWriterBuilder.setLimitExceededBehavior( + flowControlSettings.getLimitExceededBehavior()); + } + } + } + + /** + * newBuilder that constructs a SchemaAwareStreamWriter builder with BigQuery client being + * initialized by StreamWriter by default. + * + *

The table schema passed in will be updated automatically when there is a schema update + * event. When used for Writer creation, it should be the latest schema. So when you are trying to + * reuse a stream, you should use Builder newBuilder( String streamOrTableName, + * BigQueryWriteClient client) instead, so the created Writer will be based on a fresh schema. + * + * @param streamOrTableName name of the stream that must follow + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or table name + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+" + * @param tableSchema The schema of the table when the stream was created, which is passed back + * through {@code WriteStream} + * @return Builder + */ + public static Builder newBuilder( + String streamOrTableName, TableSchema tableSchema, ToProtoConverter toProtoConverter) { + Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null."); + Preconditions.checkNotNull(tableSchema, "TableSchema is null."); + Preconditions.checkNotNull(toProtoConverter, "ToProtoConverter is null."); + return new Builder<>(streamOrTableName, tableSchema, null, toProtoConverter); + } + + /** + * newBuilder that constructs a SchemaAwareStreamWriter builder. + * + *

The table schema passed in will be updated automatically when there is a schema update + * event. When used for Writer creation, it should be the latest schema. So when you are trying to + * reuse a stream, you should use Builder newBuilder( String streamOrTableName, + * BigQueryWriteClient client) instead, so the created Writer will be based on a fresh schema. + * + * @param streamOrTableName name of the stream that must follow + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" + * @param tableSchema The schema of the table when the stream was created, which is passed back + * through {@code WriteStream} + * @param client + * @return Builder + */ + public static Builder newBuilder( + String streamOrTableName, + TableSchema tableSchema, + BigQueryWriteClient client, + ToProtoConverter toProtoConverter) { + Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null."); + Preconditions.checkNotNull(tableSchema, "TableSchema is null."); + Preconditions.checkNotNull(client, "BigQuery client is null."); + Preconditions.checkNotNull(toProtoConverter, "ToProtoConverter is null."); + return new Builder<>(streamOrTableName, tableSchema, client, toProtoConverter); + } + + /** + * newBuilder that constructs a SchemaAwareStreamWriter builder with TableSchema being initialized + * by StreamWriter by default. + * + * @param streamOrTableName name of the stream that must follow + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" + * @param client BigQueryWriteClient + * @return Builder + */ + public static Builder newBuilder( + String streamOrTableName, BigQueryWriteClient client, ToProtoConverter toProtoConverter) { + Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null."); + Preconditions.checkNotNull(client, "BigQuery client is null."); + Preconditions.checkNotNull(toProtoConverter, "ToProtoConverter is null."); + return new Builder<>(streamOrTableName, null, client, toProtoConverter); + } + + /** Closes the underlying StreamWriter. */ + @Override + public void close() { + this.streamWriter.close(); + } + + /** + * @return if a writer can no longer be used for writing. It is due to either the + * SchemaAwareStreamWriter is explicitly closed or the underlying connection is broken when + * connection pool is not used. Client should recreate SchemaAwareStreamWriter in this case. + */ + public boolean isClosed() { + return this.streamWriter.isClosed(); + } + + /** @return if user explicitly closed the writer. */ + public boolean isUserClosed() { + return this.streamWriter.isUserClosed(); + } + + public static final class Builder { + private final String streamName; + private final BigQueryWriteClient client; + private final TableSchema tableSchema; + + private final ToProtoConverter toProtoConverter; + private TransportChannelProvider channelProvider; + private CredentialsProvider credentialsProvider; + private ExecutorProvider executorProvider; + private FlowControlSettings flowControlSettings; + private String endpoint; + private String traceIdBase; + private String traceId; + private boolean ignoreUnknownFields = false; + // Indicates whether multiplexing mode is enabled. 
+ private boolean enableConnectionPool = false; + private String location; + + private static final String streamPatternString = + "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; + private static final String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)"; + + private static final Pattern streamPattern = Pattern.compile(streamPatternString); + private static final Pattern tablePattern = Pattern.compile(tablePatternString); + + /** + * Constructor for SchemaAwareStreamWriter's Builder + * + * @param streamOrTableName name of the stream that must follow + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+" + * @param tableSchema schema used to convert items to proto messages. + * @param client + * @param toProtoConverter converter used to convert items to proto messages + */ + private Builder( + String streamOrTableName, + TableSchema tableSchema, + BigQueryWriteClient client, + ToProtoConverter toProtoConverter) { + Matcher streamMatcher = streamPattern.matcher(streamOrTableName); + if (!streamMatcher.matches()) { + Matcher tableMatcher = tablePattern.matcher(streamOrTableName); + if (!tableMatcher.matches()) { + throw new IllegalArgumentException("Invalid name: " + streamOrTableName); + } else { + this.streamName = streamOrTableName + "/_default"; + } + } else { + this.streamName = streamOrTableName; + } + this.client = client; + if (tableSchema == null) { + GetWriteStreamRequest writeStreamRequest = + GetWriteStreamRequest.newBuilder() + .setName(this.getStreamName()) + .setView(WriteStreamView.FULL) + .build(); + + WriteStream writeStream = this.client.getWriteStream(writeStreamRequest); + + this.tableSchema = writeStream.getTableSchema(); + this.location = writeStream.getLocation(); + } else { + this.tableSchema = tableSchema; + } + this.toProtoConverter = toProtoConverter; + } + + /** + * Setter for the underlying StreamWriter's TransportChannelProvider. + * + * @param channelProvider + * @return Builder + */ + public Builder setChannelProvider(TransportChannelProvider channelProvider) { + this.channelProvider = + Preconditions.checkNotNull(channelProvider, "ChannelProvider is null."); + return this; + } + + /** + * Setter for the underlying StreamWriter's CredentialsProvider. + * + * @param credentialsProvider + * @return Builder + */ + public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { + this.credentialsProvider = + Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null."); + return this; + } + + /** + * Setter for the underlying StreamWriter's ExecutorProvider. + * + * @param executorProvider + * @return + */ + public Builder setExecutorProvider(ExecutorProvider executorProvider) { + this.executorProvider = + Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null."); + return this; + } + + /** + * Setter for the underlying StreamWriter's FlowControlSettings. + * + * @param flowControlSettings + * @return Builder + */ + public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { + this.flowControlSettings = + Preconditions.checkNotNull(flowControlSettings, "FlowControlSettings is null."); + return this; + } + + /** + * Stream name on the builder. + * + * @return Builder + */ + public String getStreamName() { + return streamName; + } + + /** + * Setter for the underlying StreamWriter's Endpoint. 
+ * + * @param endpoint + * @return Builder + */ + public Builder setEndpoint(String endpoint) { + this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null."); + return this; + } + + /** + * Setter for a traceId to help identify traffic origin. + * + * @param traceId + * @return Builder + */ + public Builder setTraceId(String traceId) { + this.traceId = Preconditions.checkNotNull(traceId, "TraceId is null."); + return this; + } + + /** + * Setter for a traceIdBase to help identify traffic origin. + * + * @param traceIdBase + * @return Builder + */ + public Builder setTraceIdBase(String traceIdBase) { + this.traceIdBase = Preconditions.checkNotNull(traceIdBase, "TraceIdBase is null."); + return this; + } + + /** + * Setter for a ignoreUnknownFields, if true, unknown fields to BigQuery will be ignored instead + * of error out. + * + * @param ignoreUnknownFields + * @return Builder + */ + public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) { + this.ignoreUnknownFields = ignoreUnknownFields; + return this; + } + + /** + * Enable multiplexing for this writer. In multiplexing mode tables will share the same + * connection if possible until the connection is overwhelmed. This feature is still under + * development, please contact write api team before using. + * + * @param enableConnectionPool + * @return Builder + */ + public Builder setEnableConnectionPool(boolean enableConnectionPool) { + this.enableConnectionPool = enableConnectionPool; + return this; + } + + /** + * Location of the table this stream writer is targeting. Connection pools are shared by + * location. + * + * @param location + * @return Builder + */ + public Builder setLocation(String location) { + if (this.location != null && !this.location.equals(location)) { + throw new IllegalArgumentException( + "Specified location " + location + " does not match the system value " + this.location); + } + this.location = location; + return this; + } + + /** + * Builds SchemaAwareStreamWriter + * + * @return SchemaAwareStreamWriter + */ + public SchemaAwareStreamWriter build() + throws DescriptorValidationException, IllegalArgumentException, IOException, + InterruptedException { + return new SchemaAwareStreamWriter<>(this); + } + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ToProtoConverter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ToProtoConverter.java new file mode 100644 index 0000000000..ca17ed11e7 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ToProtoConverter.java @@ -0,0 +1,27 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.google.cloud.bigquery.storage.v1;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+
+public interface ToProtoConverter<T> {
+  DynamicMessage convertToProtoMessage(
+      Descriptors.Descriptor protoSchema,
+      TableSchema tableSchema,
+      T inputObject,
+      boolean ignoreUnknownFields);
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
index 50532b1e0a..5b01c5a0a2 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
@@ -23,11 +23,9 @@
 import com.google.api.core.ApiFuture;
 import com.google.api.gax.batching.FlowControlSettings;
 import com.google.api.gax.batching.FlowController;
-import com.google.api.gax.core.ExecutorProvider;
 import com.google.api.gax.core.InstantiatingExecutorProvider;
 import com.google.api.gax.core.NoCredentialsProvider;
 import com.google.api.gax.grpc.testing.LocalChannelProvider;
-import com.google.api.gax.grpc.testing.MockGrpcService;
 import com.google.api.gax.grpc.testing.MockServiceHelper;
 import com.google.cloud.bigquery.storage.test.JsonTest;
 import com.google.cloud.bigquery.storage.test.SchemaTest;
@@ -52,7 +50,6 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.logging.Logger;
 import org.json.JSONArray;
 import org.json.JSONObject;
 import org.junit.After;
@@ -67,13 +64,10 @@
 @RunWith(JUnit4.class)
 public class JsonStreamWriterTest {
-  private static final Logger LOG = Logger.getLogger(JsonStreamWriterTest.class.getName());
-  private static int NUMERIC_SCALE = 9;
+  private static final int NUMERIC_SCALE = 9;
   private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/_default";
   private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
   private static final String TEST_TABLE = "projects/p/datasets/d/tables/t";
-  private static final ExecutorProvider SINGLE_THREAD_EXECUTOR =
-      InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
   private static LocalChannelProvider channelProvider;
   private FakeScheduledExecutorService fakeExecutor;
   private FakeBigQueryWrite testBigQueryWrite;
@@ -136,8 +130,7 @@ public JsonStreamWriterTest() throws DescriptorValidationException {}
   public void setUp() throws Exception {
     testBigQueryWrite = new FakeBigQueryWrite();
     serviceHelper =
-        new MockServiceHelper(
-            UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite));
+        new MockServiceHelper(UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite));
     serviceHelper.start();
     channelProvider = serviceHelper.createChannelProvider();
     fakeExecutor = new FakeScheduledExecutorService();
@@ -638,7 +631,7 @@ public void testCreateDefaultStream_withNoClientPassedIn() throws Exception {
   }
 
   @Test
-  public void testCreateDefaultStreamWrongLocation() throws Exception {
+  public void testCreateDefaultStreamWrongLocation() {
     TableSchema tableSchema =
         TableSchema.newBuilder().addFields(0, TEST_INT).addFields(1, TEST_STRING).build();
     testBigQueryWrite.addResponse(
@@ -1098,7 +1091,7 @@ public void testWithoutIgnoreUnknownFieldsUpdateFail() throws Exception {
       Assert.fail("expected ExecutionException");
     } catch (AppendSerializationError ex) {
       assertEquals(
"JSONObject has fields unknown to BigQuery: root.test_unknown.", + "The source object has fields unknown to BigQuery: root.test_unknown.", ex.getRowIndexToErrorMessage().get(1)); assertEquals(TEST_STREAM, ex.getStreamName()); } @@ -1219,7 +1212,7 @@ public void testMultipleAppendSerializationErrors() appendSerializationError.getRowIndexToErrorMessage(); assertEquals(2, rowIndexToErrorMessage.size()); assertEquals( - "JSONObject has fields unknown to BigQuery: root.not_foo.", + "The source object has fields unknown to BigQuery: root.not_foo.", rowIndexToErrorMessage.get(0)); assertEquals( "Field root.foo failed to convert to STRING. Error: JSONObject does not have a string field at root.foo.", @@ -1310,7 +1303,7 @@ public void testAppendWithMissingValueMap() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) { - Map missingValueMap = new HashMap(); + Map missingValueMap = new HashMap<>(); missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE); missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE); writer.setMissingValueInterpretationMap(missingValueMap); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java index 91785ce0ec..5c44d014d4 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java @@ -541,7 +541,7 @@ public void testDifferentNameCasing() throws Exception { json.put("inT", 1); json.put("lONg", 1L); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -554,7 +554,7 @@ public void testBool() throws Exception { json.put("uppercase", "TRUE"); json.put("lowercase", "false"); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestBool.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBool.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -569,7 +569,7 @@ public void testInt64() throws Exception { json.put("long", 1L); json.put("string", "1"); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -583,7 +583,7 @@ public void testInt32() throws Exception { json.put("int", 1); json.put("string", 1); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt32.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt32.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -595,7 +595,7 @@ public void testInt32NotMatchInt64() throws Exception { json.put("int", 1L); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt32.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt32.getDescriptor(), json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals("JSONObject does not have a int32 field at root.int.", 
e.getMessage()); @@ -615,7 +615,7 @@ public void testDateTimeMismatch() throws Exception { json.put("datetime", 1.0); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( TestDatetime.getDescriptor(), tableSchema, json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { @@ -636,7 +636,8 @@ public void testTimeMismatch() throws Exception { json.put("time", new JSONArray(new Double[] {1.0})); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestTime.getDescriptor(), tableSchema, json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage( + TestTime.getDescriptor(), tableSchema, json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals("JSONObject does not have a int64 field at root.time[0].", e.getMessage()); @@ -657,7 +658,7 @@ public void testMixedCaseFieldNames() throws Exception { json.put("fooBar", "hello"); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( TestMixedCaseFieldNames.getDescriptor(), tableSchema, json); } @@ -682,7 +683,7 @@ public void testDouble() throws Exception { json.put("long", 8L); json.put("string", "9.1"); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestDouble.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestDouble.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -705,7 +706,7 @@ public void testDoubleHighPrecision() throws Exception { JSONObject json = new JSONObject(); json.put("numeric", 3.400500512978076); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( TestNumeric.getDescriptor(), tableSchema, json); assertEquals(expectedProto, protoMsg); } @@ -735,7 +736,7 @@ public void testDoubleHighPrecision_RepeatedField() throws Exception { JSONObject json = new JSONObject(); json.put("bignumeric", ImmutableList.of(3.400500512978076, 0.10000000000055, 0.12)); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( TestBignumeric.getDescriptor(), tableSchema, json); assertEquals(expectedProto, protoMsg); } @@ -775,7 +776,7 @@ public void testTimestamp() throws Exception { json.put("test_timezone", "2022-04-05 09:06:11 PST"); json.put("test_saformat", "2018/08/19 12:11"); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( TestTimestamp.getDescriptor(), tableSchema, json); assertEquals(expectedProto, protoMsg); } @@ -792,7 +793,8 @@ public void testDate() throws Exception { json.put("test_string", "2021-11-04"); json.put("test_long", 18935L); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestDate.getDescriptor(), tableSchema, json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage( + TestDate.getDescriptor(), tableSchema, json); assertEquals(expectedProto, protoMsg); } @@ -804,7 +806,7 @@ public void testAllTypes() throws Exception { try { LOG.info("Testing " + json + " over " + entry.getKey().getFullName()); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(entry.getKey(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(entry.getKey(), json); LOG.info("Convert Success!"); assertEquals(protoMsg, AllTypesToCorrectProto.get(entry.getKey())[success]); success += 1; @@ -833,7 +835,7 @@ 
public void testAllRepeatedTypesWithLimits() throws Exception { try { LOG.info("Testing " + json + " over " + entry.getKey().getFullName()); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(entry.getKey(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(entry.getKey(), json); LOG.info("Convert Success!"); assertEquals( protoMsg.toString(), @@ -869,7 +871,7 @@ public void testOptional() throws Exception { json.put("byte", 1); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -881,7 +883,8 @@ public void testRepeatedIsOptional() throws Exception { json.put("required_double", 1.1); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestRepeatedIsOptional.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage( + TestRepeatedIsOptional.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -891,7 +894,7 @@ public void testRequired() throws Exception { json.put("optional_double", 1.1); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestRequired.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestRequired.getDescriptor(), json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals( @@ -911,7 +914,7 @@ public void testStructSimple() throws Exception { json.put("test_field_type", stringType); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(MessageType.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(MessageType.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -923,7 +926,7 @@ public void testStructSimpleFail() throws Exception { json.put("test_field_type", stringType); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(MessageType.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(MessageType.getDescriptor(), json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals( @@ -1057,7 +1060,7 @@ public void testStructComplex() throws Exception { json.put("test_interval", "0-0 0 0:0:0.000005"); json.put("test_json", new JSONArray(new String[] {"{'a':'b'}"})); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage( + JsonToProtoMessage.INSTANCE.convertToProtoMessage( ComplexRoot.getDescriptor(), COMPLEX_TABLE_SCHEMA, json); assertEquals(expectedProto, protoMsg); } @@ -1083,7 +1086,7 @@ public void testStructComplexFail() throws Exception { try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(ComplexRoot.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexRoot.getDescriptor(), json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals( @@ -1097,7 +1100,7 @@ public void testRepeatedWithMixedTypes() throws Exception { json.put("test_repeated", new JSONArray("[1.1, 2.2, true]")); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(RepeatedDouble.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedDouble.getDescriptor(), json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals( @@ -1140,7 +1143,7 @@ public void testNestedRepeatedComplex() throws Exception { json.put("repeated_string", jsonRepeatedString); 
DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(NestedRepeated.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(NestedRepeated.getDescriptor(), json); assertEquals(protoMsg, expectedProto); } @@ -1159,7 +1162,7 @@ public void testNestedRepeatedComplexFail() throws Exception { try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(NestedRepeated.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(NestedRepeated.getDescriptor(), json); Assert.fail("should fail"); } catch (IllegalArgumentException e) { assertEquals( @@ -1181,7 +1184,7 @@ public void testEmptySecondLevelObject() throws Exception { json.put("complex_lvl2", complexLvl2); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(ComplexLvl1.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexLvl1.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -1193,10 +1196,11 @@ public void testAllowUnknownFieldsError() throws Exception { try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt64.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt64.getDescriptor(), json); Assert.fail("Should fail"); - } catch (Exceptions.JsonDataHasUnknownFieldException e) { - assertEquals("JSONObject has fields unknown to BigQuery: root.string.", e.getMessage()); + } catch (Exceptions.DataHasUnknownFieldException e) { + assertEquals( + "The source object has fields unknown to BigQuery: root.string.", e.getMessage()); assertEquals("root.string", e.getFieldName()); } } @@ -1207,7 +1211,7 @@ public void testEmptyProtoMessage() throws Exception { json.put("test_repeated", new JSONArray(new int[0])); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt64.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt64.getDescriptor(), json); assertEquals(protoMsg.getAllFields().size(), 0); } @@ -1216,7 +1220,7 @@ public void testEmptyJSONObject() throws Exception { JSONObject json = new JSONObject(); try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(Int64Type.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(Int64Type.getDescriptor(), json); Assert.fail("Should fail"); } catch (IllegalStateException e) { assertEquals("JSONObject is empty.", e.getMessage()); @@ -1227,7 +1231,7 @@ public void testEmptyJSONObject() throws Exception { public void testNullJson() throws Exception { try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(Int64Type.getDescriptor(), null); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(Int64Type.getDescriptor(), null); Assert.fail("Should fail"); } catch (NullPointerException e) { assertEquals("JSONObject is null.", e.getMessage()); @@ -1238,7 +1242,7 @@ public void testNullJson() throws Exception { public void testNullDescriptor() throws Exception { try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(null, new JSONObject()); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(null, new JSONObject()); Assert.fail("Should fail"); } catch (NullPointerException e) { assertEquals("Protobuf descriptor is null.", e.getMessage()); @@ -1255,11 +1259,12 @@ public void testAllowUnknownFieldsSecondLevel() throws Exception { try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(ComplexLvl1.getDescriptor(), json); + 
JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexLvl1.getDescriptor(), json); Assert.fail("Should fail"); } catch (IllegalArgumentException e) { assertEquals( - "JSONObject has fields unknown to BigQuery: root.complex_lvl2.no_match.", e.getMessage()); + "The source object has fields unknown to BigQuery: root.complex_lvl2.no_match.", + e.getMessage()); } } @@ -1276,7 +1281,7 @@ public void testTopLevelMatchSecondLevelMismatch() throws Exception { json.put("complex_lvl2", complex_lvl2); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(ComplexLvl1.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexLvl1.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -1287,7 +1292,7 @@ public void testJsonNullValue() throws Exception { json.put("long", JSONObject.NULL); json.put("int", 1); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -1298,7 +1303,7 @@ public void testJsonAllFieldsNullValue() throws Exception { json.put("long", JSONObject.NULL); json.put("int", JSONObject.NULL); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json); assertEquals(expectedProto, protoMsg); } @@ -1319,7 +1324,8 @@ public void testBadJsonFieldRepeated() throws Exception { try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(RepeatedBytes.getDescriptor(), ts, json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage( + RepeatedBytes.getDescriptor(), ts, json); Assert.fail("Should fail"); } catch (Exceptions.FieldParseError ex) { assertEquals(ex.getBqType(), "NUMERIC"); @@ -1344,7 +1350,8 @@ public void testBadJsonFieldIntRepeated() throws Exception { try { DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt32.getDescriptor(), ts, json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage( + RepeatedInt32.getDescriptor(), ts, json); Assert.fail("Should fail"); } catch (IllegalArgumentException ex) { assertEquals(ex.getMessage(), "Text 'blah' could not be parsed at index 0"); @@ -1375,7 +1382,7 @@ public void testNullRepeatedField() throws Exception { json.put("test_repeated", JSONObject.NULL); DynamicMessage protoMsg = - JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt32.getDescriptor(), ts, json); + JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt32.getDescriptor(), ts, json); assertTrue(protoMsg.getAllFields().isEmpty()); // Missing repeated field. 
@@ -1383,7 +1390,7 @@ public void testNullRepeatedField() throws Exception {
     json.put("test_non_repeated", JSONObject.NULL);
     protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
     assertTrue(protoMsg.getAllFields().isEmpty());
   }
@@ -1406,10 +1413,11 @@ public void testDoubleAndFloatToNumericConversion() {
     JSONObject json = new JSONObject();
     json.put("numeric", new Double(24.678));
     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(TestNumeric.getDescriptor(), ts, json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestNumeric.getDescriptor(), ts, json);
     assertEquals(expectedProto, protoMsg);
     json.put("numeric", new Float(24.678));
-    protoMsg = JsonToProtoMessage.convertJsonToProtoMessage(TestNumeric.getDescriptor(), ts, json);
+    protoMsg =
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestNumeric.getDescriptor(), ts, json);
     assertEquals(expectedProto, protoMsg);
   }
@@ -1434,7 +1442,7 @@ public void testBigDecimalToBigNumericConversion() {
     JSONObject json = new JSONObject();
     json.put("bignumeric", Collections.singletonList(new BigDecimal("24.6789012345")));
     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
     assertEquals(expectedProto, protoMsg);
   }
@@ -1458,11 +1466,11 @@ public void testDoubleAndFloatToRepeatedBigNumericConversion() {
     JSONObject json = new JSONObject();
     json.put("bignumeric", Collections.singletonList(new Double(24.678)));
     DynamicMessage protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
     assertEquals(expectedProto, protoMsg);
     json.put("bignumeric", Collections.singletonList(new Float(24.678)));
     protoMsg =
-        JsonToProtoMessage.convertJsonToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
+        JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
     assertEquals(expectedProto, protoMsg);
   }
 }
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
index c80bb960ac..a068f6d635 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
@@ -742,9 +742,7 @@ public void testJsonStreamWriterWithMessagesOver10M()
         new ArrayList<ApiFuture<AppendRowsResponse>>(totalRequest);
     // Sends a total of 30MB over the wire.
try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) - .setReconnectAfter10M(true) - .build()) { + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) { for (int k = 0; k < totalRequest; k++) { JSONObject row = new JSONObject(); row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java index c340d22e9a..9827e72588 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java @@ -618,7 +618,7 @@ public void testMixedCasedFieldNames() throws Exception { json.put("fooBar", "hello"); DynamicMessage protoMsg = - com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.convertJsonToProtoMessage( + com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.INSTANCE.convertToProtoMessage( TestMixedCaseFieldNames.getDescriptor(), tableSchema, json); }
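Note on migrating callers: the test changes above show the conversion entry point moving from the static `JsonToProtoMessage.convertJsonToProtoMessage(...)` to the `JsonToProtoMessage.INSTANCE` singleton, which now implements the generic `ToProtoConverter<JSONObject>` interface. A minimal sketch of a migrated call site follows; `descriptor`, `schema`, and `json` stand in for caller-supplied values:

```java
import com.google.cloud.bigquery.storage.v1.JsonToProtoMessage;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import org.json.JSONObject;

final class JsonConversionMigration {
  // Old call site (removed by this patch):
  //   JsonToProtoMessage.convertJsonToProtoMessage(descriptor, schema, json);
  // New call site, going through the singleton instance:
  static DynamicMessage convert(Descriptor descriptor, TableSchema schema, JSONObject json) {
    return JsonToProtoMessage.INSTANCE.convertToProtoMessage(descriptor, schema, json);
  }
}
```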
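The `ToProtoConverter<T>` seam is what lets `SchemaAwareStreamWriter` accept row types other than `JSONObject`. Below is a minimal sketch of a custom converter, assuming only the interface added in this patch; the `Map`-based row type, the class name `MapToProtoConverter`, and the delegation to the three-argument `JsonToProtoMessage.INSTANCE` overload exercised in the tests are illustrative choices, not part of this patch:

```java
import com.google.cloud.bigquery.storage.v1.JsonToProtoMessage;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.ToProtoConverter;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import java.util.Map;
import org.json.JSONObject;

/** Illustrative converter that accepts Map rows by reusing the shipped JSON conversion path. */
final class MapToProtoConverter implements ToProtoConverter<Map<String, Object>> {
  @Override
  public DynamicMessage convertToProtoMessage(
      Descriptors.Descriptor protoSchema,
      TableSchema tableSchema,
      Map<String, Object> row,
      boolean ignoreUnknownFields) {
    // Wrap the map as a JSONObject and delegate to the JSON converter. This sketch
    // uses the three-argument overload, so ignoreUnknownFields is not forwarded here.
    return JsonToProtoMessage.INSTANCE.convertToProtoMessage(
        protoSchema, tableSchema, new JSONObject(row));
  }
}
```

A writer over `Map` rows would then be assembled through the Builder shown earlier, with `setIgnoreUnknownFields`, `setTraceId`, and the other setters applying unchanged; the exact `SchemaAwareStreamWriter` factory overloads are not visible in this patch, so the wiring is left out rather than guessed.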