
[QUESTION] How to create accumulated batch for processing? #17

Open
ilyanoskov opened this issue Mar 16, 2024 · 8 comments
Labels
question Further information is requested

Comments

@ilyanoskov

ilyanoskov commented Mar 16, 2024

Hello,

I have a use-case where I need to process 10 million rows. First, I want to process 1M rows when they arrive, then I want to process 2M rows (1M previous + 1M new), then 3M rows, and so on in that order. How can I do it with Pathway?

I was able to do something like this with AsyncTransformer by accumulating the rows inside of it, but I find this solution clunky; perhaps there is a more Pathway-like approach I could take here?

Thank you very much in advance for your help

ilyanoskov added the question (Further information is requested) label Mar 16, 2024
@dxtrous
Member

dxtrous commented Mar 16, 2024

Hey Ilia, since you ask about the Pathway-like approach: the "mental model" of Pathway computations is that of reactive computation. The best-known tool with a similar model is a spreadsheet (Excel), so I will use this analogy to answer.

In Excel, you would probably paste the rows of your table into the spreadsheet, one under another. Excel would recompute all the cells that depend on them as soon as it can. Pathway does the same in streaming mode, as soon as you connect your inputs to a connector.

From what I see, your question covers two separate elements:

  1. How to trigger recomputation for old rows each time 1M new rows arrive;
  2. (Potentially) how to force new rows to wait (be queued) in a batch of exactly 1M rows before processing them.

For point 1, there are two separate topics: what we would call in Excel AGGREGATES (like a sum over rows) and VOLATILE functions (like API calls to a financial/currency API that changes its answers each time you call it). I'm guessing your question is more about volatiles, but let's cover both.

Starting with AGGREGATES, Pathway recomputes aggregates whenever fresh data arrives. Like in SQL, the mechanism is based on "groupby", followed by "reduce". So, what you would write in Excel as summing column A, SUM(A:A), is written for a Pathway table t with column colA: t.groupby().reduce(sum=pw.reducers.sum(t.colA)) or equivalently t.reduce(sum=pw.reducers.sum(t.colA)).
Other reducers include count, max, etc.; for a list of available reducers, please see https://pathway.com/developers/api-docs/reducers. Two of these reducers are special in that they allow you to create your own function acting on all of the rows of the table: pw.reducers.tuple, which returns an enormous Python tuple containing all the aggregated values from the column (slow, but a convenient escape hatch that lets you post-process data in Python!), and pw.reducers.udf_reducer, the proper but more advanced way to build a custom reducer more efficiently (see: https://pathway.com/developers/user-guide/data-transformation/custom-reducers).
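
For illustration, here is a minimal sketch of both styles (the demo table and the column names are made up for the example; pw.debug.table_from_markdown and pw.debug.compute_and_print are only used to make it self-contained):

import pathway as pw

# A small demo table; in a real pipeline this would come from a connector.
t = pw.debug.table_from_markdown(
    """
    colA    | colB
    Alice   | -1
    Bob     | 1
    Charlie | 2
    """
)

# Aggregate reducers: one output row summarizing the whole table.
stats = t.reduce(
    sum=pw.reducers.sum(pw.this.colB),
    count=pw.reducers.count(),
)

# The tuple reducer: gathers every value of colB into a single Python tuple,
# a convenient escape hatch for custom post-processing in Python.
all_values = t.reduce(values=pw.reducers.tuple(pw.this.colB))

pw.debug.compute_and_print(stats)
pw.debug.compute_and_print(all_values)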

Now, moving on to VOLATILE functions. How to force the recompute of a VOLATILE function when a trigger event happens (e.g., every 1M new rows)? This one is a bit magical to handle - as it is in Excel! You will need to quantify the precise trigger that you want to activate the recompute, as a sort of global variable. You will then pass this variable as a parameter to your transformers to make sure results are recomputed each time this variable changes.

For example, consider a table t which currently has the following contents:

colA    | colB
Alice   | -1
Bob     |  1
Charlie |  2
David   |  4
Eve     |  4
Fred    |  7

And perform on it:

t_count = t.reduce(count=pw.reducers.count())

This creates a single-row table t_count with a single column count like this:

count
6

Next, divide this value by 1M rounding down:

t_count_1M = t_count.select(count_1M=t_count.count // 1_000_000)

to get:

count_1M
0

Finally, join this in with t to get an extra column in t:

t = t.join(t_count_1M).select(t.colA, t.colB, recompute_trigger=t_count_1M.count_1M)

And here is what t now looks like with its extra new column:

colA    | colB | recompute_trigger
Alice   | -1   | 0
Bob     | 1    | 0
Charlie | 2    | 0
David   | 4    | 0
Eve     | 4    | 0
Fred    | 7    | 0

The value of recompute_trigger will jump from 0 to 1 as soon as the number of rows of t exceeds 1M, then it will jump to 2 as soon as it exceeds 2M. Now, when you create your AsyncTransformer to call into your API, make sure to pass recompute_trigger as an extra parameter to the invoke, in addition to the arguments it actually needs. It will be useless for the actual API call, but will trigger a refresh when it changes :-).
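
For illustration, a hedged sketch of such a transformer (call_my_api is a hypothetical placeholder for the real API call, and .successful is assumed to be the output table of successfully processed rows):

import pathway as pw


class OutputSchema(pw.Schema):
    result: str


async def call_my_api(name: str, value: int) -> str:
    # Placeholder for the real external API call.
    return f"{name}:{value}"


class ApiTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
    async def invoke(self, colA: str, colB: int, recompute_trigger: int) -> dict:
        # recompute_trigger is not used by the call itself, but since it is an
        # input column, a change in its value re-triggers this invocation.
        return {"result": await call_my_api(colA, colB)}


# t already carries the recompute_trigger column from the join above.
results = ApiTransformer(input_table=t).successful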

Then, for point 2 above: if you really need it (there are not so many use cases where you actually do need it; it's an artificial slowdown), there is a way to achieve this with a queue using tumbling window mechanisms - if you do need this, I'll ask someone from the team to contribute the cleanest answer. In that case, please specify whether you need the computation to be done every 1M rows, precisely and always, or whether you can tolerate skipping over a batch of 1M and bundling it together with the next one if your system is not keeping up (this may happen e.g. if you have 1M rows arriving every second).

Happy to clarify further if needed!

@ilyanoskov
Author

ilyanoskov commented Mar 16, 2024

Hey @dxtrous, thank you very much for such a detailed answer! This is incredibly helpful. I have spent more than half of the week reading the entire Pathway documentation and doing POC implementations to see what works best.

One thing I do not fully understand is how we force AsyncTransformer to recompute things once 2M rows are available (and we have already processed the first 1M rows downstream). I also need to do this for 2M, 3M, 4M rows, and so on. Do I define 10 different AsyncTransformers in this case? Is it possible to do it all with one AsyncTransformer? So there is essentially a need to accumulate the data for all years and then run the computation... I also do not see a recompute_trigger flag anywhere in the docs for this?

Or is it possible to take the entire data available in t inside the AsyncTransformer when the invoke is called? I thought it only takes the latest row? Or can I reference t inside the AsyncTransformer as an external variable (I have not tried this yet)?

To add more context here, I actually need to compute things per year, which is not too different from the count of rows (I will just rewrite the logic so that as soon as a new year appears in the rows, the calculation runs). So this particular operation I am trying to achieve is more of a batch pattern, but then I will need streaming per row everywhere else in the system, so I am looking for a way to combine the two.

Perhaps the second point is more applicable here? I do not think I will actually be getting 1M rows a second, or if I do, it's ok to wait for the system to catch up in this case I think.

Thank you very much in advance

@dxtrous
Member

dxtrous commented Mar 16, 2024

Hey @ilyanoskov, thanks for the additional context!

Please disregard the discussion of "volatile" functions and triggers from my previous reply in that case.

Please note that AsyncTransformer is called row by row, for each row separately. It is very similar to apply or transform in pandas (but additionally asynchronous). It would normally be used to call an external API, or possibly to run a heavy computation on a GPU.

So, to pass multiple (say 1M) rows into AsyncTransformer in one go, you will first need to reduce these rows to one. If you have a date or some other form of timestamp as a column in your data and want to group data by year, that's brilliant; it is likely to simplify the logic a lot. With this in mind, my understanding of what you need is the following (listing keywords) - hopefully it will be one longish line of code :-):

  • Write one line to get windowby + reduce on your data with a tumbling window with a specified window duration = 1 year (https://pathway.com/developers/user-guide/temporal-data/windows-manual/#temporal-tumbling-windowing).
  • Make sure your computations do not start before your year is "closed". This can be achieved using a window configuration (an extra parameter to the windowby) called behavior. Please take a look at https://pathway.com/developers/user-guide/temporal-data/windows_with_behaviors/. From what I see, your "delay" will be the same as the window size, i.e. 1 year (see the illustration image in the linked tutorial), and the "cutoff" will probably be 0 (if you are sure your data is all coming in chronological order). This is a pretty standard case and can be achieved immediately by setting your window behavior to pw.temporal.exactly_once_behavior() without any parameters (https://pathway.com/developers/api-docs/temporal#pathway.stdlib.temporal.exactly_once_behavior) - computations on year N will trigger when data from year N+1 starts :).
  • For the windowby + reduce, you can try out any reducer first (it can be anything like count or max on some column) just to see if the output of the windowby + reduce is what you expect - you should be getting one row each time you feed in a full year of data, so 10 rows out of 10 years of data.
  • Switch over to the tuple reducer and see its outputs. It should be good enough - though the 10 rows you have will contain incredibly long tuples (of about 1M elements each).

Now, with this done, you can pass the output table you got (which will have 1...10 rows as your data flows in, each with the extremely long tuple column) through the AsyncTransformer, to train your model - see the sketch right after this list.
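
A minimal sketch of that windowby + reduce line, assuming a datetime column named ts and a value column named value (both names are placeholders), and approximating a 1-year window with datetime.timedelta(days=365):

import datetime

import pathway as pw

# `t` is the input table; `ts` (datetime) and `value` are assumed column names.
yearly = (
    t.windowby(
        t.ts,
        window=pw.temporal.tumbling(duration=datetime.timedelta(days=365)),
        # Emit each window exactly once, when it closes.
        behavior=pw.temporal.exactly_once_behavior(),
    )
    .reduce(
        window_start=pw.this._pw_window_start,
        rows=pw.reducers.count(),                 # sanity check: ~1M rows per year
        values=pw.reducers.tuple(pw.this.value),  # the whole year's batch as a tuple
    )
)

The resulting yearly table can then be fed into the AsyncTransformer.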

Would this make sense?

By the way, if you are looking for an introduction to Tumbling Windows as a concept, you can also take a look at this tutorial https://pathway.com/developers/showcases/suspicious_activity_tumbling_window/ to see if it resonates.

@ilyanoskov
Author

@dxtrous thank you very much! I will try just that and report back :)

@dxtrous
Member

dxtrous commented Mar 16, 2024

@ilyanoskov Yes, do keep us posted!
I missed one point/question: whether, say, in year 7, you would just want data from year 7 to go into the 7th model, or data from all years 1..7 to go into the 7th model. (If it's the latter, you will eventually need to replace the tumbling window with a sliding window of length 10 years and a hop of 1 year. It's a tiny change; I'd do it at the end, once you are comfortable with tumbling windows working.)
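
Roughly, that change would amount to swapping the window definition in the earlier sketch (same 365-day approximation of a year; the numbers are placeholders):

import datetime

import pathway as pw

# Sliding window: each window spans up to 10 years and a new window starts
# every year, so the window ending after year 7 accumulates data from years 1..7.
window = pw.temporal.sliding(
    hop=datetime.timedelta(days=365),
    duration=datetime.timedelta(days=3650),
)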

@ilyanoskov
Author

ilyanoskov commented Mar 18, 2024

I was able to do this with the tuple reducer and the sliding window of length 10 years and hop of 1 year. Thank you!

@dxtrous do you by any chance know how to prevent the sliding window from sliding beyond the available data? I set the duration to 10 years, and for 2017 the window is looking into 2027 (and creating more rows). How do I stop at 2024?

@dxtrous
Member

dxtrous commented Mar 26, 2024

do you by any chance know how to prevent the sliding window from sliding beyond the available data? I set the duration to 10 years, and for 2017 the window is looking into 2027 (and creating more rows). How do I stop at 2024?

This is a great question which applies to finite (bounded) data streams.
@KamilPiechowiak would it make sense for us to envisage a scenario/flag in which an incomplete window with e.g. ExactlyOnceBehavior does not have its computation triggered by end-of-data (and is simply left out of the results if the data ends before the window ends)?

@ilyanoskov an (ugly) workaround is to filter out results for windows starting later than some given year (or max - 9 years) in post-processing of the windowing results.
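
For illustration, a hedged sketch of that post-processing filter, assuming the windowed table from the earlier sketches (here called yearly) exposes the window start as window_start, and that LAST_DATA_TS is the known last timestamp of the bounded stream:

import datetime

import pathway as pw

# Hypothetical: the last timestamp present in the bounded input data.
LAST_DATA_TS = datetime.datetime(2024, 12, 31)

# Keep only windows that fit entirely inside the available data: a 10-year
# window starting after (last timestamp - 9 years) would reach past the data's end.
complete_windows = yearly.filter(
    pw.this.window_start <= LAST_DATA_TS - datetime.timedelta(days=9 * 365)
)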

@KamilPiechowiak
Contributor

@ilyanoskov an (ugly) workaround is to filter out results for windows starting later than some given year (or max - 9 years) in post-processing of the windowing results.

I'd say the workaround is not that ugly. If the windows that are filtered out are not a significant majority of all the windows, the performance overhead should be small. Syntax-wise, I think an additional filter is easier to understand than an additional parameter in windowby/window definition/window behavior.

@KamilPiechowiak would it make sense for us to envisage a scenario/flag in which an incomplete window with e.g. ExactlyOnceBehavior does not have its computation triggered by end-of-data (and is simply left out of the results if the data ends before the window ends)?

It makes things more complicated, as entries would need to be aware of the maximal entry in the stream. If they're not aware of it, we can trigger window production later (or never), but its state will be initialized anyway, so performance-wise it'll be similar to filtering out unwanted windows.
Once we optimize the performance of windows further, we can think about such a scenario, but in the current implementation it wouldn't make much difference.
