[Bug]: Watermarks and Windowing Not Working with FlinkRunner and KinesisIO Read Transform #31085

Open
2 of 16 tasks
akashk99 opened this issue Apr 23, 2024 · 12 comments

Comments

@akashk99

What happened?

When there are idle subtasks in Flink, they don't propagate watermarks to downstream operators, so windowing functions that are based on watermarks never get triggered. When I set the parallelism exactly equal to the number of Kinesis shards, the problem doesn't occur; however, if the two numbers differ, the Flink UI shows no watermarks and my windows never get triggered.
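To make the failure mode concrete: in Flink, an operator with several input channels forwards the minimum of the watermarks it has received from them, and each channel starts at Long.MIN_VALUE. A source subtask that owns no shard (for example, when parallelism exceeds the shard count) and never emits a watermark therefore pins the combined value. The class below is a hypothetical, simplified model of that rule only; WatermarkMerge, combine and windowFires are illustrative names, not Flink or Beam APIs. It assumes a window fires once the watermark reaches the window's maximum timestamp, which is how Flink's EventTimeTrigger behaves.

```java
import java.util.List;

// Hypothetical, simplified model of multi-input watermark merging; not Flink or Beam code.
final class WatermarkMerge {
    // Per-channel watermark before anything has been emitted on that channel.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Combined watermark of a multi-input operator: the minimum over all channels.
    static long combine(List<Long> channelWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        // With no inputs at all, report "no watermark" instead of Long.MAX_VALUE.
        return channelWatermarks.isEmpty() ? NO_WATERMARK : min;
    }

    // A window fires once the watermark reaches its max timestamp (end - 1 ms).
    static boolean windowFires(long watermark, long windowMaxTimestamp) {
        return watermark >= windowMaxTimestamp;
    }

    public static void main(String[] args) {
        long windowMax = 9_999L; // window [0, 10000) in milliseconds
        // Two subtasks read shards and advance; a third owns no shard and never emits.
        List<Long> withSilentSubtask = List.of(10_000L, 12_000L, NO_WATERMARK);
        List<Long> allActive = List.of(10_000L, 12_000L);
        System.out.println(windowFires(combine(withSilentSubtask), windowMax)); // false
        System.out.println(windowFires(combine(allActive), windowMax));         // true
    }
}
```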

I also have custom DoFns upstream of the window that output elements with timestamps, so in theory those timestamps should be used as the watermark for windowing; however, this is not the case.

When using native Flink, I have seen solutions such as the "withIdleness" method, but nothing like it exists in Beam. Is there something I am missing in my Kinesis config, or is this a known issue with the read transform?
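In native Flink, WatermarkStrategy#withIdleness(Duration) lets a source mark itself idle after a timeout so that its input is no longer considered when downstream operators combine watermarks. The sketch below is a hypothetical model of that idea only, not Flink's implementation and not something Beam exposes; IdleAwareWatermark and Channel are illustrative names. Channels flagged idle are simply skipped when taking the minimum.

```java
import java.util.List;

// Hypothetical model of the idea behind Flink's withIdleness; not Flink's implementation.
final class IdleAwareWatermark {
    static final class Channel {
        final long watermark;
        final boolean idle;

        Channel(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    // Minimum over non-idle channels; Long.MIN_VALUE (no progress) if every channel is idle.
    static long combine(List<Channel> channels) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Channel c : channels) {
            if (c.idle) {
                continue; // an idle input no longer holds the watermark back
            }
            anyActive = true;
            min = Math.min(min, c.watermark);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        List<Channel> channels = List.of(
                new Channel(10_000L, false),
                new Channel(12_000L, false),
                new Channel(Long.MIN_VALUE, true)); // shard-less subtask, marked idle
        System.out.println(combine(channels)); // 10000
    }
}
```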

This only occurs on the Flink runner, not on the Direct or Dataflow runners. It's also possible this isn't an issue with the KinesisIO reader at all, and that the windowing function should instead ignore watermarks from idle upstream tasks.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@je-ik
Contributor

je-ik commented May 7, 2024

This is probably related to #29816 which was fixed in #30969. Can you verify this on 2.56.0?

@akashk99
Author

akashk99 commented May 7, 2024

@je-ik Hi, thanks for the tip, but I just tried upgrading to 2.56 and am still seeing the same error. I can get it to work if I set my parallelism to 2, but no other value works, which causes issues with autoscaling on AWS. I also notice that the Flink UI is showing me this:

No Watermark (Watermarks are only available if EventTime is used)

I'm seeing this on the watermarks tab for each subtask. Any advice you could give would be very helpful.

@je-ik
Contributor

je-ik commented May 7, 2024

Can you provide all the command-line flags, which you pass to the runner, please?

@akashk99
Author

akashk99 commented May 7, 2024

I am running it through AWS Managed Flink, so it's somewhat of a black box; however, the only pipeline option I am passing is --runner=FlinkRunner, in addition to my application-specific options.

After reading the linked issue, I was able to get it to work locally by enabling the beam_fn_api experiment and upgrading to 2.56, but I'm not really sure what that's doing.

I also noticed that this experiment adds a bunch of operators and results in higher backpressure and lower performance, which means it's most likely not a viable solution.

@je-ik
Contributor

je-ik commented May 7, 2024

Strange. It seems like the fix is not working in your case. Can you double-check that you are running 2.56.0 (e.g. that no dependency brings in an older Beam version through shading, etc.)? Other than that, it might help to set the log level to DEBUG and investigate the logs around FlinkSourceSplit and SplitEnumerator.

@akashk99
Author

akashk99 commented May 7, 2024

I will take a look to make sure no earlier version is being brought in. I was seeing this log:

May 07, 2024 9:51:04 AM org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase notifyNoMoreSplits INFO: Received NoMoreSplits signal from enumerator.

I also just reran and saw this:

INFO: Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.aws2.kinesis.KinesisSource@6de059f3, splitState.isNull=true, checkpointMark=null}]

Does this give any indication into the issue?

@je-ik
Contributor

je-ik commented May 7, 2024

Not really, but it seems you are running the correct 2.56.0 version. The noMoreSplits signal just means that there is indeed no more work. However, that should result in emission of the final watermark and should not hold back the watermark. Could you patch your Beam version to add more logs? Ideally where the reader emits/computes the watermark - e.g.

@akashk99
Author

akashk99 commented May 7, 2024

I am actually seeing the watermarks work now in the Flink runner on the web UI, and I also see the idle tasks from my source reader get finished, which I believe is ideal. However, I am still not getting the logs that occur when my window gets triggered unless beam_fn_api is enabled. Is there something else I need to do to get the window to trigger? This works without issue on Dataflow and the DirectRunner.

@je-ik
Contributor

je-ik commented May 7, 2024

I am actually seeing the watermarks work now in the flink runner on the web UI. And also seeing the idle tasks from my source reader get finished which I believe is ideal.

Yes, that is how the fix should work.

However, I am still not getting the logs that occur when my window gets triggered unless beam_fn_api is enabled. Is there something else I need to be doing to get the window to trigger?

Can you try setting autoWatermarkInterval?

@akashk99
Author

akashk99 commented May 7, 2024

Where is autoWatermarkInterval set in Beam? Is this a pipeline option, or is it set somewhere in the Kinesis reader?

@je-ik
Contributor

je-ik commented May 7, 2024

Pipeline option. E.g. --autoWatermarkInterval=100
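For reference, Flink's ExecutionConfig#setAutoWatermarkInterval sets the interval, in milliseconds, at which periodic watermarks are emitted, and an interval of 0 disables periodic emission; from Beam Java the same value should also be settable programmatically through FlinkPipelineOptions (setAutoWatermarkInterval(100L)). The thread does not confirm why the flag fixed this particular pipeline, so the toy model below only illustrates the general mechanism under that assumption. PeriodicEmitter and emitted are hypothetical names: the source tracks the highest event time it has seen but only pushes a watermark downstream on timer ticks, and with no ticks nothing ever reaches the window.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of periodic watermark emission; hypothetical, not Flink or Beam code.
final class PeriodicEmitter {
    // eventTimes[i] arrives at processing time arrivalTimes[i] (ms, ascending).
    // Returns the watermarks pushed downstream during [0, runUntilMs].
    static List<Long> emitted(long[] eventTimes, long[] arrivalTimes, long intervalMs, long runUntilMs) {
        List<Long> out = new ArrayList<>();
        if (intervalMs <= 0) {
            return out; // periodic emission disabled: no watermark is ever pushed
        }
        long maxSeen = Long.MIN_VALUE;
        int next = 0;
        for (long tick = intervalMs; tick <= runUntilMs; tick += intervalMs) {
            // Absorb every record that arrived up to this tick.
            while (next < eventTimes.length && arrivalTimes[next] <= tick) {
                maxSeen = Math.max(maxSeen, eventTimes[next]);
                next++;
            }
            // Simplification: watermark = max event time seen, emitted only when it advances.
            if (maxSeen != Long.MIN_VALUE && (out.isEmpty() || maxSeen > out.get(out.size() - 1))) {
                out.add(maxSeen);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] eventTimes = {1_000L, 5_000L, 11_000L};
        long[] arrivals = {50L, 120L, 260L};
        System.out.println(emitted(eventTimes, arrivals, 100L, 300L)); // [1000, 5000, 11000]
        System.out.println(emitted(eventTimes, arrivals, 0L, 300L));   // []
    }
}
```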

@akashk99
Author

akashk99 commented May 7, 2024

That worked, thank you so much for your help!
