Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZStream#buffer doesn't preserve order of elements #8699

Open
ghostdogpr opened this issue Apr 2, 2024 · 6 comments
Open

ZStream#buffer doesn't preserve order of elements #8699

ghostdogpr opened this issue Apr 2, 2024 · 6 comments

Comments

@ghostdogpr
Copy link
Member

Reproducer:

import zio.*
import zio.stream.*

object Test extends ZIOAppDefault {
  val expected = Chunk.fromIterable(0 until 100)
  val s        = ZStream.fromChunk(expected).buffer(16)

  def run = s.runCollect.map(_ == expected).debug.repeatWhile(identity)
}

It returns false after just a few iterations. If I remove .buffer(16), it seems to work fine.

@ghostdogpr
Copy link
Member Author

ghostdogpr commented Apr 2, 2024

The issue is actually in Queue (buffer calls runIntoQueueElementsScoped).

import zio.*

object Test extends ZIOAppDefault {
  val expected = Chunk.fromIterable(0 until 100)

  def run =
    (for {
      queue         <- Queue.bounded[Int](16)
      _             <- queue.offerAll(expected).fork
      actual        <- queue.take.replicateZIO(100).map(Chunk.fromIterable)
      orderPreserved = actual == expected
      _             <- ZIO.when(!orderPreserved)(ZIO.fail(s"Order not preserved: $actual"))
    } yield ()).forever
}

@ghostdogpr
Copy link
Member Author

Narrowed it down to this line:

unsafeOfferAll(putters, putter +: unsafePollAll(putters))

When this is executed concurrently, you can end up inserting the element out of order.

@ghostdogpr
Copy link
Member Author

Actually me from 2018 knew that the queue implementation was not 100% fair 😆 #447 (comment)

When it comes to streams though, I think we expect the order to be maintained?

@eyalfa
Copy link
Contributor

eyalfa commented Apr 21, 2024

@ghostdogpr I'm currently attempting an optimization on zio.stream.internal.ChannelExecutor#readUpstream and experience one of the streams tests becoming flakey:

- ZStreamSpec / Combinators / tapSink / does not read ahead - 18 ms
        ✗ 1 was not equal to 6
        result == 6
        result = 1
        at /Users/efarago/dev/github/zio/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala:3727

looking at ZStream.tapSink it seem like it (rightfully) relies on a bounded queue of size 1 to retain order when facing a single producer (very similar to buffer), I suspect something happened causing the sink fiber to see the end marker message before some of the emits. makes sense?
I'm not 100% 'into' the queues implementation but I'd expect the single writer scenario to be valid.

  • I added @@ repeats(1000) to the test once I understood it's flakey
  • will try this on the 2.x branch as well just to make sure /i didn't introduce a regression in my WIP optimization (hope to share soon)

@jdegoes
Copy link
Member

jdegoes commented May 8, 2024

/bounty $150

Copy link

algora-pbc bot commented May 8, 2024

💎 $150 bounty • ZIO

Steps to solve:

  1. Start working: Comment /attempt #8699 with your implementation plan
  2. Submit work: Create a pull request including /claim #8699 in the PR body to claim the bounty
  3. Receive payment: 100% of the bounty is received 2-5 days post-reward. Make sure you are eligible for payouts

Thank you for contributing to zio/zio!

Add a bountyShare on socials

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants