[Bug]: Dataflow runner - Flatten() yields no output #31200

Closed
2 of 16 tasks
ndopj opened this issue May 6, 2024 · 3 comments


ndopj commented May 6, 2024

What happened?

I am unsure if this is the correct place to raise this issue. I am acting on the understanding that the Dataflow Runner source code is located in this repository. However, the problem might be more closely related to the behavior of the Google Cloud Dataflow runtime. If that's the case, I would appreciate it if someone could assist me in addressing this issue directly with Google Cloud.

I have an unbounded pipeline (Python SDK) that consumes data from Google Cloud Pub/Sub subscriptions and is deployed to Google Cloud Dataflow. This pipeline has three branches, which are eventually merged using the Flatten() function.

As illustrated in the image below, the pipeline has started successfully and is processing elements. However, two of the branches are actually empty and therefore are not providing any input to the Flatten/merge step. This behavior is intentional.

Screenshot 2024-05-06 at 22 56 41

With the current configuration, the Flatten/Merge step is not yielding any output, despite one of the branches continuously providing input to it. The details of the output collection can be observed in the image below:

Screenshot 2024-05-07 at 0 01 32

I would expect the Flatten/Merge step to yield at least the elements of the "Map BE events to Commands" branch.

I've also tried configuring windowing in each branch just before the Flatten/Merge step, setting it to fixed windows of 60 seconds with the following code. However, this didn't alter the behavior at all. My assumption was that the empty PCollection branches were the main issue and that windowing would compel Flatten() to merge windows from each branch.

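import time
from typing import TypeVar

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, TimestampedValue

T = TypeVar('T')

def with_ts(value: T) -> TimestampedValue[T]:
    # Stamp the element with the current wall-clock time (seconds since epoch).
    return TimestampedValue(value, int(time.time()))

.....
    | "Map WEB events to commands" >> beam.Map(lambda x: with_ts(web_event_to_commands(x)))
    | "Window WEB commands" >> beam.WindowInto(FixedWindows(60)))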

I believe that, according to the official documentation of Flatten():
https://beam.apache.org/documentation/transforms/python/other/flatten/
https://beam.apache.org/documentation/programming-guide/#flatten
the current behaviour is not expected. Flatten() should yield the elements of each input PCollection independently of the other input PCollections, provided the windowing strategy is the same in every input PCollection.

  • Apache Beam version: 2.55.1
  • Python version: 3.11
  • Dataflow Prime: Disabled
  • Runner v2: Enabled
  • Streaming Engine: Enabled
  • Vertical auto-scaling: Disabled
  • Streaming mode: Exactly once

Thanks in advance for any help in case this is not a bug but a mistake on my side.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
ndopj changed the title from "[Bug]: Dataflow runner Flatten() yields no output" to "[Bug]: Dataflow runner - Flatten() yields no output" on May 6, 2024

ndopj commented May 7, 2024

When running the very same pipeline with the Direct runner on localhost, the Flatten/Merge step behaves as expected. So the behaviour of the Direct runner and the Dataflow runner is not consistent with regard to the Flatten() transform.


ndopj commented May 7, 2024

So it seems this is only a visual bug in the Google Cloud UI; the Flatten/Merge step is working correctly. Two things misled me. First, beam.LogElements() is not displaying any logs in Dataflow for me for whatever reason (maybe a misconfiguration of log levels). Second, the PCollection details shown in the screenshots I've provided display wrong metadata for the Flatten/Merge step and for every step downstream of it.

I've switched the last step in my pipeline to a different output collection and can successfully verify that the elements are processed as intended.

ndopj closed this as completed on May 7, 2024
github-actions bot added this to the 2.57.0 Release milestone on May 7, 2024
@thecodemancer

I was having the same issue:

DATAFLOW: 0 elements added to the output after Flatten

https://www.googlecloudcommunity.com/gc/Data-Analytics/DATAFLOW-0-elements-added-to-the-output-after-Flatten/m-p/598015
