fix: bug fix for streamWriter & jsonStreamWriter (#2122)
* feat: add a public API to StreamWriter to set the maximum wait time

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* revert the README change made by the OwlBot post-processor

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: Reduce the in-queue request wait timeout to 5 minutes.

Since the Write API server side enforces a total timeout of 2 minutes, it
does not make sense to wait 15 minutes before deciding that a connection is
dead, so reduce the timeout here (see the conceptual sketch below).

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: (1) disable refresh of the stream writer when the table schema is
explicitly provided; (2) fix location string matching for multiplexing

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
GaoleMeng and gcf-owl-bot[bot] committed May 25, 2023
1 parent a4e1131 commit 36964a3
Showing 4 changed files with 160 additions and 19 deletions.
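The wait-time changes described in the commit message (a public setter for the maximum wait time, and a 5-minute default instead of 15) both bound how long a request may sit in a connection's in-flight queue before the connection is treated as dead. The sketch below is conceptual only; the class and method names are hypothetical and do not mirror the library's actual internals.

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

// Conceptual sketch only: bounds how long a request may wait in a connection's
// queue before the connection is considered dead. Names are hypothetical.
final class InflightQueueWatchdog {
  // Default mirroring the commit message: 5 minutes instead of the old 15.
  private Duration maxWaitTime = Duration.ofMinutes(5);

  // Send timestamps of requests that were written but not yet acknowledged.
  private final Deque<Instant> inflightSendTimes = new ArrayDeque<>();

  // The new public knob: lets callers override the maximum wait time.
  synchronized void setMaxWaitTime(Duration maxWaitTime) {
    this.maxWaitTime = maxWaitTime;
  }

  synchronized void recordSend() {
    inflightSendTimes.addLast(Instant.now());
  }

  synchronized void recordAck() {
    inflightSendTimes.pollFirst();
  }

  // True when the oldest in-flight request has waited longer than the limit,
  // i.e. the connection should be torn down and its pending requests failed
  // or retried on a fresh connection.
  synchronized boolean connectionLooksDead() {
    Instant oldest = inflightSendTimes.peekFirst();
    return oldest != null
        && Duration.between(oldest, Instant.now()).compareTo(maxWaitTime) > 0;
  }
}

With the server-side total timeout at roughly 2 minutes, a request still unacknowledged after 5 minutes is almost certainly stuck on a dead connection, so failing fast and reconnecting is the better trade-off.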
@@ -349,15 +349,15 @@ public void run(Throwable finalStatus) {

/** Schedules the writing of rows at given offset. */
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
if (this.location != null && this.location != streamWriter.getLocation()) {
if (this.location != null && !this.location.equals(streamWriter.getLocation())) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription(
"StreamWriter with location "
+ streamWriter.getLocation()
+ " is scheduled to use a connection with location "
+ this.location));
} else if (this.location == null && streamWriter.getStreamName() != this.streamName) {
} else if (this.location == null && !streamWriter.getStreamName().equals(this.streamName)) {
// Location is null implies this is non-multiplexed connection.
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
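The hunk above is the multiplexing location fix from the commit message: == and != compare String references in Java, not contents, so two equal location strings produced by different code paths can fail an identity check and spuriously trip the INVALID_ARGUMENT branch. A minimal standalone illustration of the pitfall (not library code):

public class StringEqualityPitfall {
  public static void main(String[] args) {
    // Two logically equal location strings built through different code paths.
    String requestedLocation = "us";
    String connectionLocation = new StringBuilder("u").append("s").toString();

    // Reference comparison: false, because they are distinct String objects.
    System.out.println(requestedLocation == connectionLocation);      // false

    // Value comparison: true, which is what the location check needs.
    System.out.println(requestedLocation.equals(connectionLocation)); // true
  }
}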
@@ -54,6 +54,10 @@ public class SchemaAwareStreamWriter<T> implements AutoCloseable {
private TableSchema tableSchema;
private ProtoSchema protoSchema;

// In some situations we want to skip the stream writer refresh for an updated schema, e.g. when
// the user provides the table schema, we should always use that schema.
private final boolean skipRefreshStreamWriter;

/**
* Constructs the SchemaAwareStreamWriter
*
@@ -87,6 +91,7 @@ private SchemaAwareStreamWriter(Builder<T> builder)
this.tableSchema = builder.tableSchema;
this.toProtoConverter = builder.toProtoConverter;
this.ignoreUnknownFields = builder.ignoreUnknownFields;
this.skipRefreshStreamWriter = builder.skipRefreshStreamWriter;
}

/**
@@ -125,6 +130,10 @@ private Message buildMessage(T item)
return this.toProtoConverter.convertToProtoMessage(
this.descriptor, this.tableSchema, item, ignoreUnknownFields);
} catch (Exceptions.DataHasUnknownFieldException ex) {
// Directly return error when stream writer refresh is disabled.
if (this.skipRefreshStreamWriter) {
throw ex;
}
LOG.warning(
"Saw unknown field "
+ ex.getFieldName()
@@ -157,7 +166,7 @@ public ApiFuture<AppendRowsResponse> append(Iterable<T> items, long offset)
// Handle schema updates in a Thread-safe way by locking down the operation
synchronized (this) {
// Create a new stream writer internally if a new updated schema is reported from backend.
if (this.streamWriter.getUpdatedSchema() != null) {
if (!this.skipRefreshStreamWriter && this.streamWriter.getUpdatedSchema() != null) {
refreshWriter(this.streamWriter.getUpdatedSchema());
}

@@ -404,6 +413,8 @@ public static final class Builder<T> {
private final BigQueryWriteClient client;
private final TableSchema tableSchema;

private final boolean skipRefreshStreamWriter;

private final ToProtoConverter<T> toProtoConverter;
private TransportChannelProvider channelProvider;
private CredentialsProvider credentialsProvider;
@@ -459,11 +470,12 @@ private Builder(
.build();

WriteStream writeStream = this.client.getWriteStream(writeStreamRequest);

this.tableSchema = writeStream.getTableSchema();
this.location = writeStream.getLocation();
this.skipRefreshStreamWriter = false;
} else {
this.tableSchema = tableSchema;
this.skipRefreshStreamWriter = true;
}
this.toProtoConverter = toProtoConverter;
}
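The SchemaAwareStreamWriter changes above tie schema refresh to how the writer was built: when the builder fetches the table schema itself via GetWriteStream, backend-reported schema updates keep refreshing the writer; when the caller supplies the schema explicitly, skipRefreshStreamWriter is set and the writer keeps using exactly that schema. Below is a rough usage sketch of the two paths through JsonStreamWriter (which is built on SchemaAwareStreamWriter), based on the builder overloads used in the tests that follow; the project, dataset, and table names are placeholders.

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.json.JSONArray;
import org.json.JSONObject;

public class WriterSchemaModes {
  // Placeholder default stream; substitute a real project/dataset/table.
  private static final String STREAM =
      "projects/my-project/datasets/my_dataset/tables/my_table/streams/_default";

  public static void main(String[] args) throws Exception {
    JSONArray rows = new JSONArray().put(new JSONObject().put("foo", "aaa"));

    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // Path 1: the builder fetches the schema via GetWriteStream. Backend-reported
      // schema updates will keep refreshing the writer transparently.
      try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(STREAM, client).build()) {
        writer.append(rows).get();
      }

      // Path 2: the schema is provided explicitly. After this change the writer
      // skips the refresh and sticks to this schema; unknown fields fail fast.
      TableSchema schema =
          TableSchema.newBuilder()
              .addFields(
                  TableFieldSchema.newBuilder()
                      .setName("foo")
                      .setType(TableFieldSchema.Type.STRING)
                      .setMode(TableFieldSchema.Mode.NULLABLE)
                      .build())
              .build();
      try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(STREAM, schema).build()) {
        writer.append(rows).get();
      }
    }
  }
}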
@@ -160,6 +160,13 @@ private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder(
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build());
}

private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder(String testStream) {
return JsonStreamWriter.newBuilder(testStream, client)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build());
}

@Test
public void testTwoParamNewBuilder_nullSchema() {
try {
@@ -658,8 +665,13 @@ public void run() throws Throwable {

@Test
public void testSimpleSchemaUpdate() throws Exception {
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setTableSchema(TABLE_SCHEMA)
.setLocation("us")
.build());
try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM).build()) {
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
@@ -722,7 +734,6 @@ public void testSimpleSchemaUpdate() throws Exception {
updatedFoo.put("bar", "bbb");
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);

ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);

assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
@@ -751,6 +762,88 @@ public void testSimpleSchemaUpdate() throws Exception {
}
}

@Test
public void testSimpleSchemaUpdate_skipRefreshWriterIfSchemaProvided() throws Exception {
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setTableSchema(TABLE_SCHEMA)
.setLocation("us")
.build());
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
testBigQueryWrite.addResponse(createAppendResponse(1));
testBigQueryWrite.addResponse(createAppendResponse(2));
testBigQueryWrite.addResponse(createAppendResponse(3));
// First append
JSONObject foo = new JSONObject();
foo.put("foo", "aaa");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);

ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);
ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(jsonArr);
ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(jsonArr);

assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
assertEquals(
1,
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRows(0),
FooType.newBuilder().setFoo("aaa").build().toByteString());

assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
assertEquals(
1,
testBigQueryWrite
.getAppendRequests()
.get(1)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite
.getAppendRequests()
.get(1)
.getProtoRows()
.getRows()
.getSerializedRows(0),
FooType.newBuilder().setFoo("aaa").build().toByteString());

// Second append with updated schema.
JSONObject updatedFoo = new JSONObject();
updatedFoo.put("foo", "aaa");
updatedFoo.put("bar", "bbb");
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);

// Schema update will not happen for a writer whose schema was explicitly provided.
assertThrows(
AppendSerializationError.class,
() -> {
ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);
});
}
}

@Test
public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
@@ -764,6 +857,10 @@ public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Except
.setType(TableFieldSchema.Type.STRING)
.setMode(Mode.NULLABLE))
.build();

// The builder's GetWriteStream call returns the original table schema.
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
// GetWriteStream is called once and the writer is fixed to accept unknown fields.
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(updatedSchema).build());
@@ -772,8 +869,7 @@ public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Except
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM).build()) {
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONObject bar = new JSONObject();
@@ -800,15 +896,16 @@ public void testWithoutIgnoreUnknownFieldsUpdateSecondSuccess() throws Exception
.setMode(Mode.NULLABLE))
.build();
// GetWriteStream is called once and got the updated schema
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(TABLE_SCHEMA).build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(updatedSchema).build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM).build()) {
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONObject bar = new JSONObject();
@@ -826,15 +923,28 @@ public void testSchemaUpdateInMultiplexing_singleConnection() throws Exception {
// Set min connection count to be 1 to force sharing connection.
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
// GetWriteStream is called twice (once per writer) and returns each writer's table schema.
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setTableSchema(TABLE_SCHEMA)
.setLocation("us")
.build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setTableSchema(TABLE_SCHEMA_2)
.setLocation("us")
.build());
// The following two writers have different stream name and schema, but will share the same
// connection .
JsonStreamWriter writer1 =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA)
getTestJsonStreamWriterBuilder(TEST_STREAM)
.setEnableConnectionPool(true)
.setLocation("us")
.build();
JsonStreamWriter writer2 =
getTestJsonStreamWriterBuilder(TEST_STREAM_2, TABLE_SCHEMA_2)
getTestJsonStreamWriterBuilder(TEST_STREAM_2)
.setEnableConnectionPool(true)
.setLocation("us")
.build();
@@ -911,14 +1021,27 @@ public void testSchemaUpdateInMultiplexing_multipleWriterForSameStreamName() thr
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());

// GetWriteStream is called twice (once per writer); both calls return the same schema.
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setTableSchema(TABLE_SCHEMA)
.setLocation("us")
.build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setTableSchema(TABLE_SCHEMA)
.setLocation("us")
.build());
// Create two writers writing to the same stream.
JsonStreamWriter writer1 =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA)
getTestJsonStreamWriterBuilder(TEST_STREAM)
.setEnableConnectionPool(true)
.setLocation("us")
.build();
JsonStreamWriter writer2 =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA)
getTestJsonStreamWriterBuilder(TEST_STREAM)
.setEnableConnectionPool(true)
.setLocation("us")
.build();
@@ -987,10 +1110,16 @@ public void testSchemaUpdateInMultiplexing_IgnoreUpdateIfTimeStampNewer() throws
// Set min connection count to be 1 to force sharing connection.
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setTableSchema(TABLE_SCHEMA)
.setLocation("us")
.build());
// The following two writers have different stream name and schema, but will share the same
// connection .
// connection.
JsonStreamWriter writer1 =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA)
getTestJsonStreamWriterBuilder(TEST_STREAM)
.setEnableConnectionPool(true)
.setLocation("us")
.build();
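The new test testSimpleSchemaUpdate_skipRefreshWriterIfSchemaProvided above pins down the user-visible consequence of the builder flag: with an explicitly provided schema, appending a row that contains an unknown field throws Exceptions.AppendSerializationError instead of triggering a writer refresh. A hedged sketch of how a caller might handle that, assuming the caller can obtain the current schema itself; the helper name and retry shape are illustrative, not library API.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.json.JSONArray;

public class ExplicitSchemaAppend {
  // Illustrative helper: append with an explicit-schema writer and, if the rows
  // no longer match that schema, rebuild the writer with a schema the caller
  // knows to be current and retry once. In real code the rebuilt writer would be
  // handed back to the caller or tracked for closing; it is left open here.
  static ApiFuture<AppendRowsResponse> appendOrRebuild(
      JsonStreamWriter writer, JSONArray rows, String stream, TableSchema latestSchema)
      throws Exception {
    try {
      return writer.append(rows);
    } catch (AppendSerializationError e) {
      // With skipRefreshStreamWriter the writer will not pick up new fields on
      // its own, so the stale writer is replaced explicitly.
      writer.close();
      JsonStreamWriter rebuilt = JsonStreamWriter.newBuilder(stream, latestSchema).build();
      return rebuilt.append(rows);
    }
  }
}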
@@ -785,7 +785,7 @@ public void testJsonStreamWriterSchemaUpdate()
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) {
// write the 1st row
JSONObject foo = new JSONObject();
foo.put("col1", "aaa");
@@ -895,7 +895,7 @@ public void testJsonStreamWriterSchemaUpdateConcurrent()

// Start writing using the JsonWriter
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) {
int numberOfThreads = 5;
ExecutorService streamTaskExecutor = Executors.newFixedThreadPool(5);
CountDownLatch latch = new CountDownLatch(numberOfThreads);
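As a closing note, the multiplexing tests earlier in JsonStreamWriterTest rely on connection pooling: writers for streams in the same location can share one connection, and with the first hunk's fix the location match uses String.equals rather than reference identity. A rough end-to-end sketch of two pooled writers, based on the builder calls used in those tests; the stream names and the location are placeholders, and both tables are assumed to have a STRING column named foo.

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import org.json.JSONArray;
import org.json.JSONObject;

public class MultiplexedWriters {
  public static void main(String[] args) throws Exception {
    // Placeholder default streams for two tables in the same region.
    String stream1 = "projects/my-project/datasets/my_dataset/tables/table_a/streams/_default";
    String stream2 = "projects/my-project/datasets/my_dataset/tables/table_b/streams/_default";

    try (BigQueryWriteClient client = BigQueryWriteClient.create();
        // Both writers enable the connection pool and declare the same location,
        // so they are eligible to multiplex over a shared connection.
        JsonStreamWriter writer1 =
            JsonStreamWriter.newBuilder(stream1, client)
                .setEnableConnectionPool(true)
                .setLocation("us")
                .build();
        JsonStreamWriter writer2 =
            JsonStreamWriter.newBuilder(stream2, client)
                .setEnableConnectionPool(true)
                .setLocation("us")
                .build()) {
      // Rows are converted against each writer's own schema, fetched by its builder.
      JSONArray rows = new JSONArray().put(new JSONObject().put("foo", "aaa"));
      writer1.append(rows).get();
      writer2.append(rows).get();
    }
  }
}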
