Pass original message down through conversion for storage write api #31106

Open · wants to merge 19 commits into base: master
Changes from 11 commits
@@ -3819,8 +3819,10 @@ private <DestinationT> WriteResult continueExpandTyped(
StorageApiLoads<DestinationT, T> storageApiLoads =
new StorageApiLoads<>(
destinationCoder,
elementCoder,
storageApiDynamicDestinations,
getRowMutationInformationFn(),
getFormatRecordOnFailureFunction(),
getCreateDisposition(),
getKmsKey(),
getStorageApiTriggeringFrequency(bqOptions),
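The two new constructor arguments here, `elementCoder` and `getFormatRecordOnFailureFunction()`, thread the user-supplied failure formatter from `BigQueryIO.Write` down into `StorageApiLoads`. A minimal pipeline-level sketch of where that formatter comes from, assuming a hypothetical `MyEvent` element type (the table name and formatter bodies are illustrative, not from this PR):

```java
BigQueryIO.<MyEvent>write()
    .to("my-project:my_dataset.my_table")
    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
    .withFormatFunction(e -> new TableRow().set("payload", e.toString()))
    // With this PR, failed rows can be rendered from the original element
    // instead of being reconstituted from the already-converted proto bytes.
    .withFormatRecordOnFailureFunction(e -> new TableRow().set("raw", e.toString()));
```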
@@ -21,11 +21,14 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -36,11 +39,13 @@
* parameter controls how many rows are batched into a single ProtoRows object before we move on to
* the next one.
*/
class SplittingIterable implements Iterable<SplittingIterable.Value> {
class SplittingIterable<ElementT> implements Iterable<SplittingIterable.Value<ElementT>> {
@AutoValue
abstract static class Value {
abstract static class Value<ElementT> {
abstract ProtoRows getProtoRows();

abstract List<ElementT> getOriginalElements();

abstract List<Instant> getTimestamps();
}
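`Value<ElementT>` now carries the original elements index-aligned with the serialized proto rows and timestamps: row `i` of `getProtoRows()` corresponds to `getOriginalElements().get(i)` and `getTimestamps().get(i)`. A consumption sketch under that assumption (`MyEvent` and `values` are placeholder names):

```java
for (SplittingIterable.Value<MyEvent> value : values) {
  ProtoRows rows = value.getProtoRows();
  for (int i = 0; i < rows.getSerializedRowsCount(); i++) {
    ByteString encodedRow = rows.getSerializedRows(i);     // appended to the write stream
    MyEvent original = value.getOriginalElements().get(i); // retained for failure formatting
    Instant timestamp = value.getTimestamps().get(i);
    // on append failure, report formatRecordOnFailureFunction.apply(original)
  }
}
```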

@@ -49,7 +54,7 @@ ByteString convert(TableRow tableRow, boolean ignoreUnknownValues)
throws TableRowToStorageApiProto.SchemaConversionException;
}

private final Iterable<StorageApiWritePayload> underlying;
private final Iterable<KV<ElementT, StorageApiWritePayload>> underlying;
private final long splitSize;

private final ConvertUnknownFields unknownFieldsToMessage;
@@ -60,15 +65,18 @@ ByteString convert(TableRow tableRow, boolean ignoreUnknownValues)

private final Instant elementsTimestamp;

@Nullable SerializableFunction<ElementT, TableRow> formatRecordOnFailureFunction;

public SplittingIterable(
Iterable<StorageApiWritePayload> underlying,
Iterable<KV<ElementT, StorageApiWritePayload>> underlying,
long splitSize,
ConvertUnknownFields unknownFieldsToMessage,
Function<ByteString, TableRow> protoToTableRow,
BiConsumer<TimestampedValue<TableRow>, String> failedRowsConsumer,
boolean autoUpdateSchema,
boolean ignoreUnknownValues,
Instant elementsTimestamp) {
Instant elementsTimestamp,
@Nullable SerializableFunction<ElementT, TableRow> formatRecordOnFailureFunction) {
this.underlying = underlying;
this.splitSize = splitSize;
this.unknownFieldsToMessage = unknownFieldsToMessage;
@@ -77,29 +85,33 @@ public SplittingIterable(
this.autoUpdateSchema = autoUpdateSchema;
this.ignoreUnknownValues = ignoreUnknownValues;
this.elementsTimestamp = elementsTimestamp;
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
}

@Override
public Iterator<Value> iterator() {
return new Iterator<Value>() {
final Iterator<StorageApiWritePayload> underlyingIterator = underlying.iterator();
public Iterator<Value<ElementT>> iterator() {
return new Iterator<Value<ElementT>>() {
final Iterator<KV<ElementT, StorageApiWritePayload>> underlyingIterator =
underlying.iterator();

@Override
public boolean hasNext() {
return underlyingIterator.hasNext();
}

@Override
public Value next() {
public Value<ElementT> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

List<Instant> timestamps = Lists.newArrayList();
ProtoRows.Builder inserts = ProtoRows.newBuilder();
List<ElementT> originalElements = new ArrayList<>();
long bytesSize = 0;
while (underlyingIterator.hasNext()) {
StorageApiWritePayload payload = underlyingIterator.next();
KV<ElementT, StorageApiWritePayload> underlyingKV = underlyingIterator.next();
StorageApiWritePayload payload = underlyingKV.getValue();
ByteString byteString = ByteString.copyFrom(payload.getPayload());
if (autoUpdateSchema) {
try {
@@ -116,7 +128,12 @@ public Value next() {
// This generally implies that ignoreUnknownValues=false and there were still
// unknown values here.
// Reconstitute the TableRow and send it to the failed-rows consumer.
TableRow tableRow = protoToTableRow.apply(byteString);
TableRow tableRow;
if (formatRecordOnFailureFunction != null) {
tableRow = formatRecordOnFailureFunction.apply(underlyingKV.getKey());
} else {
tableRow = protoToTableRow.apply(byteString);
}
// TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we
// only execute this
// codepath when ignoreUnknownFields==true, so we should never hit this codepath.
@@ -137,6 +154,7 @@
}
}
inserts.addSerializedRows(byteString);
originalElements.add(underlyingKV.getKey());
Instant timestamp = payload.getTimestamp();
if (timestamp == null) {
timestamp = elementsTimestamp;
@@ -147,7 +165,8 @@
break;
}
}
return new AutoValue_SplittingIterable_Value(inserts.build(), timestamps);
return new AutoValue_SplittingIterable_Value<ElementT>(
inserts.build(), originalElements, timestamps);
}
};
}
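Taken together, `SplittingIterable` now iterates over `KV<ElementT, StorageApiWritePayload>` pairs and accepts the nullable failure formatter as a trailing constructor argument. A construction sketch; every name below is a placeholder assumed to be in scope:

```java
SplittingIterable<MyEvent> iterable =
    new SplittingIterable<>(
        keyedPayloads,          // Iterable<KV<MyEvent, StorageApiWritePayload>>
        maxRequestBytes,        // splitSize: byte budget per ProtoRows batch
        unknownFieldsConverter, // ConvertUnknownFields
        protoToTableRowFn,      // Function<ByteString, TableRow>, the fallback path
        failedRowsConsumer,     // BiConsumer<TimestampedValue<TableRow>, String>
        /* autoUpdateSchema= */ false,
        /* ignoreUnknownValues= */ false,
        elementsTimestamp,
        e -> new TableRow().set("raw", e.toString())); // may be null; falls back to protoToTableRowFn
```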
@@ -50,22 +50,26 @@ public class StorageApiConvertMessages<DestinationT, ElementT>
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final BigQueryServices bqServices;
private final TupleTag<BigQueryStorageApiInsertError> failedWritesTag;
private final TupleTag<KV<DestinationT, StorageApiWritePayload>> successfulWritesTag;
private final TupleTag<KV<DestinationT, KV<ElementT, StorageApiWritePayload>>>
successfulWritesTag;
private final Coder<BigQueryStorageApiInsertError> errorCoder;
private final Coder<KV<DestinationT, StorageApiWritePayload>> successCoder;
private final Coder<KV<DestinationT, KV<ElementT, StorageApiWritePayload>>> successCoder;

private final @Nullable SerializableFunction<ElementT, RowMutationInformation> rowMutationFn;
private final BadRecordRouter badRecordRouter;

private final @Nullable SerializableFunction<ElementT, TableRow> formatRecordOnFailureFunction;

public StorageApiConvertMessages(
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices,
TupleTag<BigQueryStorageApiInsertError> failedWritesTag,
TupleTag<KV<DestinationT, StorageApiWritePayload>> successfulWritesTag,
TupleTag<KV<DestinationT, KV<ElementT, StorageApiWritePayload>>> successfulWritesTag,
Coder<BigQueryStorageApiInsertError> errorCoder,
Coder<KV<DestinationT, StorageApiWritePayload>> successCoder,
Coder<KV<DestinationT, KV<ElementT, StorageApiWritePayload>>> successCoder,
@Nullable SerializableFunction<ElementT, RowMutationInformation> rowMutationFn,
BadRecordRouter badRecordRouter) {
BadRecordRouter badRecordRouter,
@Nullable SerializableFunction<ElementT, TableRow> formatRecordOnFailureFunction) {
this.dynamicDestinations = dynamicDestinations;
this.bqServices = bqServices;
this.failedWritesTag = failedWritesTag;
@@ -74,6 +78,7 @@ public StorageApiConvertMessages(
this.successCoder = successCoder;
this.rowMutationFn = rowMutationFn;
this.badRecordRouter = badRecordRouter;
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
}
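Because the success-path element widens from `KV<DestinationT, StorageApiWritePayload>` to `KV<DestinationT, KV<ElementT, StorageApiWritePayload>>`, the `successCoder` the caller passes in must nest accordingly. Roughly, assuming `destinationCoder`, `elementCoder` (a `Coder<ElementT>`), and `payloadCoder` are available:

```java
Coder<KV<DestinationT, KV<ElementT, StorageApiWritePayload>>> successCoder =
    KvCoder.of(destinationCoder, KvCoder.of(elementCoder, payloadCoder));
```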

@Override
@@ -92,7 +97,8 @@ public PCollectionTuple expand(PCollection<KV<DestinationT, ElementT>> input) {
successfulWritesTag,
rowMutationFn,
badRecordRouter,
input.getCoder()))
input.getCoder(),
formatRecordOnFailureFunction))
.withOutputTags(
successfulWritesTag,
TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG)))
@@ -104,26 +110,31 @@ public PCollectionTuple expand(PCollection<KV<DestinationT, ElementT>> input) {
}

public static class ConvertMessagesDoFn<DestinationT extends @NonNull Object, ElementT>
extends DoFn<KV<DestinationT, ElementT>, KV<DestinationT, StorageApiWritePayload>> {
extends DoFn<
KV<DestinationT, ElementT>, KV<DestinationT, KV<ElementT, StorageApiWritePayload>>> {
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
private final BigQueryServices bqServices;
private final TupleTag<BigQueryStorageApiInsertError> failedWritesTag;
private final TupleTag<KV<DestinationT, StorageApiWritePayload>> successfulWritesTag;
private final TupleTag<KV<DestinationT, KV<ElementT, StorageApiWritePayload>>>
successfulWritesTag;
private final @Nullable SerializableFunction<ElementT, RowMutationInformation> rowMutationFn;
private final BadRecordRouter badRecordRouter;
Coder<KV<DestinationT, ElementT>> elementCoder;
private transient @Nullable DatasetService datasetServiceInternal = null;

private final @Nullable SerializableFunction<ElementT, TableRow> formatRecordOnFailureFunction;

ConvertMessagesDoFn(
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices,
String operationName,
TupleTag<BigQueryStorageApiInsertError> failedWritesTag,
TupleTag<KV<DestinationT, StorageApiWritePayload>> successfulWritesTag,
TupleTag<KV<DestinationT, KV<ElementT, StorageApiWritePayload>>> successfulWritesTag,
@Nullable SerializableFunction<ElementT, RowMutationInformation> rowMutationFn,
BadRecordRouter badRecordRouter,
Coder<KV<DestinationT, ElementT>> elementCoder) {
Coder<KV<DestinationT, ElementT>> elementCoder,
@Nullable SerializableFunction<ElementT, TableRow> formatRecordOnFailureFunction) {
this.dynamicDestinations = dynamicDestinations;
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.bqServices = bqServices;
@@ -132,6 +143,7 @@ public static class ConvertMessagesDoFn<DestinationT extends @NonNull Object, ElementT>
this.rowMutationFn = rowMutationFn;
this.badRecordRouter = badRecordRouter;
this.elementCoder = elementCoder;
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
}

private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
@@ -177,11 +189,16 @@ public void processElement(
messageConverter
.toMessage(element.getValue(), rowMutationInformation)
.withTimestamp(timestamp);
o.get(successfulWritesTag).output(KV.of(element.getKey(), payload));
o.get(successfulWritesTag)
.output(KV.of(element.getKey(), KV.of(element.getValue(), payload)));
} catch (TableRowToStorageApiProto.SchemaConversionException conversionException) {
TableRow tableRow;
try {
tableRow = messageConverter.toTableRow(element.getValue());
if (formatRecordOnFailureFunction != null) {
tableRow = formatRecordOnFailureFunction.apply(element.getValue());
} else {
tableRow = messageConverter.toTableRow(element.getValue());
}
} catch (Exception e) {
badRecordRouter.route(o, element, elementCoder, e, "Unable to convert value to TableRow");
return;
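The user-visible payoff is on the failed-rows output: when `formatRecordOnFailureFunction` is set, the `TableRow` inside each `BigQueryStorageApiInsertError` is produced from the original element rather than from the converted payload. A downstream sketch using the existing `WriteResult` accessors (the string formatting is illustrative):

```java
PCollection<String> failures =
    result // WriteResult from applying the BigQueryIO.Write above
        .getFailedStorageApiInserts()
        .apply(
            "FormatFailedInserts",
            MapElements.into(TypeDescriptors.strings())
                .via(err -> err.getErrorMessage() + ": " + err.getRow()));
```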