diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
index 64abf82bb9..7e86da4d81 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
@@ -349,7 +349,7 @@ public void run(Throwable finalStatus) {
 
   /** Schedules the writing of rows at given offset. */
   ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
-    if (this.location != null && this.location != streamWriter.getLocation()) {
+    if (this.location != null && !this.location.equals(streamWriter.getLocation())) {
       throw new StatusRuntimeException(
           Status.fromCode(Code.INVALID_ARGUMENT)
               .withDescription(
@@ -357,7 +357,7 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
                       + streamWriter.getLocation()
                       + " is scheduled to use a connection with location "
                       + this.location));
-    } else if (this.location == null && streamWriter.getStreamName() != this.streamName) {
+    } else if (this.location == null && !streamWriter.getStreamName().equals(this.streamName)) {
       // Location is null implies this is non-multiplexed connection.
       throw new StatusRuntimeException(
           Status.fromCode(Code.INVALID_ARGUMENT)
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
index 0e8f931914..10fceeee68 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java
@@ -54,6 +54,10 @@ public class SchemaAwareStreamWriter<T> implements AutoCloseable {
   private TableSchema tableSchema;
   private ProtoSchema protoSchema;
 
+  // In some situations we want to skip the stream writer refresh for an updated schema, e.g. when
+  // the user provides the table schema, we should always use that schema.
+  private final boolean skipRefreshStreamWriter;
+
   /**
    * Constructs the SchemaAwareStreamWriter
    *
@@ -87,6 +91,7 @@ private SchemaAwareStreamWriter(Builder<T> builder)
     this.tableSchema = builder.tableSchema;
     this.toProtoConverter = builder.toProtoConverter;
     this.ignoreUnknownFields = builder.ignoreUnknownFields;
+    this.skipRefreshStreamWriter = builder.skipRefreshStreamWriter;
   }
 
   /**
    *
@@ -125,6 +130,10 @@ private Message buildMessage(T item)
       return this.toProtoConverter.convertToProtoMessage(
          this.descriptor, this.tableSchema, item, ignoreUnknownFields);
     } catch (Exceptions.DataHasUnknownFieldException ex) {
+      // Directly rethrow the error when stream writer refresh is disabled.
+      if (this.skipRefreshStreamWriter) {
+        throw ex;
+      }
       LOG.warning(
           "Saw unknown field "
               + ex.getFieldName()
@@ -157,7 +166,7 @@ public ApiFuture<AppendRowsResponse> append(Iterable<T> items, long offset)
     // Handle schema updates in a Thread-safe way by locking down the operation
     synchronized (this) {
       // Create a new stream writer internally if a new updated schema is reported from backend.
-      if (this.streamWriter.getUpdatedSchema() != null) {
+      if (!this.skipRefreshStreamWriter && this.streamWriter.getUpdatedSchema() != null) {
         refreshWriter(this.streamWriter.getUpdatedSchema());
       }
 
@@ -404,6 +413,8 @@ public static final class Builder {
     private final BigQueryWriteClient client;
     private final TableSchema tableSchema;
 
+    private final boolean skipRefreshStreamWriter;
+
     private final ToProtoConverter<T> toProtoConverter;
     private TransportChannelProvider channelProvider;
     private CredentialsProvider credentialsProvider;
@@ -459,11 +470,12 @@ private Builder(
                 .build();
 
         WriteStream writeStream = this.client.getWriteStream(writeStreamRequest);
-
         this.tableSchema = writeStream.getTableSchema();
         this.location = writeStream.getLocation();
+        this.skipRefreshStreamWriter = false;
       } else {
         this.tableSchema = tableSchema;
+        this.skipRefreshStreamWriter = true;
       }
       this.toProtoConverter = toProtoConverter;
     }
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
index 1bd91925e4..eed96886a4 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
@@ -160,6 +160,13 @@ private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder(
         .setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build());
   }
 
+  private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder(String testStream) {
+    return JsonStreamWriter.newBuilder(testStream, client)
+        .setChannelProvider(channelProvider)
+        .setCredentialsProvider(NoCredentialsProvider.create())
+        .setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build());
+  }
+
   @Test
   public void testTwoParamNewBuilder_nullSchema() {
     try {
@@ -658,8 +665,13 @@ public void run() throws Throwable {
 
   @Test
   public void testSimpleSchemaUpdate() throws Exception {
-    try (JsonStreamWriter writer =
-        getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
+    testBigQueryWrite.addResponse(
+        WriteStream.newBuilder()
+            .setName(TEST_STREAM)
+            .setTableSchema(TABLE_SCHEMA)
+            .setLocation("us")
+            .build());
+    try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM).build()) {
       testBigQueryWrite.addResponse(
           AppendRowsResponse.newBuilder()
               .setAppendResult(
@@ -722,7 +734,6 @@ public void testSimpleSchemaUpdate() throws Exception {
     updatedFoo.put("bar", "bbb");
     JSONArray updatedJsonArr = new JSONArray();
     updatedJsonArr.put(updatedFoo);
-
     ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);
     assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
 
@@ -751,6 +762,88 @@ public void testSimpleSchemaUpdate() throws Exception {
     }
   }
 
+  @Test
+  public void testSimpleSchemaUpdate_skipRefreshWriterIfSchemaProvided() throws Exception {
+    testBigQueryWrite.addResponse(
+        WriteStream.newBuilder()
+            .setName(TEST_STREAM)
+            .setTableSchema(TABLE_SCHEMA)
+            .setLocation("us")
+            .build());
+    try (JsonStreamWriter writer =
+        getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
+      testBigQueryWrite.addResponse(
+          AppendRowsResponse.newBuilder()
+              .setAppendResult(
+                  AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
+              .setUpdatedSchema(UPDATED_TABLE_SCHEMA)
+              .build());
+      testBigQueryWrite.addResponse(createAppendResponse(1));
+      testBigQueryWrite.addResponse(createAppendResponse(2));
+      testBigQueryWrite.addResponse(createAppendResponse(3));
+      // First append
+      JSONObject foo = new JSONObject();
+      foo.put("foo", "aaa");
+      JSONArray jsonArr = new JSONArray();
+      jsonArr.put(foo);
+
+      ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);
+      ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(jsonArr);
+      ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(jsonArr);
+
+      assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
+      assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
+      assertEquals(
+          1,
+          testBigQueryWrite
+              .getAppendRequests()
+              .get(0)
+              .getProtoRows()
+              .getRows()
+              .getSerializedRowsCount());
+      assertEquals(
+          testBigQueryWrite
+              .getAppendRequests()
+              .get(0)
+              .getProtoRows()
+              .getRows()
+              .getSerializedRows(0),
+          FooType.newBuilder().setFoo("aaa").build().toByteString());
+
+      assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
+      assertEquals(
+          1,
+          testBigQueryWrite
+              .getAppendRequests()
+              .get(1)
+              .getProtoRows()
+              .getRows()
+              .getSerializedRowsCount());
+      assertEquals(
+          testBigQueryWrite
+              .getAppendRequests()
+              .get(1)
+              .getProtoRows()
+              .getRows()
+              .getSerializedRows(0),
+          FooType.newBuilder().setFoo("aaa").build().toByteString());
+
+      // Second append with updated schema.
+      JSONObject updatedFoo = new JSONObject();
+      updatedFoo.put("foo", "aaa");
+      updatedFoo.put("bar", "bbb");
+      JSONArray updatedJsonArr = new JSONArray();
+      updatedJsonArr.put(updatedFoo);
+
+      // Schema update will not happen for a writer that has the schema explicitly provided.
+      assertThrows(
+          AppendSerializationError.class,
+          () -> {
+            ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);
+          });
+    }
+  }
+
   @Test
   public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Exception {
     TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
@@ -764,6 +857,10 @@ public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Except
                 .setType(TableFieldSchema.Type.STRING)
                 .setMode(Mode.NULLABLE))
         .build();
+
+    // The first GetWriteStream call is made at writer creation and returns the original schema.
+    testBigQueryWrite.addResponse(
+        WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(tableSchema).build());
     // GetWriteStream is called once and the writer is fixed to accept unknown fields.
     testBigQueryWrite.addResponse(
         WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(updatedSchema).build());
@@ -772,8 +869,7 @@ public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Except
             .setAppendResult(
                 AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
             .build());
-    try (JsonStreamWriter writer =
-        getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
+    try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM).build()) {
       JSONObject foo = new JSONObject();
       foo.put("test_int", 10);
       JSONObject bar = new JSONObject();
@@ -800,6 +896,8 @@ public void testWithoutIgnoreUnknownFieldsUpdateSecondSuccess() throws Exception
                 .setMode(Mode.NULLABLE))
         .build();
     // GetWriteStream is called once and got the updated schema
+    testBigQueryWrite.addResponse(
+        WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(TABLE_SCHEMA).build());
     testBigQueryWrite.addResponse(
         WriteStream.newBuilder().setName(TEST_STREAM).setTableSchema(updatedSchema).build());
     testBigQueryWrite.addResponse(
@@ -807,8 +905,7 @@ public void testWithoutIgnoreUnknownFieldsUpdateSecondSuccess() throws Exception
             .setAppendResult(
                 AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
             .build());
-    try (JsonStreamWriter writer =
-        getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
+    try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM).build()) {
       JSONObject foo = new JSONObject();
       foo.put("test_int", 10);
       JSONObject bar = new JSONObject();
@@ -826,15 +923,28 @@ public void testSchemaUpdateInMultiplexing_singleConnection() throws Exception {
     // Set min connection count to be 1 to force sharing connection.
     ConnectionWorkerPool.setOptions(
         Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
+    // GetWriteStream is called once per writer and returns each stream's table schema.
+    testBigQueryWrite.addResponse(
+        WriteStream.newBuilder()
+            .setName(TEST_STREAM)
+            .setTableSchema(TABLE_SCHEMA)
+            .setLocation("us")
+            .build());
+    testBigQueryWrite.addResponse(
+        WriteStream.newBuilder()
+            .setName(TEST_STREAM)
+            .setTableSchema(TABLE_SCHEMA_2)
+            .setLocation("us")
+            .build());
     // The following two writers have different stream name and schema, but will share the same
     // connection .
     JsonStreamWriter writer1 =
-        getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA)
+        getTestJsonStreamWriterBuilder(TEST_STREAM)
             .setEnableConnectionPool(true)
             .setLocation("us")
             .build();
     JsonStreamWriter writer2 =
-        getTestJsonStreamWriterBuilder(TEST_STREAM_2, TABLE_SCHEMA_2)
+        getTestJsonStreamWriterBuilder(TEST_STREAM_2)
             .setEnableConnectionPool(true)
             .setLocation("us")
             .build();
@@ -911,14 +1021,27 @@ public void testSchemaUpdateInMultiplexing_multipleWriterForSameStreamName() thr
     ConnectionWorkerPool.setOptions(
         Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
 
+    // GetWriteStream is called once per writer and returns the table schema.
+    testBigQueryWrite.addResponse(
+        WriteStream.newBuilder()
+            .setName(TEST_STREAM)
+            .setTableSchema(TABLE_SCHEMA)
+            .setLocation("us")
+            .build());
+    testBigQueryWrite.addResponse(
+        WriteStream.newBuilder()
+            .setName(TEST_STREAM)
+            .setTableSchema(TABLE_SCHEMA)
+            .setLocation("us")
+            .build());
     // Create two writers writing to the same stream.
     JsonStreamWriter writer1 =
-        getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA)
+        getTestJsonStreamWriterBuilder(TEST_STREAM)
             .setEnableConnectionPool(true)
             .setLocation("us")
             .build();
     JsonStreamWriter writer2 =
-        getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA)
+        getTestJsonStreamWriterBuilder(TEST_STREAM)
             .setEnableConnectionPool(true)
             .setLocation("us")
             .build();
@@ -987,10 +1110,16 @@ public void testSchemaUpdateInMultiplexing_IgnoreUpdateIfTimeStampNewer() throws
     // Set min connection count to be 1 to force sharing connection.
     ConnectionWorkerPool.setOptions(
         Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
+    testBigQueryWrite.addResponse(
+        WriteStream.newBuilder()
+            .setName(TEST_STREAM)
+            .setTableSchema(TABLE_SCHEMA)
+            .setLocation("us")
+            .build());
     // The following two writers have different stream name and schema, but will share the same
-    // connection .
+    // connection.
     JsonStreamWriter writer1 =
-        getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA)
+        getTestJsonStreamWriterBuilder(TEST_STREAM)
             .setEnableConnectionPool(true)
             .setLocation("us")
             .build();
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
index a068f6d635..1e73643eb8 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
@@ -785,7 +785,7 @@ public void testJsonStreamWriterSchemaUpdate()
             WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
         .build());
     try (JsonStreamWriter jsonStreamWriter =
-        JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
+        JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) {
       // write the 1st row
       JSONObject foo = new JSONObject();
       foo.put("col1", "aaa");
@@ -895,7 +895,7 @@ public void testJsonStreamWriterSchemaUpdateConcurrent()
 
     // Start writing using the JsonWriter
     try (JsonStreamWriter jsonStreamWriter =
-        JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
+        JsonStreamWriter.newBuilder(writeStream.getName(), client).build()) {
       int numberOfThreads = 5;
       ExecutorService streamTaskExecutor = Executors.newFixedThreadPool(5);
       CountDownLatch latch = new CountDownLatch(numberOfThreads);
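
For reference, a minimal usage sketch (not part of the patch) contrasting the two builder paths this change distinguishes. The project, dataset, table, and field names below are hypothetical placeholders, and error handling is omitted; only the builder overloads already used in the patch are assumed.

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;

public class WriterSetupSketch {
  // Hypothetical default stream; replace with a real table's default stream.
  private static final String STREAM =
      "projects/my-project/datasets/my_dataset/tables/my_table/_default";

  public static void main(String[] args) throws Exception {
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // No schema passed in: the builder calls GetWriteStream to fetch the schema and location,
      // and the writer refreshes itself when the backend reports an updated schema
      // (skipRefreshStreamWriter == false).
      try (JsonStreamWriter refreshingWriter =
          JsonStreamWriter.newBuilder(STREAM, client).build()) {
        // refreshingWriter.append(...) picks up schema updates automatically.
      }

      // Explicit schema passed in: with this patch the writer keeps that schema and never
      // refreshes, so a row containing an unknown field surfaces as an
      // Exceptions.AppendSerializationError instead of triggering a schema refresh.
      TableSchema pinnedSchema =
          TableSchema.newBuilder()
              .addFields(
                  TableFieldSchema.newBuilder()
                      .setName("foo")
                      .setType(TableFieldSchema.Type.STRING)
                      .setMode(TableFieldSchema.Mode.NULLABLE)
                      .build())
              .build();
      try (JsonStreamWriter pinnedWriter =
          JsonStreamWriter.newBuilder(STREAM, pinnedSchema, client).build()) {
        // pinnedWriter.append(...) always validates against pinnedSchema.
      }
    }
  }
}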