
[Bug]: Reshuffle.viaRandomKey timeout since version 2.54.0 #31095

Open · 1 of 16 tasks

djaneluz opened this issue Apr 24, 2024 · 26 comments
@djaneluz

djaneluz commented Apr 24, 2024

What happened?

I have a batch pipeline that reads data from Firestore and writes it to BigQuery:

    pipeline
        .apply(Create.of("myCollection"))
        .apply("QueryRequest", ParDo.of(new QueryRequestFn(projectId, firestoreDatabaseId))) // Creates a RunQueryRequest
        .apply("ReadFromFirestore", FirestoreIO.v1().read()
            .runQuery()
            .withRpcQosOptions(rpcQosOptions)
            .build())
        .apply("WriteToBigQuery", writeToBigQuery(projectId, firestoreStateDataset));

The pipeline was working with Apache Beam Version 2.52.0.

When I updated to 2.55.1 it failed. I also tested with 2.55.0 and 2.54.0 and they failed as well; it only worked again when I went back to 2.53.0.

It fails when reading from Firestore; that PTransform internally uses Reshuffle.viaRandomKey(), which is where the failure happens.

The error message is not very informative; it contains only timeout errors:

Workflow failed. Causes: S24:ReadPromFirestore/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/Read+ReadFromFirestore/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/GroupByWindow+ReadFromFirestore/Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable/ParMultiDo(Anonymous)+ReadFromFirestore/Reshuffle.ViaRandomKey/Reshuffle/RestoreMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)+ReadFromFirestore/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)+WriteToBigQuery/WriteProfilesToBigQuery/PrepareWrite/ParDo(Anonymous)/ParMultiDo(Anonymous)+WriteoBigQuery/WriteProfilesToBigQuery/BatchLoads/rewindowIntoGlobal/Window.Assign+WriteToBigQuery/WriteToBigQuery/BatchLoads/WriteBundlesToFiles+WriteToBigQuery/WriteToBigQuery/BatchLoads/ReifyResults/View.AsIterable/MapElements/Map/ParMultiDo(Anonymous)+WriteToBigQuery/WriteToBigQuery/BatchLoads/GroupByDestination/Write failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. If the logs only contain generic timeout errors related to accessing external resources, such as MongoDB, verify that the worker service account has permission to access the resource's subnetwork. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: 

      Root cause: Timed out waiting for an update from the worker. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact.
      Worker ID: builder-tes-04240840-lbhk-harness-kshq,

      Root cause: Timed out waiting for an update from the worker. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact.
      Worker ID: builder-tes-04240840-lbhk-harness-m8ht,

      Root cause: Timed out waiting for an update from the worker. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact.
      Worker ID: builder-tes-04240840-lbhk-harness-r37n,

      Root cause: Timed out waiting for an update from the worker. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact.
      Worker ID: builder-tes-04240840-lbhk-harness-ht76

I noticed that the implementation of Reshuffle changed.

Graph using version 2.54.0: [screenshot of the pipeline graph]

Graph using version 2.53.0: [screenshot of the pipeline graph]

This is the commit from @kennknowles that changed the Reshuffle implementation.

Could someone investigate or give me some ideas of what might be the problem?

Thanks

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Abacn
Contributor

Abacn commented Apr 25, 2024

Likely due to #28853

Could you please open a support ticket so that support can take a look at your job?

@Abacn
Contributor

Abacn commented Apr 25, 2024

Another change is that since Beam 2.54.0, batch jobs use Dataflow Runner V2 by default. Did your 2.53.0 job show "runner v2: disabled" and the 2.54.0 job "runner v2: enabled" in the Dataflow UI?

@djaneluz
Author

Hello @Abacn, thank you for the reply.

Yes, the pipeline with 2.54.0 shows "runner v2: enabled" and the one with 2.53.0 shows "runner v2: disabled".

@djaneluz
Author

I'm not sure how I can open a support ticket.

@liferoad
Collaborator

@kennknowles FYI.

For the support ticket, please check https://cloud.google.com/dataflow/docs/support/getting-support

For 2.54.0, can you try to disable Runner V2 (--experiments=disable_runner_v2)?
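
A minimal sketch of setting that experiment programmatically (assuming the standard Beam ExperimentalOptions hook; passing --experiments=disable_runner_v2 on the launch command line works just as well):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Parse the usual command-line args, then add the experiment in code;
    // equivalent to passing --experiments=disable_runner_v2 on the command line.
    ExperimentalOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(ExperimentalOptions.class);
    ExperimentalOptions.addExperiment(options, "disable_runner_v2");
    Pipeline pipeline = Pipeline.create(options);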

@kennknowles
Member

Thanks for reporting this. Those four error messages are from the service's point of view (after 4 it fails a batch job). I wonder if we see a crash loop or some such thing on the worker.

@priyansndesai

Is it possible for you to run your job on 2.53.0 with Runner V2 enabled?

@kennknowles
Member

So to summarize, we could check these configurations:

  • Old reshuffle, v1: known good
  • Old reshuffle, v2: test this with 2.53.0 and --experiments=use_runner_v2
  • New reshuffle, v1: test this with 2.54.0 and --experiments=disable_runner_v2
  • New reshuffle, v2: known bad

If the problem is with "new reshuffle" then we in fact have a flag that will allow you to use the old expansion while still upgrading to Beam 2.54.0: --updateCompatibilityVersion=2.53.0. This is not specifically meant for this purpose (it is aimed at streaming update compatibility) and can impact other transforms as well. Also note that the reason for the change was that the old reshuffle yielded incorrect results in pipelines that use triggers; not an issue for batch.
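
If you end up using that flag, a minimal sketch of launching with it (only --updateCompatibilityVersion=2.53.0 itself comes from this thread; the rest is generic pipeline-option parsing):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // e.g. args include: --runner=DataflowRunner --updateCompatibilityVersion=2.53.0
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);
    // ... apply the FirestoreIO read and BigQuery write as before, then pipeline.run() ...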

@djaneluz
Author

For 2.54.0, can you try to disable Runner V2 (--experiments=disable_runner_v2)?

I ran the pipeline with version 2.54.0 disabling the Runner V2, as suggested, and it worked.

@djaneluz
Author

Is it possible for you to run your job on 2.53.0 with Runner V2 enabled?

I also ran the pipeline with version 2.53.0 with the Runner V2 enabled, and it failed!

@djaneluz
Author

So apparently the problem is the Runner V2 and not the Reshuffle, am I right?

@djaneluz
Author

Those four error messages are from the service's point of view (after 4 it fails a batch job). I wonder if we see a crash loop or some such thing on the worker.

I don't see any errors or warnings in the worker logs.

We can see the 4 attempts in the throughput of the GroupByKey:

[screenshot of the GroupByKey throughput chart]

@kennknowles
Member

Alright, thank you so much for isolating this! We would love to be able to minimally reproduce it and fix it.

From the lack of error, it may be a pathological performance problem causing the work item to timeout. Can you share the size of elements and the overall size of data shuffled? (I don't think any other factors could impact this transform)

@kennknowles
Member

One thing occurred to me: the entire stage after the shuffle failed. From the error message, this stage includes a fusion of many steps that are not part of the reshuffle. The issue could be in the subsequent steps as well, or an interaction between them.

Specifically, the step name ReadPromFirestore/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/Read+ReadFromFirestore/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/GroupByWindow+ReadFromFirestore/Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable/ParMultiDo(Anonymous)+ReadFromFirestore/Reshuffle.ViaRandomKey/Reshuffle/RestoreMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)+ReadFromFirestore/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)+WriteToBigQuery/WriteProfilesToBigQuery/PrepareWrite/ParDo(Anonymous)/ParMultiDo(Anonymous)+WriteoBigQuery/WriteProfilesToBigQuery/BatchLoads/rewindowIntoGlobal/Window.Assign+WriteToBigQuery/WriteToBigQuery/BatchLoads/WriteBundlesToFiles+WriteToBigQuery/WriteToBigQuery/BatchLoads/ReifyResults/View.AsIterable/MapElements/Map/ParMultiDo(Anonymous)+WriteToBigQuery/WriteToBigQuery/BatchLoads/GroupByDestination/Write

This is the fusion of the steps:

  • ReadPromFirestore/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/Read (from shuffle)
  • ReadFromFirestore/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/GroupByWindow
  • ReadFromFirestore/Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable/ParMultiDo(Anonymous)
  • ReadFromFirestore/Reshuffle.ViaRandomKey/Reshuffle/RestoreMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)
  • ReadFromFirestore/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)
  • WriteToBigQuery/WriteProfilesToBigQuery/PrepareWrite/ParDo(Anonymous)/ParMultiDo(Anonymous)
  • WriteToBigQuery/WriteProfilesToBigQuery/BatchLoads/rewindowIntoGlobal/Window.Assign
  • WriteToBigQuery/WriteToBigQuery/BatchLoads/WriteBundlesToFiles
  • WriteToBigQuery/WriteToBigQuery/BatchLoads/ReifyResults/View.AsIterable/MapElements/Map/ParMultiDo(Anonymous)
  • WriteToBigQuery/WriteToBigQuery/BatchLoads/GroupByDestination/Write
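
For orientation, a rough sketch of what Reshuffle.viaRandomKey() amounts to conceptually (this is not the SDK's actual expansion, which also restores timestamps, windows, and pane info, hence the RestoreMetadata step above): pair each element with an arbitrary key, group by that key to force a shuffle, then drop the keys and re-flatten.

    // Conceptual sketch only, shown for a hypothetical PCollection<String> named "input".
    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    PCollection<String> reshuffled = input
        .apply("AssignRandomKey",
            WithKeys.<Integer, String>of(v -> ThreadLocalRandom.current().nextInt())
                .withKeyType(TypeDescriptors.integers()))
        .apply("GroupByRandomKey", GroupByKey.<Integer, String>create())
        .apply("ExpandIterable", Values.<Iterable<String>>create())
        .apply("FlattenValues", Flatten.<String>iterables());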

@djaneluz
Author

Can you share the size of elements and the overall size of data shuffled? (I don't think any other factors could impact this transform)

The input of the Reshuffle.viaRandomKey step has the following information:

  • Elements added 16,885,482
  • Estimated size 2.74 GB

@djaneluz
Author

@kennknowles so maybe the problem is in the BigQueryIO write?

This step currently is:

    return response
        .apply("WriteToBigQuery", BigQueryIO.<RunQueryResponse>write()
            .to(tableSpec)
            .optimizedWrites()
            .withFormatFunction(new UserRunQueryResponseToTableRow())
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

@kennknowles
Member

Easy to test: you can comment out the BQ write and see if you still reproduce the error.
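
Concretely, that test is the original snippet with the final write dropped, e.g. (same user-defined pieces, QueryRequestFn and writeToBigQuery, as in the original report):

    pipeline
        .apply(Create.of("myCollection"))
        .apply("QueryRequest", ParDo.of(new QueryRequestFn(projectId, firestoreDatabaseId)))
        .apply("ReadFromFirestore", FirestoreIO.v1().read()
            .runQuery()
            .withRpcQosOptions(rpcQosOptions)
            .build());
        // Removed for the isolation test:
        // .apply("WriteToBigQuery", writeToBigQuery(projectId, firestoreStateDataset));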

@kennknowles
Member

Through Google's Cloud Support channels you can get someone who can really dig into the details of the logs and the metrics. But from an outside perspective, doing some trial and error to see if this reproduces with different things removed from the pipeline and different data sizes could be productive.

My thoughts after your last comment:

  • that isn't very much data, so maybe it would reproduce with even very small data
  • I would definitely try removing the BQ write
  • I still don't know why the worker is losing contact with the service; even if it were very slow, it should continue to send progress updates that automatically renew the work lease and would not cause an error

@djaneluz
Author

djaneluz commented May 6, 2024

I did as you suggested and isolated the pipeline, removing the write-to-BigQuery step, using Beam version 2.55.1, and the pipeline succeeded!

@kennknowles
Member

And just to check - it succeeded on runner v2? So then it could be the BigQuery write or it could be an interaction of the fused steps.

@kennknowles
Member

git log --oneline ^v2.54.0 v2.55.0 -- sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/

yields many changes:

5d89a331647 Fixes an upgrade imcompatiblity of BQ read/write transforms (#30562)
b6301b52058 fix: support reading arrays of structs from bigquery with schemas (#30448)
fa43f82f8a2 Disable BigQueryStorageStreamSource.splitAtFraction when read api v2 used (#30443)
6a03f9ba062 Update triggering frequency doc (#30457)
549faba9b98 Roll forward "Read API Source v2 (#25392)" fix data loss (#28778)
6a8c27ef6fb Feature/add error handling for bqio (#30081)
53c966dfd9e Merge pull request #29924: Merge runners-core-construction into sdks-java-core
f73ce82fb37 Merge runners-core-construction into sdks-java-core
f64501427bf Extract ThrottleTimeCounter for PerWorkerMetrics (#30245)
c006c5e2d53 Add Default method for OutputReceiver.outputWindowedValue (#30220)
1d9f6044c76 Remove constraint introduced in #30059 (#30186)
30917ff7b3e Don't use '-' as a delimiter in BigQuerySinkMetrics (#30118)
22fefebacab BigQuery: Decouple clustering from time partitioning when writing (#30094)
8485ab40e12 Allow JSON type in TableSchema for BigQuery FILE_LOAD (#29923)

@priyansndesai

Did removing the BigQuery write step for Runner V2 work for older Beam versions than 2.55.1?

@djaneluz
Author

djaneluz commented May 7, 2024

And just to check - it succeeded on runner v2? So then it could be the BigQuery write or it could be an interaction of the fused steps.

Yes, with Runner V2

@priyansndesai

Did removing the BigQuery write step for Runner V2 work for older Beam versions than 2.55.1?

This is to identify whether the change was on the Beam side or the Dataflow Runner V2 side.

@djaneluz
Author

djaneluz commented May 7, 2024

Did removing the BigQuery write step for Runner V2 work for older Beam versions than 2.55.1?

Yes, I tested with version 2.54.0 with Runner V2 and it worked

@priyansndesai

Is it possible for you to share the job IDs or create a Dataflow support ticket so that we can examine this more closely?
