New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimise View.asList() side inputs for iterating rather than for indexing. #31087
Conversation
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
…xing. The current implementation is, essentially, a distributed hashmap from integer keys to the list contents, mediated by each upstream worker starting at a random value to minimize overlaps and emitting sufficient metadata to map this onto the contiguous range [0, N). This provides optimal *random-access* performance, but very poor *iteration* performance (essentially having to do a key lookup for every advance, and as the keys are hashed and distributed rather than clustered numerically, there is little to no amortiziation in these lookups for adjacent items. Given that most uses for List side inpupts are merely to gather a collection of values (the user has no control over the ordering when materialized) and the high costs of providing random access, this is probably the wrong tradeoff for most pipelines. This is an update-incompatable change and so has been guarded by the update compatibility version flag. The old behavior can be explicilty asked for via a new AsList#withRandomAccess() method.
d0a859f
to
b163a54
Compare
Totally agree. I do know that this was actually an explicit decision. The history as I understand it:
TBH I would be perfectly happy if we had never allowed random access for list side inputs, leaving that to map side inputs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this make it identical to View.asIterable
?
Is it possible to retain it as a window --> iterable map for efficient access by window? (I don't know if that is already implied by how this interacts with code elsewhere or what)
* | ||
* <p>For internal use only. | ||
*/ | ||
public static class ListViewFn3<T> extends ViewFn<IterableView<T>, List<T>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love that these are just named 1, 2, 3...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is equivalent to View.asIterable() plus implementing the List methods so nothing breaks. Good question about the Window -> Iterable map; this is handled at a lower level, but I don't know all the details there (though in that case I can see that constructing the mapping would be more worthwhile). In the interest of being conservative while capturing the most important gains I'll restrict this to the global window case.
(I kept the name for ListViewFn2 just in case there are pipelines serializing it as data.)
This seems a likely culprit for the failure at https://ge.apache.org/s/6b22rnlopcdxk/tests/overview?outcome=FAILED |
I think you're right. This was masked by
|
|
This also breaks several Java PostCommit: https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_DataflowV1.yml?query=event%3Aschedule faling tests:
same failure seen in https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow.yml?query=event%3Aschedule |
…for indexing. (apache#31087)" This reverts commit 7f7bc3e.
Hopefully #31163 should fix it. Otherwise we can submit the rollback. |
The current implementation is, essentially, a distributed hashmap from integer keys to the list contents, mediated by each upstream worker starting at a random value to minimize overlaps and emitting sufficient metadata to map this onto the contiguous range [0, N). This provides optimal random-access performance, but very poor iteration performance (essentially having to do a key lookup for every advance, and as the keys are hashed and distributed rather than clustered numerically, there is little to no amortiziation in these lookups for adjacent items.
Given that most uses for List side inpupts are merely to gather a collection of values (the user has no control over the ordering when materialized) and the high costs of providing random access, this is probably the wrong tradeoff for most pipelines.
This is an update-incompatable change and so has been guarded by the update compatibility version flag. The old behavior can be explicilty asked for via a new AsList#withRandomAccess() method.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.