diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index f9f9b1b0b9295..46b542a84a580 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -685,16 +686,22 @@ String retrieveErrorDetails(Iterable failedContext) { void postFlush() { // If we got a response indicating an updated schema, recreate the client. - if (this.appendClientInfo != null) { + if (this.appendClientInfo != null && autoUpdateSchema) { @Nullable StreamAppendClient streamAppendClient = appendClientInfo.getStreamAppendClient(); @Nullable - TableSchema updatedTableSchema = + TableSchema updatedTableSchemaReturned = (streamAppendClient != null) ? 
streamAppendClient.getUpdatedSchema() : null; - if (updatedTableSchema != null && autoUpdateSchema) { - invalidateWriteStream(); - appendClientInfo = - Preconditions.checkStateNotNull(getAppendClientInfo(false, updatedTableSchema)); + if (updatedTableSchemaReturned != null) { + Optional updatedTableSchema = + TableSchemaUpdateUtils.getUpdatedSchema( + this.initialTableSchema, updatedTableSchemaReturned); + if (updatedTableSchema.isPresent()) { + invalidateWriteStream(); + appendClientInfo = + Preconditions.checkStateNotNull( + getAppendClientInfo(false, updatedTableSchema.get())); + } } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index cd23b7be2c52e..e0353bf9a9082 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -36,6 +36,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -751,16 +752,23 @@ public void process( if (autoUpdateSchema) { @Nullable StreamAppendClient streamAppendClient = appendClientInfo.get().getStreamAppendClient(); + TableSchema originalSchema = appendClientInfo.get().getTableSchema(); + ; @Nullable - TableSchema newSchema = + TableSchema updatedSchemaReturned = (streamAppendClient != null) ? streamAppendClient.getUpdatedSchema() : null; // Update the table schema and clear the append client. 
- if (newSchema != null) { - appendClientInfo.set( - AppendClientInfo.of(newSchema, appendClientInfo.get().getCloseAppendClient())); - APPEND_CLIENTS.invalidate(element.getKey()); - APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get()); - updatedSchema.write(newSchema); + if (updatedSchemaReturned != null) { + Optional newSchema = + TableSchemaUpdateUtils.getUpdatedSchema(originalSchema, updatedSchemaReturned); + if (newSchema.isPresent()) { + appendClientInfo.set( + AppendClientInfo.of( + newSchema.get(), appendClientInfo.get().getCloseAppendClient())); + APPEND_CLIENTS.invalidate(element.getKey()); + APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get()); + updatedSchema.write(newSchema.get()); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtils.java new file mode 100644 index 0000000000000..cba394abc8904 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtils.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.auto.value.AutoValue; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; + +/** Helper utilities for handling schema-update responses. */ +public class TableSchemaUpdateUtils { + /* + Given an original schema and an updated schema, return a schema that should be used to process future records. + This function returns: + - If the new schema is not compatible (e.g. missing fields), then it will return Optional.empty(). + - If the new schema is equivalent (i.e. equal modulo field ordering) to the old schema, then it will return + Optional.empty(). + - The returned schema will always contain the old schema as a prefix. This ensures that if any of the old + fields are reordered in the new schema, we maintain the old order. 
+ */ + public static Optional getUpdatedSchema( + TableSchema oldSchema, TableSchema newSchema) { + Result updatedFields = getUpdatedSchema(oldSchema.getFieldsList(), newSchema.getFieldsList()); + if (updatedFields.isEquivalent()) { + return Optional.empty(); + } else { + return updatedFields + .getFields() + .map( + tableFieldSchemas -> + TableSchema.newBuilder().addAllFields(tableFieldSchemas).build()); + } + } + + @AutoValue + abstract static class Result { + abstract Optional> getFields(); + + abstract boolean isEquivalent(); + + static Result of(List fields, boolean isEquivalent) { + return new AutoValue_TableSchemaUpdateUtils_Result(Optional.of(fields), isEquivalent); + } + + static Result empty() { + return new AutoValue_TableSchemaUpdateUtils_Result(Optional.empty(), false); + } + } + + private static Result getUpdatedSchema( + @Nullable List oldSchema, @Nullable List newSchema) { + if (newSchema == null) { + return Result.empty(); + } + if (oldSchema == null) { + return Result.of(newSchema, false); + } + + Map newSchemaMap = + newSchema.stream().collect(Collectors.toMap(TableFieldSchema::getName, x -> x)); + Set fieldNamesPopulated = Sets.newHashSet(); + List updatedSchema = Lists.newArrayList(); + boolean isEquivalent = oldSchema.size() == newSchema.size(); + for (TableFieldSchema tableFieldSchema : oldSchema) { + @Nullable TableFieldSchema newTableFieldSchema = newSchemaMap.get(tableFieldSchema.getName()); + if (newTableFieldSchema == null) { + // We don't support deleting fields! 
+ return Result.empty(); + } + TableFieldSchema.Builder updatedTableFieldSchema = newTableFieldSchema.toBuilder(); + updatedTableFieldSchema.clearFields(); + if (tableFieldSchema.getType().equals(TableFieldSchema.Type.STRUCT)) { + Result updatedTableFields = + getUpdatedSchema(tableFieldSchema.getFieldsList(), newTableFieldSchema.getFieldsList()); + if (!updatedTableFields.getFields().isPresent()) { + return updatedTableFields; + } + updatedTableFieldSchema.addAllFields(updatedTableFields.getFields().get()); + isEquivalent = isEquivalent && updatedTableFields.isEquivalent(); + isEquivalent = + isEquivalent + && tableFieldSchema + .toBuilder() + .clearFields() + .build() + .equals(newTableFieldSchema.toBuilder().clearFields().build()); + } else { + isEquivalent = isEquivalent && tableFieldSchema.equals(newTableFieldSchema); + } + updatedSchema.add(updatedTableFieldSchema.build()); + fieldNamesPopulated.add(updatedTableFieldSchema.getName()); + } + + // Add in new fields at the end of the schema. 
+ newSchema.stream() + .filter(f -> !fieldNamesPopulated.contains(f.getName())) + .forEach(updatedSchema::add); + return Result.of(updatedSchema, isEquivalent); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 13a06b9a25a07..d0fddaf61a7c8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -1985,9 +1985,13 @@ public void updateTableSchemaTest(boolean useSet) throws Exception { new TableSchema() .setFields( ImmutableList.of( - new TableFieldSchema().setName("name").setType("STRING"), new TableFieldSchema().setName("number").setType("INTEGER"), + new TableFieldSchema().setName("name").setType("STRING"), new TableFieldSchema().setName("req").setType("STRING").setMode("REQUIRED"))); + + // Add new fields to the updated schema. Also reorder some existing fields to validate that we + // handle field reordering in + // schema updates correctly. TableSchema tableSchemaUpdated = new TableSchema() .setFields( @@ -2018,8 +2022,8 @@ public void updateTableSchemaTest(boolean useSet) throws Exception { new TableRow() .setF( ImmutableList.of( - new TableCell().setV("name" + i), new TableCell().setV(Long.toString(i)), + new TableCell().setV("name" + i), new TableCell().setV(i > 5 ? 
null : "foo"), new TableCell().setV(Long.toString(i * 2)))); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtilsTest.java new file mode 100644 index 0000000000000..bdbd2077999d6 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaUpdateUtilsTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for the {@link TableSchemaUpdateUtils} class. 
*/ +@RunWith(JUnit4.class) +public class TableSchemaUpdateUtilsTest { + @Test + public void testSchemaUpdate() { + TableSchema baseSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .build(); + TableSchema schema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder() + .setName("nested") + .setType(TableFieldSchema.Type.STRUCT) + .addAllFields(baseSchema.getFieldsList())) + .build(); + TableSchema topSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder() + .setName("nested") + .setType(TableFieldSchema.Type.STRUCT) + .addAllFields(schema.getFieldsList())) + .build(); + + TableSchema newBaseSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("d").setType(TableFieldSchema.Type.STRING)) + .build(); + TableSchema newSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder() + .setName("nested") + .setType(TableFieldSchema.Type.STRUCT) + .addAllFields(newBaseSchema.getFieldsList())) + .addFields( + 
TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("d").setType(TableFieldSchema.Type.STRING)) + .build(); + + TableSchema newTopSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder() + .setName("nested") + .setType(TableFieldSchema.Type.STRUCT) + .addAllFields(newSchema.getFieldsList())) + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .build(); + + TableSchema expectedSchemaBaseSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("d").setType(TableFieldSchema.Type.STRING)) + .build(); + TableSchema expectedSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder() + .setName("nested") + .setType(TableFieldSchema.Type.STRUCT) + .addAllFields(expectedSchemaBaseSchema.getFieldsList())) + .addFields( + TableFieldSchema.newBuilder().setName("d").setType(TableFieldSchema.Type.STRING)) + .build(); + TableSchema expectedTopSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + 
TableFieldSchema.newBuilder() + .setName("nested") + .setType(TableFieldSchema.Type.STRUCT) + .addAllFields(expectedSchema.getFieldsList())) + .build(); + + TableSchema updatedTopSchema = + TableSchemaUpdateUtils.getUpdatedSchema(topSchema, newTopSchema).get(); + assertEquals(expectedTopSchema, updatedTopSchema); + } + + @Test + public void testEquivalentSchema() { + TableSchema baseSchema1 = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .build(); + TableSchema schema1 = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder() + .setName("nested") + .setType(TableFieldSchema.Type.STRUCT) + .addAllFields(baseSchema1.getFieldsList())) + .build(); + + TableSchema baseSchema2 = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .build(); + TableSchema schema2 = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder() + .setName("nested") + .setType(TableFieldSchema.Type.STRUCT) + .addAllFields(baseSchema2.getFieldsList())) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + 
TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .build(); + assertFalse(TableSchemaUpdateUtils.getUpdatedSchema(schema1, schema2).isPresent()); + } + + @Test + public void testNonEquivalentSchema() { + TableSchema baseSchema1 = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .build(); + TableSchema schema1 = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder() + .setName("nested") + .setType(TableFieldSchema.Type.STRUCT) + .addAllFields(baseSchema1.getFieldsList())) + .build(); + TableSchema baseSchema2 = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.INT64)) + .build(); + TableSchema schema2 = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder().setName("a").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("b").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder().setName("c").setType(TableFieldSchema.Type.STRING)) + .addFields( + TableFieldSchema.newBuilder() + .setName("nested") + 
.setType(TableFieldSchema.Type.STRUCT) + .addAllFields(baseSchema2.getFieldsList())) + .build(); + assertTrue(TableSchemaUpdateUtils.getUpdatedSchema(schema1, schema2).isPresent()); + } +}