Skip to content

Commit

Permalink
feat: client unknown fields drives writer refreshment (#1797)
Browse files Browse the repository at this point in the history
* feat:client unknown fields drives writer refreshment

* .

* .

* .

* .

* .

* .

* .

* .

* .

* .

* .
  • Loading branch information
yirutang committed Sep 26, 2022
1 parent b3ffd77 commit d8aaed5
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 22 deletions.
Expand Up @@ -49,6 +49,7 @@ public class JsonStreamWriter implements AutoCloseable {
"projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+";
private static Pattern streamPattern = Pattern.compile(streamPatternString);
private static final Logger LOG = Logger.getLogger(JsonStreamWriter.class.getName());
private static final long UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS = 30100L;

private BigQueryWriteClient client;
private String streamName;
Expand Down Expand Up @@ -77,6 +78,7 @@ private JsonStreamWriter(Builder builder)
streamWriterBuilder = StreamWriter.newBuilder(builder.streamName);
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
this.totalMessageSize = protoSchema.getSerializedSize();
this.client = builder.client;
streamWriterBuilder.setWriterSchema(protoSchema);
setStreamWriterSettings(
builder.channelProvider,
Expand Down Expand Up @@ -108,6 +110,60 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr)
return append(jsonArr, -1);
}

private void refreshWriter(TableSchema updatedSchema)
throws DescriptorValidationException, IOException {
Preconditions.checkNotNull(updatedSchema, "updatedSchema is null.");
LOG.info("Refresh internal writer due to schema update, stream: " + this.streamName);
// Close the StreamWriterf
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
this.tableSchema = updatedSchema;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
this.totalMessageSize = protoSchema.getSerializedSize();
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build();
}

private Message buildMessage(JSONObject json)
throws InterruptedException, DescriptorValidationException, IOException {
try {
return JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
} catch (Exceptions.JsonDataHasUnknownFieldException ex) {
// Backend cache for GetWriteStream schema staleness can be 30 seconds, wait a bit before
// trying to get the table schema to increase the chance of succeed. This is to avoid
// client's invalid datfa caused storm of GetWriteStream.
LOG.warning(
"Saw Json unknown field "
+ ex.getFieldName()
+ ", try to refresh the writer with updated schema, stream: "
+ streamName);
GetWriteStreamRequest writeStreamRequest =
GetWriteStreamRequest.newBuilder()
.setName(this.streamName)
.setView(WriteStreamView.FULL)
.build();
WriteStream writeStream = client.getWriteStream(writeStreamRequest);
refreshWriter(writeStream.getTableSchema());
try {
return JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
} catch (Exceptions.JsonDataHasUnknownFieldException exex) {
LOG.warning(
"First attempt failed, waiting for 30 seconds to retry, stream: " + this.streamName);
Thread.sleep(UPDATE_SCHEMA_RETRY_INTERVAL_MILLIS);
writeStream = client.getWriteStream(writeStreamRequest);
// TODO(yiru): We should let TableSchema return a timestamp so that we can simply
// compare the timestamp to see if the table schema is the same. If it is the
// same, we don't need to go refresh the writer again.
refreshWriter(writeStream.getTableSchema());
return JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
}
}
}
/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data at the
Expand All @@ -126,17 +182,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
// Update schema only work when connection pool is not enabled.
if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER
&& this.streamWriter.getUpdatedSchema() != null) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
// Close the StreamWriter
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
this.tableSchema = updatedSchema;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
this.totalMessageSize = protoSchema.getSerializedSize();
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build();
refreshWriter(this.streamWriter.getUpdatedSchema());
}

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
Expand All @@ -150,9 +196,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
try {
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
Message protoMessage = buildMessage(json);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
} catch (IllegalArgumentException exception) {
Expand All @@ -169,6 +213,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
} else {
rowIndexToErrorMessage.put(i, exception.getMessage());
}
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}

Expand Down Expand Up @@ -277,7 +323,7 @@ public static Builder newBuilder(String streamOrTableName, TableSchema tableSche
*/
public static Builder newBuilder(
String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
Preconditions.checkNotNull(streamOrTableName, "StreamName is null.");
Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
Preconditions.checkNotNull(client, "BigQuery client is null.");
return new Builder(streamOrTableName, tableSchema, client);
Expand Down Expand Up @@ -359,6 +405,7 @@ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWrite

WriteStream writeStream = this.client.getWriteStream(writeStreamRequest);
TableSchema writeStreamTableSchema = writeStream.getTableSchema();

this.tableSchema = writeStreamTableSchema;
} else {
this.tableSchema = tableSchema;
Expand Down
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class JsonStreamWriterTest {
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
private static MockServiceHelper serviceHelper;
private BigQueryWriteClient client;

private final TableFieldSchema FOO =
TableFieldSchema.newBuilder()
Expand Down Expand Up @@ -116,14 +118,15 @@ public void setUp() throws Exception {
channelProvider = serviceHelper.createChannelProvider();
fakeExecutor = new FakeScheduledExecutorService();
testBigQueryWrite.setExecutor(fakeExecutor);
BigQueryWriteSettings settings =
BigQueryWriteSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build();
client = BigQueryWriteClient.create(settings);
Instant time = Instant.now();
Timestamp timestamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
// Add enough GetWriteStream response.
for (int i = 0; i < 4; i++) {
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setCreateTime(timestamp).build());
}
}

@After
Expand All @@ -133,7 +136,7 @@ public void tearDown() throws Exception {

private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder(
String testStream, TableSchema BQTableSchema) {
return JsonStreamWriter.newBuilder(testStream, BQTableSchema)
return JsonStreamWriter.newBuilder(testStream, BQTableSchema, client)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create());
}
Expand Down Expand Up @@ -507,8 +510,85 @@ public void testSimpleSchemaUpdate() throws Exception {
}

@Test
public void testWithoutIgnoreUnknownFields() throws Exception {
public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
TableSchema updatedSchema =
TableSchema.newBuilder()
.addFields(0, TEST_INT)
.addFields(
1,
TableFieldSchema.newBuilder()
.setName("test_string")
.setType(TableFieldSchema.Type.STRING)
.setMode(Mode.NULLABLE))
.build();
// GetWriteStream is called once and the writer is fixed to accept unknown fields.
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(updatedSchema).build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONObject bar = new JSONObject();
bar.put("test_string", "a");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
jsonArr.put(bar);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
appendFuture.get();
}
}

@Test
public void testWithoutIgnoreUnknownFieldsUpdateSecondSuccess() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
TableSchema updatedSchema =
TableSchema.newBuilder()
.addFields(0, TEST_INT)
.addFields(
1,
TableFieldSchema.newBuilder()
.setName("test_string")
.setType(TableFieldSchema.Type.STRING)
.setMode(Mode.NULLABLE))
.build();
// GetWriteStream is called twice and got the updated schema
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(updatedSchema).build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONObject bar = new JSONObject();
bar.put("test_string", "a");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
jsonArr.put(bar);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
appendFuture.get();
}
}

@Test
public void testWithoutIgnoreUnknownFieldsUpdateFail() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
// GetWriteStream is called once but failed to update to the right schema.
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
JSONObject foo = new JSONObject();
Expand Down Expand Up @@ -626,6 +706,10 @@ public void testMultipleAppendSerializtionErrors()
jsonArr.put(foo);
jsonArr.put(foo1);
jsonArr.put(foo2);
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(TABLE_SCHEMA).build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(TABLE_SCHEMA).build());

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
Expand Down

0 comments on commit d8aaed5

Please sign in to comment.