Pass original message down through conversion for storage write api #31106
base: master
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment:
Assigning reviewers. If you would like to opt out of this review, comment:
R: @robertwb for label java.
Available commands:
The PR bot will only process comments in the main thread (not review comments).
@Abacn @ahmedabu98 could you take a look at this?
Thanks, the change lgtm. Have one thing to confirm (cc'd below)
...d-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.java
@@ -52,16 +52,18 @@
/** This {@link PTransform} manages loads into BigQuery using the Storage API. */
public class StorageApiLoads<DestinationT, ElementT>
    extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
  final TupleTag<KV<DestinationT, StorageApiWritePayload>> successfulConvertedRowsTag =
      new TupleTag<>("successfulRows");
  final TupleTag<KV<DestinationT, KV<ElementT, StorageApiWritePayload>>>
This changes the PTransform's output element type. Do we need a change in BigQueryTranslation to preserve upgrade compatibility? cc: @chamikaramj
Or is there a plan to set up a precommit test for BigQuery pipeline upgrades, so tests can auto-detect this (like the Kafka upgrade project)?
In theory, it's within the overall BQ transform, so it might work?
I think changing output element type and the coder here could break streaming update compatibility in general.
Have you considered using the updateCompatibilityVersion option?
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java
Line 45 in c531f89
String getUpdateCompatibilityVersion();
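For reference, setting that option from a pipeline might look roughly like the minimal sketch below. The class name and the version string "2.55.0" are only illustrative placeholders, not a recommendation for this PR.

import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

public class UpdateCompatibilitySketch {
  public static void main(String[] args) {
    // Parse normal pipeline args, then view them as StreamingOptions.
    StreamingOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingOptions.class);
    // Ask the SDK to keep the transform expansion used at this Beam version, so a
    // streaming job started on that version can still be updated in place
    // ("2.55.0" is just an example value).
    options.setUpdateCompatibilityVersion("2.55.0");
    // ... build and run the pipeline with these options ...
  }
}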
That would require us to maintain two implementations of Streaming Inserts, one with this change and one without, right? I think that would be prohibitive in general for Beam IOs.
This should be called out in CHANGES.md if we have to do these breaking changes. But I recommend updateCompatibilityVersion here.
I think you can manually test by running a streaming job using the old version and then running a replacement job that includes your changes with the --update option.
https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline#Launching
Dataflow streaming team might also have internal tests for update compatibility (but I haven't looked into this).
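A rough sketch of that manual check, assuming the Dataflow runner dependency is on the classpath; the project, region, and job name are placeholders, and --jobName must match the streaming job launched with the old SDK version.

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ManualUpdateCheck {
  public static void main(String[] args) {
    // Flags for the replacement job that includes the new code.
    String[] updateArgs = {
      "--runner=DataflowRunner",
      "--project=my-project",
      "--region=us-central1",
      "--jobName=storage-api-sink-update-test",
      "--update"
    };
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(updateArgs).withValidation().create();
    // ... construct the same pipeline with the new code and run it with these options;
    // Dataflow's update compatibility check should flag incompatible coder changes ...
  }
}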
With the updateCompatibilityVersion option you could fork based on the Beam version to preserve update compatibility. See the following for an example:
String requestedVersionString =
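For what it's worth, the forking idea could look roughly like the sketch below: compare the requested compatibility version against the release that changes the payload shape, and fall back to the old expansion for older versions. The class, the "2.56.0" cut-over, and the version-parsing logic are all assumptions for illustration, not Beam's actual implementation.

import java.util.Arrays;

public class CompatForkSketch {
  /** True if the requested update-compatibility version predates the (assumed) 2.56.0 cut-over. */
  static boolean useLegacyExpansion(String requestedVersionString) {
    if (requestedVersionString == null) {
      return false; // No compatibility version requested: use the new expansion.
    }
    int[] requested = parse(requestedVersionString);
    int[] cutOver = parse("2.56.0"); // Hypothetical release introducing the new coder.
    for (int i = 0; i < cutOver.length; i++) {
      int r = i < requested.length ? requested[i] : 0;
      if (r != cutOver[i]) {
        return r < cutOver[i];
      }
    }
    return false;
  }

  private static int[] parse(String version) {
    return Arrays.stream(version.split("\\.")).mapToInt(Integer::parseInt).toArray();
  }

  public static void main(String[] args) {
    System.out.println(useLegacyExpansion("2.55.0")); // true: keep the old payload shape
    System.out.println(useLegacyExpansion(null));     // false: use the new KV<ElementT, ...> shape
  }
}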
I think we should have a broader conversation about updateCompatibilityVersion, because it looks like we will, over time, start accruing legacy code based on specific Beam versions. That strikes me as eventually unmaintainable.
Because we will need to do this whenever we change the shape of a PCollection.
I think one way to do that might be to use schema'd PCollections everywhere and evolve the schema when we want to change the PCollection structure. But for the time being we have to fork the code to preserve compatibility. BTW, not all structure changes will break compatibility, so some changes can be done without forking (the Dataflow link above has some details), but generally coder changes can break update compatibility (and the Dataflow compatibility check will probably fail).
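Purely as an illustration of the schema'd-PCollection idea (not a concrete proposal for this PR), the element carried between transforms could be a schema'd POJO, so that adding an optional field later is a schema evolution rather than a raw coder change. All names below are hypothetical.

import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.checkerframework.checker.nullness.qual.Nullable;

// Hypothetical schema'd element type for the converted-row PCollection.
@DefaultSchema(JavaFieldSchema.class)
public class ConvertedRecord {
  public final byte[] payload;                 // Storage API proto payload bytes
  public final @Nullable String originalJson;  // optional field added later to carry the original element

  @SchemaCreate
  public ConvertedRecord(byte[] payload, @Nullable String originalJson) {
    this.payload = payload;
    this.originalJson = originalJson;
  }
}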
Ok. I'll make the forking change for this when I have a chance.
Also going to run some load tests to see if it has performance implications.
Update: identical to 2.55.0 (51205), 2.56.0 (47579)
LGTM as well. Just one suggestion for performance.
P.S. I see @Abacn's load test results, feel free to ignore
Enable users to specify an alternate way to generate the table row for the error output of BigQueryIO's Storage Write API.
The user passes in a function of ElementT -> TableRow, and we maintain an index of the original elements passed in to BigQueryIO. If the function is provided, we use it to generate the error row, instead of the default behavior of emitting the failure directly.
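To make the description concrete, a user-supplied formatter of the kind described above might look like the sketch below. It uses String as the element type for brevity, and the exact BigQueryIO hook that accepts the function is part of this PR, so it is not reproduced; treat the class and field names as illustrative.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.SerializableFunction;

// Hypothetical ElementT -> TableRow formatter for the failed-rows output.
public class ErrorRowFormatter implements SerializableFunction<String, TableRow> {
  @Override
  public TableRow apply(String originalElement) {
    // Re-shape the original element into the row we want to see for failures,
    // instead of whatever the sink would emit by default.
    return new TableRow().set("original_payload", originalElement);
  }
}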
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.
See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.