Groupwithin improvements #3186

Open · wants to merge 20 commits into main

Conversation

@Angel-O (Contributor) commented Mar 25, 2023

Summary

Apologies in advance for the wall of text: hopefully this time it will be worth it (if not, this is probably my last attempt at this for a while 😅)

This PR is a follow-up to #3162 (thanks all for the feedback)

Goals:

  • improving timeout behaviour (increased accuracy)
  • improving performance: over 100% faster in certain scenarios 🚀 (with better memory utilization, judging by the GC benchmark stats)
  • fixing interruption propagation misbehaviour
  • simplifying logic (race condition)

Timeout behaviour

The current implementation has a small flaw in the timeout logic: when entering the timeout state the stream waits for the first element, then tries to calculate how many elements are available using the supply semaphore, and finally uses that figure to decide where to split the buffer.

The problem is that even if the buffer has collected enough elements to make up a chunk, these won't always be part of the emitted chunk, because of the above logic. I suspect this is what's happening in the "accumulation and splitting" test, which is flakier than it should be (some level of flakiness is probably acceptable since it's fundamentally a time-based test, but with the current logic it's very easy to get a 50%, or even higher, failure rate).
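For illustration, the flawed path looks roughly like this (a minimal sketch with invented helper names, not the actual code):

```scala
import cats.effect.kernel.{Concurrent, Ref}
import cats.effect.std.Semaphore
import cats.syntax.all._
import fs2.Chunk

// Rough sketch of the current timeout path (helper names are hypothetical):
def onTimeout[F[_]: Concurrent, A](
    waitForFirstElement: F[Unit], // hypothetical: suspends until one element has arrived
    supply: Semaphore[F],
    buffer: Ref[F, Vector[A]]
): F[Chunk[A]] =
  for {
    _   <- waitForFirstElement
    n   <- supply.available // a snapshot of how many elements *were* available...
    out <- buffer.modify(b => (b.drop(n.toInt + 1), b.take(n.toInt + 1)))
  } yield Chunk.seq(out)    // ...so elements arriving in between are left behind
```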

When running the test below (a slightly modified version of the original test that includes an additional sleep) the situation is even worse: it's nearly impossible to get the test to pass.

test("accumulation and splitting 2".only) {
  val t = 200.millis
  val size = 5
  val sleep = Stream.sleep_[IO](2 * t)
  val longSleep = Stream.sleep_[IO](10 * t)

  def chunk(from: Int, size: Int) =
    Stream.range(from, from + size).chunkAll.unchunks

  // this test example is designed to have good coverage of
  // the chunk manipulation logic in groupWithin
  val source =
  chunk(from = 1, size = 3) ++
    sleep ++
    chunk(from = 4, size = 1) ++ longSleep ++
    chunk(from = 5, size = 11) ++
    chunk(from = 16, size = 7)

  val expected = List(
    List(1, 2, 3),
    List(4),
    List(5, 6, 7, 8, 9),
    List(10, 11, 12, 13, 14),
    List(15, 16, 17, 18, 19),
    List(20, 21, 22)
  )

  source.groupWithin(size, t).map(_.toList).assertEmits(expected)
}

This PR introduces a new mechanism to improve the accuracy of the timeout logic. Instead of calculating the number of elements to be flushed, we flush them and then lower the supply accordingly. We also use acquireN instead of tryAcquireN, since the number of flushed elements is known.
I'm also using a SignallingRef for state management and to provide an upper bound on the fiber that waits for the first chunk; I need this to comply with one of the test cases.
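A minimal sketch of the flush logic described above (simplified, with invented names; the real change is in the diff):

```scala
import cats.effect.kernel.{Concurrent, Ref}
import cats.effect.std.Semaphore
import cats.syntax.all._
import fs2.Chunk

// Sketch: flush the whole buffer first, then lower the supply by exactly the
// number of flushed elements. acquireN (rather than tryAcquireN) is safe here
// because that number is known at this point.
def flushOnTimeout[F[_]: Concurrent, A](
    buffer: Ref[F, Vector[A]],
    supply: Semaphore[F]
): F[Chunk[A]] =
  for {
    flushed <- buffer.getAndSet(Vector.empty)       // take everything buffered so far
    _       <- supply.acquireN(flushed.size.toLong) // reconcile the supply afterwards
  } yield Chunk.seq(flushed)
```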

The result is a more accurate behaviour (see below)

I've run both "accumulation and splitting" tests a number of times and had zero failures so far

[Screenshot: test run results, 2023-03-25]


Performance

Benchmark figures are better than the existing implementation's: consistently faster (over 100% more ops/sec), especially on large streams, and with better GC stats (see screenshots below)

Example

  • test: stress test (short execution): all elements are processed (with the range parameter set to 10 million)
    • suggested implementation: 15 seconds
    • existing implementation: 30 seconds

suggested implementation

[Screenshot: suggested implementation stats, 2023-03-27]

existing implementation

[Screenshot: existing implementation stats, 2023-03-26]


Simplified logic

This implementation is hopefully simpler to follow:

  • removing the custom error propagation (relying on concurrently instead; see the sketch below)
  • limiting the use of Semaphore to what's needed (demand is gone in favour of a bounded queue)
  • reducing the demand/supply Option[Either[T, A]] state flags to a single boolean
  • relying on the fs2 streams API to await a chunk instead of splitting the state Vector
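For the first point, the overall shape is roughly the following (a sketch under assumed names, not the actual code):

```scala
import cats.effect.kernel.Concurrent
import fs2.{Chunk, Stream}

// Sketch: `concurrently` runs the producer in the background and already
// propagates upstream errors and interruption into the emitting stream,
// so no hand-rolled error channel is needed.
def shape[F[_]: Concurrent, A](
    source: Stream[F, A],
    enqueue: A => F[Unit],    // hypothetical: pushes one element into the buffer
    emit: Stream[F, Chunk[A]] // hypothetical: emits chunks on size or timeout
): Stream[F, Chunk[A]] =
  emit.concurrently(source.evalMap(enqueue))
```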

Tests

This PR includes new tests:

  • accumulation and splitting 2 (I'm happy to remove this, or to include the longSleep in the original test; it was just for illustration purposes)
  • stress test (short execution): all elements are processed (a copy of the benchmark tests with an integrity check at the end). I found this useful as it allowed me to verify the behaviour over a long stream of elements, uncovering a couple of bugs in the permits-management logic. As mentioned elsewhere, it is possible to write an implementation that passes all other tests consistently but fails this one
  • stress test (long execution): all elements are processed: basically the same as the one above, but it pauses after each element. In theory this exercises the timeout logic more frequently than the previous test. Marked as ignored since it takes ~15 mins to complete with TestControl
  • if the buffer fills up at the same time the timeout expires there won't be a deadlock (thanks @armanbilge for explaining the problem and for suggesting a nice way to reproduce the bug; see the sketch after this list). This test allowed me to confirm the race condition bug and to make sure the fix was working as expected
  • upstream error/interruption propagation tests (thanks @SystemFw for explaining what the correct behaviour should be on my previous attempt)
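The deadlock test mentioned above can be sketched like this (timings invented for illustration; the actual test is in the diff):

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.testkit.TestControl
import fs2.Stream

// Sketch: arrange for the buffer to fill at the same instant the timeout
// expires. Under TestControl time is deterministic, and executeEmbed fails
// with a non-termination error if the program deadlocks instead of finishing.
val prog: IO[Unit] =
  Stream
    .awakeEvery[IO](200.millis)
    .take(5)                  // the 5th element lands at exactly 1 second...
    .groupWithin(5, 1.second) // ...just as the 1-second timeout fires
    .compile
    .drain

val test: IO[Unit] = TestControl.executeEmbed(prog)
```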

Notes

  • may be a fix for Fix groupWithin test that intermittently fails #2432 (let me know what your experience is; I haven't seen a single failure yet when running these tests in isolation or as part of a whole suite.
    EDIT: I've made a small change and I've seen the first failure (on CI) after several successful runs, so it is still a flaky test. The failure rate however has decreased significantly, and I'm still seeing several successful test runs locally, as shown in the screenshot above)
  • fixes groupWithin: inconsistent behaviour on source termination #3169 (unless Fixing interruption behaviour #3183 gets merged first). Note that interruption misbehaves not only on upstream termination but also when chunkSize == 0. I believe this is a bug, and I assume the behaviour of this implementation is the intended one; I should be able to change it easily if that's not the case. Either way I'm happy to add a test to document the behaviour

Thank you

@armanbilge (Member)

> avoid deconstructing tuple in for-compr to avoid CI js failure

Sorry, this is my trigger 😆

This issue is not related to Scala.js. The core Scala language (i.e. syntax) actually works 100% identically for all three platforms (if you ever find a case where it doesn't, that's a legitimate bug and we should fix it :).

Sometimes, a JS CI job will happen to fail first. That doesn't mean it's a JS-specific issue.

/rant 😇

@Angel-O (Contributor, Author) commented Mar 26, 2023

@armanbilge I had a look; this time it sounds like a Scala.js problem on Scala 3 (setting the version to 2 and running the CI command ++ 3.2.2 Test/scalaJSLinkerResult doesn't cause issues).

I tried to set the compiler flag -source:future, but that brings up a bunch of other unrelated errors/warnings that perhaps deserve a separate PR.

Also, I've found a similar issue here: it sounds like a compiler issue from the discussion. Do you think it's worth spending more time on this?

@armanbilge (Member)

> I had a look, this time it sounds like a Scala.js problem on Scala 3

You can't reproduce it with Scala JVM?

@Angel-O (Contributor, Author) commented Mar 26, 2023

> You can't reproduce it with Scala JVM?

Nope: the command is not available on the JVM

@armanbilge (Member)

scalaJSLinkerResult is just running compile, try compiling on JVM :)

@Angel-O (Contributor, Author) commented Mar 26, 2023

Yeah, compile works on the JVM (I think it's a subcommand of test, and I've run these tests a number of times 😆)

@armanbilge (Member)

Test/compile on Scala 3 JVM works, and Test/compile on Scala 3 JS doesn't work? That would be a bug.

@Angel-O (Contributor, Author) commented Mar 26, 2023

> Test/compile on Scala 3 JVM works, and Test/compile on Scala 3 JS doesn't work? That would be a bug.

I've run the JVM tests on Scala 2; let me try with Scala 3.

@armanbilge yeah, you're right: I'm getting the same error.

As mentioned earlier though, using the -source:future compiler flag doesn't work:

[warn] 60 warnings found
[error] 55 errors found

I'm leaning towards keeping it as it is, unless you know how to fix it, because I have no idea: the issue linked earlier labels it as a won't-fix. Hopefully deconstructing vs not deconstructing the tuple is not a deal breaker. Either way, should we move this to Discord? The conversation is moving away from the PR 😢

@armanbilge (Member)

> Hopefully deconstructing vs not deconstructing the tuple is not a deal breaker.

It's not a problem at all, just a symptom of cross-compiling across Scala 2 and Scala 3. Sorry that I derailed things 😓

@Angel-O (Contributor, Author) commented Mar 26, 2023

> It's not a problem at all, just a symptom of cross-compiling across Scala 2 and Scala 3. Sorry that I derailed things 😓

No worries: it's just that I've opened a bunch of PRs doing the same thing and it might get confusing. Thanks for explaining though, learned something again 🥇

@Angel-O marked this pull request as draft March 27, 2023 08:17

* handling edge cases
* refactoring
* adding tests

@Angel-O marked this pull request as ready for review March 27, 2023 21:09
@Angel-O (Contributor, Author) commented Apr 22, 2023

@armanbilge I was tweaking the benchmark parameters and discovered a flaw in this implementation.

Currently the buffer window is set to 100 micros, meaning every test run generates very small buffers (with 1 or 2 elements), simply because the timeout is too short for the buffer to accumulate a significant number of elements. So I've extended the timeout to 1 second and also increased the total number of elements and the buffer size to 1 million and 100k respectively.

This is what I've found out:

  • using a Queue from cats for backpressure and buffering adds a significant overhead in terms of memory (perhaps this was obvious; I didn't know). A sketch of the two strategies follows below.
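For context, the two buffering strategies being compared look roughly like this (a simplified sketch, not the PR's code):

```scala
import cats.effect.IO
import cats.effect.kernel.Ref
import cats.effect.std.{Queue, Semaphore}
import cats.syntax.all._

// (a) cats-effect bounded Queue: one offer/take per element, each going
//     through the queue's internal state (the memory overhead observed above).
val queueBased: IO[Unit] =
  Queue.bounded[IO, Int](100).flatMap(q => q.offer(1) *> q.take.void)

// (b) Semaphore + Vector: permits provide the backpressure bound, while
//     elements accumulate in a single Vector held in a Ref.
val semaphoreBased: IO[Unit] =
  (Semaphore[IO](100), Ref.of[IO, Vector[Int]](Vector.empty)).tupled.flatMap {
    case (permits, buffer) => permits.acquire *> buffer.update(_ :+ 1)
  }
```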

And these are the results:

visual vm & benchmarks (only one test case)

(no longer relevant) suggested implementation (cats bounded Queue) - replaced

queue implementation (1mill, 100K, 1sec)

queue implementation bm (1mill, 100k, 1sec)

existing implementation - main branch

old implementation (1mill, 100k, 1sec)

old implementation bm (1mill, 100k, 1sec)

suggested implementation (Semaphore + Vector) - this branch

new implementation (1mill, 100k, 1sec)

new implementation bm (1mill, 100k, 1sec)


In light of this, I've decided to restore the Vector (Chunk doesn't seem to improve performance, at least with the existing benchmarks; I will test with different benchmark params) plus the demand semaphore (renamed to backpressure, explained below).
This is the commit with the change explained above (there's also another small improvement where supply == 0 is checked only once, plus some refactoring)


So, to summarize: compared to the existing implementation I'm seeing better performance overall and increased accuracy:

  • ops/sec: 70%-90% better (see benchmarks above: nearly twice as fast)
  • memory: pretty much the same (see visual vm screenshots)
  • timeout accuracy (tested via the "accumulation and splitting" tests): lower failure rate (from 60-80% down to maybe 1-2%: hard to give an exact figure, but failures are few and far between. I haven't seen a single failure locally yet, but I have seen failures on CI: example)

On top of that a few improvements:

  • timeout logic is a bit clearer (at least to me)
  • the splitAt logic is gone; the entire buffer is emitted instead, and since the buffer is bounded it will contain at most chunkSize elements
  • end-of-demand tracking is gone; the semaphore is purely used for backpressure (as opposed to backpressure + upstream interruption via forall)
  • error/interruption propagation relies on existing combinators (concurrently)
  • the edge case (chunkSize == 0) is explicitly handled (I'm not sure this is the best way to handle it: should the stream terminate immediately, or should we enforce that the chunk size be positive? Either way, changing it to whatever makes the most sense is easy. Note that the existing implementation causes the stream to never end)
  • the edge cases (chunkSize == 1 and timeout == 0) use an existing combinator (chunkN; see the sketch after this list). I'm not sure about this, since it might lead to different behaviour in terms of other properties of groupWithin depending on the parameters. If that's not desirable it can be changed easily
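To make the last two points concrete, the dispatch could look like this (a sketch; the behaviour choices are assumptions, as discussed above):

```scala
import scala.concurrent.duration.{Duration, FiniteDuration}
import fs2.{Chunk, Stream}

// Sketch of the edge-case dispatch (easy to change if other semantics are preferred):
def groupWithinSketch[F[_], A](
    in: Stream[F, A],
    chunkSize: Int,
    timeout: FiniteDuration
): Stream[F, Chunk[A]] =
  if (chunkSize == 0) Stream.empty // assumed: terminate immediately (main currently never ends here)
  else if (chunkSize == 1 || timeout == Duration.Zero)
    in.chunkN(chunkSize)           // delegate to the existing chunkN combinator
  else ???                         // the buffered + timed path (the bulk of this PR)
```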

So if the above makes sense and you guys are happy with these changes, I think the bulk of the work is done and what's left is:

  • decide the desired behaviour around edge cases (and write tests/scaladocs at least for the chunkSize == 0 scenario)
  • (possibly) parameterise benchmarks including a longer timeout window to have more realistic scenarios
  • test Chunk vs Vector with a timeout longer than 100 micros (apparently Chunk produces results similar to Queue)
  • mark the "accumulation & splitting" test(s) as flaky (even though the behaviour is more accurate, the truth is it can still fail)
  • remove tests added here Fixing interruption behaviour #3183
  • sync this branch with main

Notes

  • I don't think this CI failure is caused by a deadlock or a defect in this implementation. The same error (again on the rootJS tests) occurred on the same test on Fixing interruption behaviour #3183 (see that CI failure's logs), which uses the same implementation currently on the main branch (not to mention that one of the tests added in this PR explicitly checks that no deadlock occurs)

@armanbilge (Member)

Thanks for that update!

> In light of this, I've decided to restore the Vector (Chunk doesn't seem to improve performance, at least with the existing benchmarks, will test with different benchmark params)

I have a question about this: I understand that Chunk didn't improve performance in your benchmarks, but does it degrade performance? The reason I ask is that there is a big difference between e.g. a Chunk[Byte] and a Vector[Byte]: the chunk can be backed by a primitive array, but the vector will be boxed.
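For example, a minimal illustration of the boxing difference:

```scala
import fs2.Chunk

// Chunk[Byte] can wrap a primitive Array[Byte] directly: no per-element boxing.
val bytes: Array[Byte] = Array[Byte](1, 2, 3)
val unboxed: Chunk[Byte] = Chunk.array(bytes)

// Vector[Byte] stores each element as a boxed java.lang.Byte reference.
val boxed: Vector[Byte] = bytes.toVector
```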

@Angel-O (Contributor, Author) commented May 12, 2023

> I have a question about this: I understand that Chunk didn't improve performance in your benchmarks, but does it degrade performance? The reason I ask is that there is a big difference between e.g. a Chunk[Byte] and a Vector[Byte]: the chunk can be backed by a primitive array, but the vector will be boxed.

@armanbilge sorry for the late reply.

I see, thanks for explaining. When I ran the benchmark last time using fs2.Chunk I noticed memory behaviour similar to what I observed with the cats Queue (the VisualVM stats/graphs were similar).
In terms of ops/sec, on the other hand, I didn't notice any particular difference, but I was running the existing benchmarks using Int values.

I'm wondering if the difference between Chunk[Byte] and Vector[Byte] calls for a separate method, or maybe better a separate extended API specifically targeting Stream[F, Byte]

@armanbilge (Member)

> I'm wondering if the difference between Chunk[Byte] and Vector[Byte] calls for a separate method, or maybe better a separate extended API specifically targeting Stream[F, Byte]

It doesn't just apply to Byte, it applies to all primitive types. The specific needs of FS2 are precisely why Chunk exists and is used in APIs instead of e.g. Vector. Here's a presentation by Michael discussing some of this :)
https://www.youtube.com/watch?v=wOybldcyMLs

Development

Successfully merging this pull request may close these issues:

  • groupWithin: inconsistent behaviour on source termination