
Pass original message down through conversion for storage write api #31106

Open
wants to merge 16 commits into base: master

Conversation

@johnjcasey (Contributor) commented Apr 25, 2024

Enable users to specify an alternate way to generate the table row for the error output of BQIO's Storage Write API.

The user passes in a function of ElementT -> TableRow, and we maintain an index of the original elements passed in to BQIO. If the function is provided, we use it to generate the error row instead of the default behavior of emitting the failure directly.
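A rough sketch of how a user might wire this up is below. The setter name withFormatRecordOnFailureFunction and the MyElement type are assumptions made only for illustration; the ElementT -> TableRow shape of the function comes from the description above.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

// MyElement is a hypothetical user type; the failure-formatter setter name is
// also an assumption, used only to illustrate the ElementT -> TableRow hook.
BigQueryIO.Write<MyElement> write =
    BigQueryIO.<MyElement>write()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        // Normal conversion path used for successful writes.
        .withFormatFunction(
            e -> new TableRow().set("id", e.getId()).set("payload", e.getPayload()))
        // Alternate conversion used only when building the failed-rows output,
        // so the error row is derived from the original element rather than
        // from the already-converted payload.
        .withFormatRecordOnFailureFunction(
            e -> new TableRow().set("original", e.toString()));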


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch):

  • Build python source distribution and wheels
  • Python tests
  • Java tests
  • Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.
R: @Abacn for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@johnjcasey johnjcasey marked this pull request as draft April 29, 2024 15:50
@johnjcasey johnjcasey marked this pull request as ready for review May 1, 2024 18:49
@johnjcasey (Contributor Author)

@Abacn @ahmedabu98 could you take a look at this?

@Abacn (Contributor) left a comment


Thanks, the change lgtm. Have one thing to confirm (cc'd below)

@@ -52,16 +52,18 @@ StorageApiLoads.java

/** This {@link PTransform} manages loads into BigQuery using the Storage API. */
public class StorageApiLoads<DestinationT, ElementT>
    extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
  final TupleTag<KV<DestinationT, StorageApiWritePayload>> successfulConvertedRowsTag =
      new TupleTag<>("successfulRows");
  final TupleTag<KV<DestinationT, KV<ElementT, StorageApiWritePayload>>>
Contributor


This changes the PTransform output element type. Do we need some change in BigQueryTranslation to preserve upgrade compatibility? cc: @chamikaramj

Or is there a plan to set up a precommit test for BigQuery pipeline upgrades, so tests can auto-detect this (like the Kafka upgrade project)?

Contributor Author


In theory, it's within the overall BQ transform, so it might work?

Contributor


I think changing output element type and the coder here could break streaming update compatibility in general.

Have you considered using the updateCompatibilityVersion option?
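A minimal sketch of setting that option, assuming it is exposed as updateCompatibilityVersion on StreamingOptions; the version string is illustrative:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

// Keep the pre-change graph/coder shape so a --update replacement job stays
// compatible with the already-running pipeline.
StreamingOptions options = PipelineOptionsFactory.create().as(StreamingOptions.class);
options.setUpdateCompatibilityVersion("2.55.0");  // illustrative version
Pipeline pipeline = Pipeline.create(options);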

Contributor Author


That would require us to maintain two implementations of Streaming Inserts, one with this change and one without, right? I think that would be prohibitive in general for Beam IOs.

Collaborator


This should be called out in CHANGES.md if we have to make these breaking changes. But I recommend updateCompatibilityVersion here.

Contributor


I think you can manually test by running a streaming job using the old version and running a replacement job that includes your changes with the --update option.

https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline#Launching

Dataflow streaming team might also have internal tests for update compatibility (but I haven't looked into this).

With the updateCompatibilityVersion option you could fork based on the Beam version to preserve update compatibility. See the following for an example.
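The referenced example is not reproduced here; the snippet below is only a rough sketch of what such a version-gated fork could look like. compareVersions, expandLegacy, and expandWithOriginalElement are hypothetical helpers, and 2.56.0 is an illustrative cutoff.

// Hypothetical sketch of a version-gated expansion inside the sink. The
// helpers named here do not exist in Beam; they stand in for whatever the
// real fork would call. "options" is the pipeline's StreamingOptions.
String compatVersion = options.getUpdateCompatibilityVersion();
boolean useLegacyShape =
    compatVersion != null && compareVersions(compatVersion, "2.56.0") <= 0;

if (useLegacyShape) {
  // Pre-change element shape: KV<DestinationT, StorageApiWritePayload>,
  // preserving the coder that already-running pipelines were started with.
  return expandLegacy(input);
} else {
  // New element shape: KV<DestinationT, KV<ElementT, StorageApiWritePayload>>,
  // which carries the original element for error-row formatting.
  return expandWithOriginalElement(input);
}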

Contributor Author


I think we should have a broader conversation about updateCompatibilityVersion, because it looks like we will, over time, start accruing legacy code tied to specific Beam versions. That strikes me as eventually unmaintainable.

Contributor Author


Because we will need to do this whenever we change the shape of a PCollection

Contributor


I think one way to do that might be to use schema'd PCollections everywhere and evolve the schema when we want to change the PCollection structure. But for the time being we have to fork the code to preserve compatibility. BTW, not all structure changes break compatibility, so some changes can be done without forking (the Dataflow link above has some details), but coder changes generally can break update compatibility (and the Dataflow compatibility check will probably fail).
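To illustrate the schema'd-PCollection idea in isolation (a sketch only; the field names are made up and this is not how StorageApiLoads is structured): additive changes can be expressed as new nullable fields, so the schema can evolve without introducing a wholly new element type and coder.

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;

// Original intermediate schema.
Schema v1 =
    Schema.builder().addStringField("destination").addByteArrayField("payload").build();

// Evolved schema: the original element is carried as an optional extra field,
// so the structural change is additive rather than a brand-new element type.
Schema v2 =
    Schema.builder()
        .addStringField("destination")
        .addByteArrayField("payload")
        .addNullableField("originalElement", FieldType.BYTES)
        .build();

// A row shaped like the old code produced still fits the evolved schema.
Row legacyShaped =
    Row.withSchema(v2).addValues("project:dataset.table", new byte[] {}, null).build();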

Contributor Author


Ok. I'll make the forking change for this when I have a chance

@Abacn (Contributor) commented May 2, 2024

Also going to run some load tests to see if this has performance implications.

Update:

"AvgInputThroughputElementsPerSec": 51674.92

This is comparable to 2.55.0 (51205) and 2.56.0 (47579).

@ahmedabu98 (Contributor) left a comment


LGTM as well. Just one suggestion for performance.

P.S. I see @Abacn's load test results, feel free to ignore
