[Bug]: Simple processing of 1M rows takes more than 10 minutes #21

Open
ilyanoskov opened this issue Mar 18, 2024 · 12 comments
Labels: bug (Something isn't working)

ilyanoskov commented Mar 18, 2024

Steps to reproduce

I have a simple Pathway function that takes more than 30 seconds to run. I am curious whether this is expected or something is wrong. How long does this take for you?

The code is:

import pathway as pw

class MySchema(pw.Schema):
    datetime: str
    flag1: bool
    val1: float
    val2: str
    val3: int
    
def run():
    data = pw.io.csv.read('data.csv', schema=MySchema, mode='static')
    clean_data = data.select(val1=pw.this.val1, val2=pw.this.val2, datetime=pw.this.datetime.dt.strptime(fmt='%Y-%m-%dT%H:%M:%S.%f'))
    pw.debug.compute_and_print(clean_data, n_rows=5)
  
run()

Just create a CSV file with 1M rows to test this.
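
For reference, a hypothetical generator for such a test file (the column names match MySchema, but the actual data from the report is not available, so the values below are arbitrary):

import csv
import random
from datetime import datetime, timedelta

# Write 1M rows whose timestamps match the '%Y-%m-%dT%H:%M:%S.%f'
# format used in the snippet above; one row per second.
start = datetime(2024, 1, 1)
with open('data.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['datetime', 'flag1', 'val1', 'val2', 'val3'])
    for i in range(1_000_000):
        ts = (start + timedelta(seconds=i)).strftime('%Y-%m-%dT%H:%M:%S.%f')
        writer.writerow([ts, str(i % 2 == 0).lower(), random.random(), f'id{i % 100}', i])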

Now, I am trying to understand why this is taking so long. What is the best way to profile Pathway performance? Also, what is the best way to load data with the DateTimeNaive datatype from CSV? The logs from previous runs tell me that parsing DateTimeNaive from an external data source is not supported.

Relevant log output

There are no errors; the Pathway run completes.

What did you expect to happen?

I expected this operation to take a few hundred milliseconds, or at most a second.

Version

0.8.3

Docker Versions (if used)

No response

OS

Linux

On which CPU architecture did you run Pathway?

x86-64

ilyanoskov added the bug label on Mar 18, 2024
ilyanoskov (Author)

Additionally, I was not able to run Pathway with multiple processes on WSL Linux (there was some kind of TCP issue). What addresses do the processes use for inter-process communication? Is it some kind of internal server?

embe-pw (Member) commented Mar 18, 2024

> Now, I am trying to understand why this is taking so long. What is the best way to profile Pathway performance?

The culprit here is the pw.debug.compute_and_print function – it is not really meant for handling non-trivial amounts of data.
You can try replacing it with something like

pw.io.csv.write(clean_data, 'clean_data.csv')
pw.run()

It should be significantly faster, and it will also provide some monitoring output.
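
For context, a sketch of the full run() from the report with this replacement applied (the output path 'clean_data.csv' is arbitrary):

def run():
    data = pw.io.csv.read('data.csv', schema=MySchema, mode='static')
    clean_data = data.select(
        val1=pw.this.val1,
        val2=pw.this.val2,
        datetime=pw.this.datetime.dt.strptime(fmt='%Y-%m-%dT%H:%M:%S.%f'),
    )
    # Write to CSV and run the computation instead of debug-printing it.
    pw.io.csv.write(clean_data, 'clean_data.csv')
    pw.run()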

> Also, what is the best way to load data with the DateTimeNaive datatype from CSV? The logs from previous runs tell me that parsing DateTimeNaive from an external data source is not supported.

What you have done is probably the best way currently.
I guess you tried to provide DateTimeNaive as the type directly in the schema? This is something it would be nice to support, but it is a bit tricky – for example, you need some way to pass the expected format, as there are many possibilities.

ilyanoskov (Author)

Hi @embe-pw, thanks a lot for taking a look! This helped; the ingestion is fast now.

However, I am seeing that subsequent operations (sliding windows, group-by, join) on 1M rows of data take a long time – at least 5 minutes. Is this expected?

Are there any benchmarks for Pathway to compare the performance on my machine? Or what is the best way to profile Pathway?

ilyanoskov (Author) commented Mar 19, 2024

So I have this little Pathway snippet that takes a long time to execute (10+ minutes). Can you please advise what the reason might be?

The data is a simple CSV with two columns: datetime and val1 (float). I read the file with the CSV reader and then write it with the CSV writer (static or streaming mode).

import datetime as dt
import pathway as pw
import math


def myfunc(val1: float, val2: float) -> float:
    return math.log(val1) / val2


def calculate(data_stream):
    data1 = data_stream
    data2 = data1.copy()
    calc = (data1.interval_join(data2, 
                                data1.datetime, 
                                data2.datetime, 
                                pw.temporal.interval(-dt.timedelta(minutes=1), -dt.timedelta(minutes=1)))
                 .select(datetime=data1.datetime, 
                         val3=pw.apply_with_type(myfunc, float, data1.val1, data2.val1)))
    
    agg = (calc.windowby(pw.this.datetime,
                         window=pw.temporal.sliding(hop=dt.timedelta(minutes=1), duration=dt.timedelta(minutes=50)),
                         behavior=pw.temporal.exactly_once_behavior())
                .reduce(datetime=pw.this._pw_window_end,
                        avg=pw.reducers.avg(pw.this.val3)))
                                  
    return agg
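
A minimal read/write harness around calculate(), assuming the input columns are named datetime and val1 and the timestamp format below (both assumptions – the actual file is not shown in the thread):

class StreamSchema(pw.Schema):  # hypothetical schema name
    datetime: str
    val1: float

def run():
    raw = pw.io.csv.read('data.csv', schema=StreamSchema, mode='static')
    # Parse the timestamp string into DateTimeNaive so the temporal
    # operations in calculate() can work with timedeltas.
    data_stream = raw.select(
        datetime=pw.this.datetime.dt.strptime(fmt='%Y-%m-%dT%H:%M:%S.%f'),
        val1=pw.this.val1,
    )
    pw.io.csv.write(calculate(data_stream), 'features.csv')
    pw.run()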

It's very hard to debug what could be behind this slowdown. Is it some kind of (silent) engine error on my machine? Or is the sliding window silently failing, or simply taking too long? How can I profile the performance of this query to find the slowest parts?

My version is 0.8.3

ilyanoskov changed the title from "[Bug]: Simple processing of 1M rows takes ~36s" to "[Bug]: Simple processing of 1M rows takes more than 10 minutes" on Mar 19, 2024
KamilPiechowiak (Contributor)

Hey @ilyanoskov,
thank you for reporting that. We are investigating the issue and will get back with a fix soon.

KamilPiechowiak (Contributor)

Hey @ilyanoskov, I can confirm that the code can be slow and there's nothing incorrect on your side. Optimization of sliding windows is planned for the future.
For now, you can use a slightly modified version of the code that performs the aggregation in two stages: it first aggregates the data over tumbling windows of 1 minute, and then aggregates those windows into sliding windows of 50 minutes.

def calculate(data_stream):
    data1 = data_stream
    data2 = data1.copy()
    calc = (data1.interval_join(data2, 
                                data1.datetime, 
                                data2.datetime, 
                                pw.temporal.interval(-dt.timedelta(minutes=1), -dt.timedelta(minutes=1)))
                 .select(datetime=data1.datetime, 
                         val3=pw.apply_with_type(myfunc, float, data1.val1, data2.val1)))
    
    agg = (
        calc.windowby(
            pw.this.datetime,
            window=pw.temporal.tumbling(duration=dt.timedelta(minutes=1)),
            behavior=pw.temporal.exactly_once_behavior(),
        )
        .reduce(
            datetime=pw.this._pw_window_start,
            sum=pw.reducers.sum(pw.this.val3),
            cnt=pw.reducers.count(),
        )
        .windowby(
            pw.this.datetime,
            window=pw.temporal.sliding(
                hop=dt.timedelta(minutes=1), duration=dt.timedelta(minutes=50)
            ),
        )
        .reduce(
            datetime=pw.this._pw_window_end,
            avg=pw.reducers.sum(pw.this.sum) / pw.reducers.sum(pw.this.cnt),
        )
    )
                                  
    return agg
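
The rewrite is safe for an average because an average can be reconstructed from per-bucket (sum, count) pairs. A quick plain-Python sanity check of that algebra (illustration only, not Pathway code):

import random

values = [random.random() for _ in range(1000)]

# Direct average over the whole window.
direct = sum(values) / len(values)

# Stage 1: pre-aggregate into buckets of (sum, count).
buckets = [(sum(values[i:i + 100]), 100) for i in range(0, 1000, 100)]
# Stage 2: combine the bucket sums and counts.
two_stage = sum(s for s, _ in buckets) / sum(c for _, c in buckets)

assert abs(direct - two_stage) < 1e-9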

ilyanoskov (Author)

> For now, you can use a slightly modified version of the code that performs the aggregation in two stages [...]

Thank you! How long does it run now?

KamilPiechowiak (Contributor)

I don't have access to your data, and the execution time depends on the data distribution (it influences the number of rows produced by interval_join). On a 1M-row dataset I generated (with consecutive records separated by 1 second), it takes about 1 minute.
With the fix that will ship in the next Pathway version, the time is reduced to 25s.

dxtrous (Member) commented Mar 21, 2024

@KamilPiechowiak Thanks for this fix.
I'd say that if we are down to ~25s single-threaded (and then ideally to under 5s on 8 worker threads), this can be considered reasonable timing for a snippet of this weight: 50x input amplification × 1M input rows = 50M row updates, corresponding to a throughput of 1M–2M data row updates per second, per worker. However, the fix still relies heavily on the pre-reduction phase to eliminate the amplification.

@ilyanoskov Thanks for reporting this. Happy to know if the improved version addresses your efficiency concern sufficiently.

ilyanoskov (Author) commented Mar 24, 2024

@KamilPiechowiak thanks a lot for taking a look. I am afraid 25 seconds is still quite a long time for this: ClickHouse takes ~15 seconds to calculate similar features (but 2x more of them, and over more windows), and Polars is at around the same performance as ClickHouse in my tests. I understand these are not really streaming frameworks and it's not a fair comparison, but here I am looking for the fastest way to calculate the features and then keep them up to date.

I was doing a POC for my use case with this small example, but in reality I will be calculating hundreds of features across many windows, on a much larger dataset (~1B records). So it looks like Pathway is perhaps not the best tool here. Unless there is a good way to pre-compute the large dataset of features (with ClickHouse or Polars) and then update it incrementally with Pathway in a streaming fashion? But then one would need to write two versions of the code, one for batch and one for streaming...

P.S. I think it would be really good to introduce some kind of benchmark for window aggregation with Pathway, just to be able to track its performance.

dxtrous (Member) commented Mar 25, 2024

> Unless there is a good way to pre-compute the large dataset of features (with ClickHouse or Polars) and then update it incrementally with Pathway in a streaming fashion? But then one would need to write two versions of the code, one for batch and one for streaming...

@ilyanoskov It's an excellent point. In a mixed setup like this, one of the two costs/efforts (1. batch, 2. streaming/incremental) usually dominates the other. Most use cases focus on optimizing only the streaming part, since it dominates the overall cost/effort by orders of magnitude; it doesn't make sense to over-focus on the batch part, which will remain relatively small even if it's a factor of, say, 5x away from optimal – BUT your use case may be different.

> P.S. I think it would be really good to introduce some kind of benchmark for window aggregation with Pathway, just to be able to track its performance.

Thanks for the idea. @KamilPiechowiak, I'm all for adding this one to the benchmarks in CI/CD tests.

KamilPiechowiak (Contributor)

> I am afraid 25 seconds is still quite a long time for this.

@ilyanoskov, did you take that number from my comment or did you measure it yourself? I measured it on data I generated, so the distribution might differ from yours. For example, in my dataset each input entry to the interval join results in 121 output entries; if your dataset is less dense, that number will be smaller and the computation faster.

> Thanks for the idea. @KamilPiechowiak, I'm all for adding this one to the benchmarks in CI/CD tests.

Yes, we can add it.
