Optimise View.asList() side inputs for iterating rather than for indexing. #31087

Merged
merged 9 commits into from Apr 30, 2024

Conversation

robertwb
Contributor

The current implementation is, essentially, a distributed hashmap from integer keys to the list contents, mediated by each upstream worker starting at a random value to minimize overlaps and emitting sufficient metadata to map this onto the contiguous range [0, N). This provides optimal random-access performance but very poor iteration performance: every advance requires a key lookup, and because the keys are hashed and distributed rather than clustered numerically, there is little to no amortization of these lookups across adjacent items.
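To make the cost difference concrete, here is a minimal toy model in plain Java. It is not Beam's side-input code: `CountingStore`, `readByIndex`, and `readByScan` are hypothetical names, and the in-memory `HashMap` only stands in for the distributed store, where each keyed lookup could be a separate remote read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the two read patterns; not Beam's side-input implementation. */
class SideInputAccessSketch {

  /** Hypothetical backing store that counts the requests it serves. */
  static final class CountingStore {
    private final Map<Integer, String> byIndex = new HashMap<>();
    long keyedLookups = 0;
    long scans = 0;

    void put(int index, String value) {
      byIndex.put(index, value);
    }

    /** Random access: one request per element. */
    String lookup(int index) {
      keyedLookups++;
      return byIndex.get(index);
    }

    /** Iteration: one request returns every value, in no particular order. */
    Iterable<String> scan() {
      scans++;
      return new ArrayList<>(byIndex.values());
    }

    int size() {
      return byIndex.size();
    }
  }

  /** Walks the list the index-based way: N elements cost N keyed lookups. */
  static List<String> readByIndex(CountingStore store) {
    List<String> out = new ArrayList<>();
    for (int i = 0; i < store.size(); i++) {
      out.add(store.lookup(i));
    }
    return out;
  }

  /** Walks the list the iterable way: N elements cost a single scan. */
  static List<String> readByScan(CountingStore store) {
    List<String> out = new ArrayList<>();
    for (String value : store.scan()) {
      out.add(value);
    }
    return out;
  }
}
```

Reading N elements through `readByIndex` issues N keyed lookups, while `readByScan` issues one scan request, which is the asymmetry described above.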

Given that most uses of List side inputs are merely to gather a collection of values (the user has no control over the ordering when materialized), and given the high cost of providing random access, this is probably the wrong tradeoff for most pipelines.

This is an update-incompatible change and so has been guarded by the update compatibility version flag. The old behavior can be explicitly requested via a new AsList#withRandomAccess() method.
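As a rough illustration of what such a guard could look like, the sketch below picks an implementation from a compatibility version string and an explicit opt-in. Everything here is an assumption made for illustration: the class, the `choose` method, the `Impl` enum, and the threshold version passed in by the caller are hypothetical, and the SDK's actual check may differ.

```java
/** Hypothetical gate for choosing a list-view implementation; not the SDK's code. */
class ListViewGateSketch {

  enum Impl { RANDOM_ACCESS, ITERABLE_BACKED }

  /** Compares purely numeric dotted versions such as "2.56.0"; returns <0, 0, or >0. */
  static int compareVersions(String a, String b) {
    String[] pa = a.split("\\.");
    String[] pb = b.split("\\.");
    for (int i = 0; i < Math.max(pa.length, pb.length); i++) {
      int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
      int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
      if (x != y) {
        return Integer.compare(x, y);
      }
    }
    return 0;
  }

  /**
   * @param updateCompatibilityVersion version the pipeline must stay compatible with, or null
   * @param withRandomAccess whether the user explicitly asked for the old behavior
   * @param firstVersionWithNewDefault assumed threshold; supplied by the caller
   */
  static Impl choose(
      String updateCompatibilityVersion,
      boolean withRandomAccess,
      String firstVersionWithNewDefault) {
    if (withRandomAccess) {
      return Impl.RANDOM_ACCESS; // explicit opt-in always wins
    }
    if (updateCompatibilityVersion != null
        && compareVersions(updateCompatibilityVersion, firstVersionWithNewDefault) < 0) {
      return Impl.RANDOM_ACCESS; // keep the old encoding so a running pipeline can be updated
    }
    return Impl.ITERABLE_BACKED;
  }
}
```

With this shape, a pipeline that sets no compatibility version gets the iteration-optimized default, while one pinned to an earlier release keeps the random-access encoding.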


@github-actions github-actions bot added the java label Apr 23, 2024
@robertwb
Contributor Author

R: @kennknowles @damccorm

@kennknowles
Member

Totally agree. I do know that this was actually an explicit decision. The history as I understand it:

  • We already had View.asIterable, which was a simple iterator, but windowed side inputs had awful performance because it was just a filter over the whole side input.
  • We added View.asList primarily as an indicator that the per-window value could be cached in memory after the first read.
  • We added ISM format and for some reason emphasized the random access behavior of the Java List class.

TBH I would be perfectly happy if we had never allowed random access for list side inputs, leaving that to map side inputs.

Member

@kennknowles kennknowles left a comment

Does this make it identical to View.asIterable?

Is it possible to retain it as a window --> iterable map for efficient access by window? (I don't know if that is already implied by how this interacts with code elsewhere or what)

*
* <p>For internal use only.
*/
public static class ListViewFn3<T> extends ViewFn<IterableView<T>, List<T>> {
Member

I don't love that these are just named 1, 2, 3...

Contributor Author

Yes, this is equivalent to View.asIterable() plus implementing the List methods so nothing breaks. Good question about the Window -> Iterable map; this is handled at a lower level, but I don't know all the details there (though in that case I can see that constructing the mapping would be more worthwhile). In the interest of being conservative while capturing the most important gains I'll restrict this to the global window case.

(I kept the name for ListViewFn2 just in case there are pipelines serializing it as data.)
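
The idea of "an iterable plus the List methods" can be sketched in plain Java as follows. This is a hypothetical illustration, not the SDK's view function: the class name and the lazy-materialization strategy are assumptions, and the source iterable is assumed to be read-only and safe to iterate more than once.

```java
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical read-only List backed by an Iterable; not the SDK's implementation. */
class IterableBackedListSketch<T> extends AbstractList<T> {
  private final Iterable<T> source;
  private List<T> materialized; // filled only if a caller needs indexing or size

  IterableBackedListSketch(Iterable<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    // Fast path: stream straight from the source without building an index.
    return materialized != null ? materialized.iterator() : source.iterator();
  }

  @Override
  public T get(int index) {
    return materialize().get(index);
  }

  @Override
  public int size() {
    return materialize().size();
  }

  /** Exposed only so the sketch's behavior can be observed. */
  boolean isMaterialized() {
    return materialized != null;
  }

  // Copies the source into memory the first time random access or size is needed.
  private List<T> materialize() {
    if (materialized == null) {
      List<T> copy = new ArrayList<>();
      for (T element : source) {
        copy.add(element);
      }
      materialized = copy;
    }
    return materialized;
  }
}
```

A for-each loop over such a list never builds the in-memory copy, whereas the first call to `get` or `size` pays for one full pass over the source.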
@robertwb robertwb merged commit 7f7bc3e into apache:master Apr 30, 2024
28 of 30 checks passed
@kennknowles
Member

This seems a likely culprit for the failure at https://ge.apache.org/s/6b22rnlopcdxk/tests/overview?outcome=FAILED

@robertwb
Contributor Author

robertwb commented May 1, 2024

I think you're right. This was masked by

/Users/robertwb/Work/beam/incubator-beam/sdks/java/extensions/avro/build/generated-test-avro-java/org/apache/beam/sdk/extensions/avro/schemas/TestAvro.java:135: error: cannot find symbol
  protected static final org.apache.avro.data.TimeConversions.TimestampConversion TIMESTAMP_CONVERSION = new org.apache.avro.data.TimeConversions.TimestampConversion();

@robertwb
Contributor Author

robertwb commented May 1, 2024

This seems a likely culprit for the failure at https://ge.apache.org/s/6b22rnlopcdxk/tests/overview?outcome=FAILED

#31149

@Abacn
Contributor

Abacn commented May 2, 2024

This also breaks several Java PostCommit: https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_DataflowV1.yml?query=event%3Aschedule

Failing tests:

testSequentialWrite (org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT) failed

java.lang.NullPointerException: Unknown producer for value SimplePCollectionView{tag=Tag<org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.<init>:1443#e1789f47d74ca86c>, viewFn=org.apache.beam.sdk.values.PCollectionViews$IterableBackedListViewFn@b96f9be, coder=VoidCoder, windowMappingFn=GlobalWindowMappingFn{}, pCollection=wait/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap/ParMultiDo(Anonymous).output [PCollection@1669047811]} while translating step wait/Wait/Map
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:1269)
	at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.getProducer(DataflowPipelineTranslator.java:571)
testFhirIO_Import[DSTU2] (org.apache.beam.sdk.io.gcp.healthcare.FhirIOWriteIT) failed
testFhirIO_Import[STU3] (org.apache.beam.sdk.io.gcp.healthcare.FhirIOWriteIT) failed

java.lang.NullPointerException: Unknown producer for value SimplePCollectionView{tag=Tag<org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.<init>:1443#53e357e8e9199108>, viewFn=org.apache.beam.sdk.values.PCollectionViews$IterableBackedListViewFn@7bc0659a, coder=VoidCoder, windowMappingFn=GlobalWindowMappingFn{}, pCollection=FhirIO.Write/FhirIO.Import/Wait On File Writing/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap/ParMultiDo(Anonymous).output [PCollection@876681835]} while translating step FhirIO.Write/FhirIO.Import/Wait On File Writing/Wait/Map
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:1269)
	at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.getProducer(DataflowPipelineTranslator.java:569)
	at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateSideInputs(DataflowPipelineTranslator.java:1205)

The same failure is seen in https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow.yml?query=event%3Aschedule

robertwb added a commit to robertwb/incubator-beam that referenced this pull request May 2, 2024
@robertwb
Contributor Author

robertwb commented May 2, 2024

Hopefully #31163 should fix it. Otherwise we can submit the rollback.

@robertwb robertwb mentioned this pull request May 7, 2024
3 tasks