Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support ByteString values on repeated fields #1996

Merged
merged 2 commits into from Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -57,6 +57,7 @@ If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.31.0'
```

If you are using SBT, add this to your dependencies:

Expand Down
Expand Up @@ -608,7 +608,9 @@ private static void fillRepeatedField(
}
}
if (!added) {
if (val instanceof byte[]) {
if (val instanceof ByteString) {
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
} else if (val instanceof byte[]) {
protoMsg.addRepeatedField(fieldDescriptor, val);
} else if (val instanceof JSONArray) {
try {
Expand Down
Expand Up @@ -33,16 +33,20 @@
import com.google.cloud.bigquery.storage.test.SchemaTest;
import com.google.cloud.bigquery.storage.test.Test.FlexibleType;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.RepetitionType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
Expand All @@ -63,6 +67,7 @@
@RunWith(JUnit4.class)
public class JsonStreamWriterTest {
private static final Logger LOG = Logger.getLogger(JsonStreamWriterTest.class.getName());
private static int NUMERIC_SCALE = 9;
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
private static final String TEST_TABLE = "projects/p/datasets/d/tables/t";
Expand Down Expand Up @@ -329,6 +334,137 @@ public void testSpecialTypeAppend() throws Exception {
}
}

@Test
public void testRepeatedByteStringAppend() throws Exception {
TableFieldSchema NON_REPEATED_A =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.NUMERIC)
.setMode(TableFieldSchema.Mode.REQUIRED)
.setName("a")
.build();

TableFieldSchema NON_REPEATED_B =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.BYTES)
.setMode(TableFieldSchema.Mode.REQUIRED)
.setName("b")
.build();

TableFieldSchema NON_REPEATED_C =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.BYTES)
.setMode(TableFieldSchema.Mode.REQUIRED)
.setName("c")
.build();

TableFieldSchema REPEATED_A =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.NUMERIC)
.setMode(TableFieldSchema.Mode.REPEATED)
.setName("aa")
.build();

TableFieldSchema REPEATED_B =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.BYTES)
.setMode(TableFieldSchema.Mode.REPEATED)
.setName("bb")
.build();

TableFieldSchema REPEATED_C =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.BYTES)
.setMode(TableFieldSchema.Mode.REPEATED)
.setName("cc")
.build();

TableSchema tableSchema =
TableSchema.newBuilder()
.addFields(0, NON_REPEATED_A)
.addFields(1, NON_REPEATED_B)
.addFields(2, NON_REPEATED_C)
.addFields(3, REPEATED_A)
.addFields(4, REPEATED_B)
.addFields(5, REPEATED_C)
.build();

BigDecimal bigDecimal1 = new BigDecimal(1.1);
if (bigDecimal1.scale() > NUMERIC_SCALE) {
bigDecimal1 = bigDecimal1.setScale(NUMERIC_SCALE, RoundingMode.HALF_UP);
}
BigDecimal bigDecimal2 = new BigDecimal(2.2);
if (bigDecimal2.scale() > NUMERIC_SCALE) {
bigDecimal2 = bigDecimal2.setScale(NUMERIC_SCALE, RoundingMode.HALF_UP);
}
JSONArray aaValue = new JSONArray();
aaValue.put(BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal1));
aaValue.put(BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal2));

byte[] byteArray1 = "bb1".getBytes("UTF-8");
byte[] byteArray2 = "bb2".getBytes("UTF-8");
JSONArray bbValue = new JSONArray();
bbValue.put(ByteString.copyFrom(byteArray1));
bbValue.put(ByteString.copyFrom(byteArray2));

ByteString byteString1 = ByteString.copyFrom("cc1", "UTF-8");
ByteString byteString2 = ByteString.copyFrom("cc2", "UTF-8");
JSONArray ccValue = new JSONArray();
ccValue.put(byteString1);
ccValue.put(byteString2);

JSONObject foo = new JSONObject();
foo.put("a", BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal1));
foo.put("b", ByteString.copyFrom(byteArray1));
foo.put("c", byteString1);
foo.put("aa", aaValue);
foo.put("bb", bbValue);
foo.put("cc", ccValue);
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);

RepetitionType expectedProto =
RepetitionType.newBuilder()
.setA(BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal1))
.setB(ByteString.copyFrom(byteArray1))
.setC(byteString1)
.addAa(BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal1))
.addAa(BigDecimalByteStringEncoder.encodeToNumericByteString(bigDecimal2))
.addBb(ByteString.copyFrom(byteArray1))
.addBb(ByteString.copyFrom(byteArray2))
.addCc(byteString1)
.addCc(byteString2)
.build();
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
assertEquals(
1,
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRows(0),
expectedProto.toByteString());
}
}

@Test
public void testSingleAppendMultipleSimpleJson() throws Exception {
FooType expectedProto = FooType.newBuilder().setFoo("allen").build();
Expand Down
9 changes: 9 additions & 0 deletions google-cloud-bigquerystorage/src/test/proto/test.proto
Expand Up @@ -93,3 +93,12 @@ message FlexibleType {
optional string col_dGVzdC3liJc = 1
[(.google.cloud.bigquery.storage.v1.column_name) = "test-列"];
}

message RepetitionType {
required bytes a = 1;
required bytes b = 2;
required bytes c = 3;
repeated bytes aa = 4;
repeated bytes bb = 5;
repeated bytes cc = 6;
}