diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java index e2cc1cc6b0..27cd9ef41a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java @@ -608,7 +608,9 @@ private static void fillRepeatedField( } } if (!added) { - if (val instanceof byte[]) { + if (val instanceof ByteString) { + protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray()); + } else if (val instanceof byte[]) { protoMsg.addRepeatedField(fieldDescriptor, val); } else if (val instanceof JSONArray) { try { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 286061a795..691ec4afde 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -33,16 +33,20 @@ import com.google.cloud.bigquery.storage.test.SchemaTest; import com.google.cloud.bigquery.storage.test.Test.FlexibleType; import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.cloud.bigquery.storage.test.Test.RepetitionType; import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType; import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode; +import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Int64Value; import com.google.protobuf.Timestamp; import io.grpc.Status; import io.grpc.StatusRuntimeException; import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.util.Arrays; import java.util.Map; import java.util.UUID; @@ -63,6 +67,7 @@ @RunWith(JUnit4.class) public class JsonStreamWriterTest { private static final Logger LOG = Logger.getLogger(JsonStreamWriterTest.class.getName()); + private static int NUMERIC_SCALE = 9; private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/_default"; private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default"; private static final String TEST_TABLE = "projects/p/datasets/d/tables/t"; @@ -329,6 +334,137 @@ public void testSpecialTypeAppend() throws Exception { } } + @Test + public void testRepeatedByteStringAppend() throws Exception { + TableFieldSchema NON_REPEATED_A = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.NUMERIC) + .setMode(TableFieldSchema.Mode.REQUIRED) + .setName("a") + .build(); + + TableFieldSchema NON_REPEATED_B = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.BYTES) + .setMode(TableFieldSchema.Mode.REQUIRED) + .setName("b") + .build(); + + TableFieldSchema NON_REPEATED_C = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.BYTES) + .setMode(TableFieldSchema.Mode.REQUIRED) + .setName("c") + .build(); + + TableFieldSchema REPEATED_A = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.NUMERIC) + .setMode(TableFieldSchema.Mode.REPEATED) + .setName("aa") + .build(); + + TableFieldSchema REPEATED_B = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.BYTES) + .setMode(TableFieldSchema.Mode.REPEATED) + .setName("bb") + .build(); + + TableFieldSchema REPEATED_C = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.BYTES) + .setMode(TableFieldSchema.Mode.REPEATED) + .setName("cc") + .build(); + + TableSchema tableSchema = + TableSchema.newBuilder() + .addFields(0, NON_REPEATED_A) + .addFields(1, NON_REPEATED_B) + .addFields(2, NON_REPEATED_C) + .addFields(3, REPEATED_A) + .addFields(4, REPEATED_B) + .addFields(5, REPEATED_C) + .build(); + + BigDecimal bigDecimal1 = new BigDecimal(1.1); + if (bigDecimal1.scale() > NUMERIC_SCALE) { + bigDecimal1 = bigDecimal1.setScale(NUMERIC_SCALE, RoundingMode.HALF_UP); + } + BigDecimal bigDecimal2 = new BigDecimal(2.2); + if (bigDecimal2.scale() > NUMERIC_SCALE) { + bigDecimal2 = bigDecimal2.setScale(NUMERIC_SCALE, RoundingMode.HALF_UP); + } + JSONArray aaValue = new JSONArray(); + aaValue.put(BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal1)); + aaValue.put(BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal2)); + + byte[] byteArray1 = "bb1".getBytes("UTF-8"); + byte[] byteArray2 = "bb2".getBytes("UTF-8"); + JSONArray bbValue = new JSONArray(); + bbValue.put(ByteString.copyFrom(byteArray1)); + bbValue.put(ByteString.copyFrom(byteArray2)); + + ByteString byteString1 = ByteString.copyFrom("cc1", "UTF-8"); + ByteString byteString2 = ByteString.copyFrom("cc2", "UTF-8"); + JSONArray ccValue = new JSONArray(); + ccValue.put(byteString1); + ccValue.put(byteString2); + + JSONObject foo = new JSONObject(); + foo.put("a", BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal1)); + foo.put("b", ByteString.copyFrom(byteArray1)); + foo.put("c", byteString1); + foo.put("aa", aaValue); + foo.put("bb", bbValue); + foo.put("cc", ccValue); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + RepetitionType expectedProto = + RepetitionType.newBuilder() + .setA(BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal1)) + .setB(ByteString.copyFrom(byteArray1)) + .setC(byteString1) + .addAa(BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal1)) + .addAa(BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal2)) + .addBb(ByteString.copyFrom(byteArray1)) + .addBb(ByteString.copyFrom(byteArray2)) + .addCc(byteString1) + .addCc(byteString2) + .build(); + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) { + + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); + + ApiFuture appendFuture = writer.append(jsonArr); + assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue()); + appendFuture.get(); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRows(0), + expectedProto.toByteString()); + } + } + @Test public void testSingleAppendMultipleSimpleJson() throws Exception { FooType expectedProto = FooType.newBuilder().setFoo("allen").build(); diff --git a/google-cloud-bigquerystorage/src/test/proto/test.proto b/google-cloud-bigquerystorage/src/test/proto/test.proto index 7b1d74fe47..191f641f25 100644 --- a/google-cloud-bigquerystorage/src/test/proto/test.proto +++ b/google-cloud-bigquerystorage/src/test/proto/test.proto @@ -93,3 +93,12 @@ message FlexibleType { optional string col_dGVzdC3liJc = 1 [(.google.cloud.bigquery.storage.v1.column_name) = "test-列"]; } + +message RepetitionType { + required bytes a = 1; + required bytes b = 2; + required bytes c = 3; + repeated bytes aa = 4; + repeated bytes bb = 5; + repeated bytes cc = 6; +} \ No newline at end of file