
new StateBaseAsyncDoFn with State and Timers #5055

Open
albertols opened this issue Nov 8, 2023 · 6 comments
Labels
enhancement New feature or request

Comments

@albertols

albertols commented Nov 8, 2023

Feature request, motivated by BaseAsyncDoFn and KV lookups to avoid duplicates (instead of using Redis, Bigtable, a Hazelcast IMDG, etc.):
https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/http/state/StateBaseAsyncDoFn.java

  1. By using State and Timers, we could prevent processElement from being called for duplicates: https://github.com/spotify/scio/blob/main/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java#L78
  2. Abstracting https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/http/state/StateBaseAsyncDoFn.java#L94C1-L95C1 as:

protected abstract boolean alreadySent(@StateId("buffer") MapState<InputT, OutputT> buffer, InputT element);

and this should also be abstract: https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/http/state/StateBaseAsyncDoFn.java#L166

  3. And keeping the State somehow through:

protected abstract void addIdempotentElementInBuffer(MapState<InputT, OutputT> buffer, InputT input, OutputT output);

NOTE: a GlobalWindow is needed (before applying applyTransform(ParDo.of(new StateAsyncParDoWithAkka(mediationConfig))).map { m =>) in order to keep the state across processElement calls.
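To make the proposal concrete without a Beam dependency, here is a minimal plain-Java model of the two hooks. It is a sketch under stated assumptions: a HashMap stands in for the MapState<InputT, OutputT> buffer, and StateDedupSketch, its process method and the callService function are hypothetical names, not part of Scio or of the linked repository. In a real Beam stateful DoFn the input must be a KV and the buffer is scoped per key and window, which is why the GlobalWindow note matters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Plain-Java model of the proposed hooks (hypothetical; not Beam's API).
abstract class StateDedupSketch<InputT, OutputT> {
    // Stand-in for @StateId("buffer") MapState<InputT, OutputT>.
    private final Map<InputT, OutputT> buffer = new HashMap<>();

    // Mirrors the proposed abstract alreadySent(buffer, element) hook.
    protected abstract boolean alreadySent(Map<InputT, OutputT> buffer, InputT element);

    // Mirrors the proposed abstract addIdempotentElementInBuffer(buffer, input, output) hook.
    protected abstract void addIdempotentElementInBuffer(
            Map<InputT, OutputT> buffer, InputT input, OutputT output);

    // processElement analogue: only calls the external service for unseen elements.
    public Optional<OutputT> process(InputT element, Function<InputT, OutputT> callService) {
        if (alreadySent(buffer, element)) {
            return Optional.empty(); // duplicate: no request is issued
        }
        OutputT output = callService.apply(element);
        addIdempotentElementInBuffer(buffer, element, output);
        return Optional.of(output);
    }
}

// Example subclass: an element counts as sent once it is a key in the buffer.
class KeyedDedup extends StateDedupSketch<String, String> {
    @Override
    protected boolean alreadySent(Map<String, String> buffer, String element) {
        return buffer.containsKey(element);
    }

    @Override
    protected void addIdempotentElementInBuffer(
            Map<String, String> buffer, String input, String output) {
        buffer.put(input, output);
    }
}
```

Calling process twice with the same element invokes callService only once; the second call returns Optional.empty().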

Happy to hear about other alternatives. We are trying to cut out the IMDG and external idempotency lookups, so this would be really cool 👍🏼

Thanks, Scio team!!

@kellen
Contributor

kellen commented Nov 8, 2023

IMDG = ?

BaseAsyncLookupDoFn has a type parameter for some cache implementation, would that suffice?

@albertols
Author

> IMDG = ?
>
> BaseAsyncLookupDoFn has a type parameter for some cache implementation, would that suffice?

Thanks @kellen, I was checking it out too ;) Really interesting indeed. What I meant by IMDG is that we are using a Hazelcast cluster on GKE (with some microservices), and we are now prototyping a similar data processing app in Scio.

I am going through https://github.com/spotify/scio/blob/0eece133a773e7ff85fc9e7fdcad1bfc3593fc1d/scio-test/src/test/scala/com/spotify/scio/transforms/AsyncLookupDoFnTest.scala#L273C33-L273C46, which is based on BaseAsyncLookupDoFn.

I am trying to figure out an implementation. I do not see many examples, but there is this one:

TransformOverride.ofKV[Int, BaseAsyncLookupDoFn.Try[String]](

The only concern here would be scalability; I guess Dataflow vertical autoscaling might help.

I was also wondering about a TTL for each element (in some use cases, when eviction happens, the element could be released, e.g. to Pub/Sub); with Timers and @OnTimer we could have some interesting control over, and output for, each element.
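The per-element TTL idea can be modelled the same way. The sketch below is a plain-Java simulation, not Beam's timer API: setElementTtlTimer plays the role of setting an event-time timer (similar in spirit to settingElementTTLTimer in the linked repository), and advanceWatermark plays the role of the @OnTimer callback firing, evicting each expired element and returning it so it could be released downstream. TtlTimerSketch and both method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Simulated per-element TTL: each element gets a timer at (eventTime + ttl);
// advancing the watermark fires due timers, which evict and release the element.
class TtlTimerSketch<K> {
    private final long ttlMillis;
    // Current expiry per element (the "state").
    private final Map<K, Long> expiryByKey = new HashMap<>();
    // Pending timers ordered by firing time; each entry is (fireAt, key).
    private final PriorityQueue<Map.Entry<Long, K>> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.getKey(), b.getKey()));

    TtlTimerSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Store the element and schedule its eviction; re-setting supersedes the old timer.
    void setElementTtlTimer(K key, long eventTimeMillis) {
        long fireAt = eventTimeMillis + ttlMillis;
        expiryByKey.put(key, fireAt);
        timers.add(Map.entry(fireAt, key));
    }

    boolean contains(K key) {
        return expiryByKey.containsKey(key);
    }

    // "@OnTimer" analogue: fire every timer the watermark has reached; return released keys.
    List<K> advanceWatermark(long watermarkMillis) {
        List<K> released = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek().getKey() <= watermarkMillis) {
            Map.Entry<Long, K> timer = timers.poll();
            // Skip stale timers that a later setElementTtlTimer call replaced.
            if (timer.getKey().equals(expiryByKey.get(timer.getValue()))) {
                expiryByKey.remove(timer.getValue());
                released.add(timer.getValue());
            }
        }
        return released;
    }
}
```

Letting a re-set replace the earlier timer mirrors how setting a Beam timer with the same id overwrites its previous timestamp for that key and window.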

Cheers

@kellen
Contributor

kellen commented Nov 8, 2023

I still don't think I know what "IMDG" means here.

You are correct that BaseAsyncLookupDoFn could use a better example. You can plug in whatever cache supplier you want, e.g. a com.google.common.cache.Cache, and have that handle TTL, etc.
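As a rough illustration of what such a cache has to provide, here is a minimal TTL and size-bounded cache in plain Java. It is an assumption-laden sketch, not Scio's cache-supplier interface: with Guava one would more likely build a com.google.common.cache.Cache via CacheBuilder.newBuilder().maximumSize(...).expireAfterWrite(...), and the exact way BaseAsyncLookupDoFn receives its cache should be checked in the Scio source. TtlLookupCache and its injectable clock are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Minimal TTL + size-bounded cache: entries expire ttlMillis after being written,
// and the oldest-inserted entry is evicted once maxSize is exceeded.
class TtlLookupCache<K, V> {
    private static final class Timestamped<T> {
        final T value;
        final long writtenAt;

        Timestamped(T value, long writtenAt) {
            this.value = value;
            this.writtenAt = writtenAt;
        }
    }

    private final long ttlMillis;
    private final LongSupplier clock; // injectable so expiry can be tested deterministically
    private final LinkedHashMap<K, Timestamped<V>> entries;

    TtlLookupCache(long ttlMillis, int maxSize, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        // Insertion-ordered map that drops its eldest entry once it grows past maxSize.
        this.entries = new LinkedHashMap<K, Timestamped<V>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timestamped<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    void put(K key, V value) {
        entries.put(key, new Timestamped<>(value, clock.getAsLong()));
    }

    // Returns null if absent or expired; expired entries are removed lazily on read.
    V getIfPresent(K key) {
        Timestamped<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (clock.getAsLong() - e.writtenAt >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }
}
```

The size bound is what limits memory use; the TTL only applies when an entry is read again.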

@RustedBones
Contributor

IMDG stands for In-Memory Data Grid

@albertols albertols changed the title new StateStateBaseAsyncDoFn with State and Timers new StateBaseAsyncDoFn with State and Timers Nov 10, 2023
@albertols
Author

albertols commented Nov 14, 2023

Hi Guys!

I was able to get a functional State & Timers implementation for duplicate control. We could have "in-memory, or at least low-latency" duplicate checks over 1M+ events (only when using GlobalWindows for Pub/Sub events; otherwise I have not been able to keep state across calls with FixedWindows and no triggers):

  1. Call: https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/SCIOAsyncService.scala#L65
  2. Proposal of StateBaseAsyncDoFn https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/http/state/StateBaseAsyncDoFn.java
  3. Implementation of the abstract methods: https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/http/StateAsyncParDoWithAkka.scala
  • settingElementTTLTimer
  • addIdempotentElementInBuffer
  • alreadySent

Thus, we would not have to deal with a cache supplier (1M+ entries could lead to memory issues), and it could be easier to scale (vertical autoscaling is only available in Dataflow Prime).

Thanks for reviewing!

P.S.1: the duplicates flow works OK.

P.S.2: I am currently testing in Dataflow with multiple workers.

P.S.3: I am facing some race conditions when records are mocked at the same time (e.g. scio-8000000246 and beam-8000000246).

It could be eliminated with a distinct here:
distinctBykeyUrl
but a trigger must be applied: https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/SCIOAsyncService.scala#L30
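The distinct-by-key step can be sketched as keeping only the first element per key among elements that fire together. This is a plain-Java illustration under a hypothetical assumption: that the numeric suffix after the prefix identifies the logical event, so scio-8000000246 and beam-8000000246 collide. In Scio the corresponding transform would be SCollection.distinctBy, which involves a grouping step and therefore needs a trigger to emit anything on an unbounded GlobalWindow. DistinctByKey is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Keeps the first element seen for each key in a batch, preserving arrival order.
final class DistinctByKey {
    private DistinctByKey() {}

    static <T, K> List<T> distinctBy(List<T> batch, Function<T, K> keyFn) {
        Map<K, T> firstByKey = new LinkedHashMap<>();
        for (T element : batch) {
            // Later elements with an already-seen key are dropped.
            firstByKey.putIfAbsent(keyFn.apply(element), element);
        }
        return new ArrayList<>(firstByKey.values());
    }
}
```

Deduplicating before the stateful step means only one of the colliding records reaches it, so the two cannot both pass the alreadySent check.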

@RustedBones RustedBones added the enhancement New feature or request label Dec 1, 2023
@alberto-lopez-db

Regarding this potential State & Timers enhancement for BaseAsyncDoFn, this article was published last week: https://medium.com/@serna.alberto.eng/avoid-http-requests-duplicates-in-apache-beam-with-scio-a-custom-baseasyncdofn-and-state-and-2c7d63059ab3

Hope it helps, @kellen, @RustedBones

Projects: None yet
Development: No branches or pull requests
4 participants