optimize ZStream's mapZIOPar and mapZIOParUnordered #8819
Conversation
…ered, stabilize mapZIOParUnordered2
… executor so we can guarantee child fibers are not interrupted too soon.
…iber-accessed only, this allows lifting the synchronized layer on top of it
…withPermits(n) and a simple Left(()) message to signal EOF
…hen A2 is an Either.
…s (drop the Take wrappers).
…ation, this falls 8-9% short of the direct stream.mapZIOPar impl.
… mapZIOParUnordered with bufferSize parameter
@jdegoes @kyri-petrou @ghostdogpr I'd appreciate your reviews
The ZStream changes are a bit beyond my understanding so I haven't reviewed them; I might give it a go later. For now I've just reviewed the FiberRuntime changes, which I understand a bit better.
```diff
-val body = () => {
-  val next = iterator.next()
-  if (next != null) next.await(id.location) else Exit.unit
-}
-
-// Now await all children to finish:
-ZIO
-  .whileLoop(iterator.hasNext)(body())(_ => ())(id.location)
+var curr: Fiber.Runtime[_, _] = null
+
+def skip() = {
+  curr = null
+  while (iterator.hasNext && (curr eq null)) {
+    curr = iterator.next()
+    if ((curr ne null) && !curr.isAlive())
+      curr = null
+  }
+}
+
+skip()
+
+if (null ne curr) {
+  val body = () => {
+    val c = curr
+    skip()
+    c.await(id.location)
+  }
+
+  // Now await all children to finish:
+  ZIO
+    .whileLoop(null ne curr)(body())(_ => ())(id.location)
+} else null
```
I was struggling to follow the logic here. I think the previous code could have been minorly adjusted to make it more performant:

```scala
val body = () => {
  var next: Fiber.Runtime[_, _] = null
  while ((next eq null) && iterator.hasNext) {
    val next0 = iterator.next()
    if ((next0 ne null) && next0.isAlive()) next = next0
  }
  if (next ne null) next.await(id.location) else Exit.unit
}
```
What do you think?
Might work, I'll have a deeper look at it.

The basic idea is to efficiently implement a kind of filtering iterator here without actually paying for one (Scala's filtering iterator is effectively a buffered iterator, which introduces some cost to its `hasNext` implementation).
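To illustrate the cost being avoided, here is a plain-Scala sketch with hypothetical names (not the actual FiberRuntime code): `Iterator#filter` must pre-fetch and buffer an element inside `hasNext` to know whether a match exists, whereas a hand-rolled skip loop keeps a single mutable cursor and no wrapper.

```scala
// Hypothetical stand-in for the child-fiber iterator: negative values
// play the role of fibers that are no longer alive, null entries are
// collected references.
def sumAlive(elems: Array[Integer]): Int = {
  val iterator = elems.iterator
  var curr: Integer = null

  // Advance `curr` to the next "alive" element, or leave it null when
  // exhausted, without the buffering layer Iterator#filter introduces.
  def skip(): Unit = {
    curr = null
    while (iterator.hasNext && (curr eq null)) {
      curr = iterator.next()
      if ((curr ne null) && curr.intValue() < 0) curr = null
    }
  }

  var sum = 0
  skip()
  while (curr ne null) {
    sum += curr.intValue()
    skip()
  }
  sum
}
```

Same shape as the `skip()` loop in the diff: `hasNext` is only ever consulted on the underlying iterator, never on a filtering wrapper.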
@kyri-petrou I was actually able to drop `body` altogether and reduce the `whileLoop` to this:

```scala
ZIO
  .whileLoop(null ne curr)(curr.await(id.location))(_ => skip())(id.location)
```

In fact, there's another possible additional optimization here: by the time we return the `whileLoop` effect we already have the first child fiber, so we can separate the first iteration from the rest by doing something like:

```scala
ZIO.suspend { val effect = curr.await; skip(); effect } *> whileLoop(...)
```

It'd also be nice if there was a `doWhile` effect...
This is a Java iterator though (`_children` is a Java `Set`).

Either way, do we know how many of the children here are expected to still be alive? What use-case do you have where the children are alive by the time the fiber exits? Does this relate to anything in this PR, and if yes, are we better off waiting for it to be merged first?
> ```scala
> ZIO
>   .whileLoop(null ne curr)(curr.await(id.location))(_ => skip())(id.location)
> ```

Ah nice, I like this. Just maybe write it as `curr ne null` cause it's a bit uncommon the other way around 😅

> in fact, there's another possible additional optimization here: by the time we return the `whileLoop` effect we already have the first child fiber, so we can separate the first iteration from the rest by doing something like: `ZIO.suspend(val effect = curr.await; skip(); effect) *> whileLoop(...)`

I think that's a very micro micro-optimization at the cost of readability. I don't think we're saving anything more here other than one `curr ne null` check, as far as I can tell.
@kyri-petrou I don't think this is related to #8816. Furthermore, when a fiber terminates, all of its child fibers (and grandchildren, and all descendants in general) are interrupted by definition (structured concurrency and such...), so I'm not 100% sure why `ZIO.foreachPar` has to interfere with this behavior in any way... but again, irrelevant to this PR.

Regarding the two operations optimized by this PR: in the happy case I expect exactly zero running child fibers when the stream completes, hence there should be no fibers to interrupt when the parent fiber completes. This is of course not 100% true... streams may fail, and for the `mapZIOParXXX` operators this means some operations may still be running and have to be interrupted. There are also cases where the stream itself doesn't fail but downstream cancels by applying an operator like `takeN`.

So summing this up: in the context of these two stream operators, and probably for most correct fiber-aware code, I'd expect no pending fibers (or really just a few) on successful completion. In case of failure/interruption there might be more, and some complicated code may actually spawn children expecting them to be terminated on completion.

So the motivation behind this micro(scopic) optimization was the expectation of very few pending child fibers.
@kyri-petrou once a career starts with ~15 years of writing C++, you put the nulls first on equality checks 😎
Regarding #8816, I agree with you, `ZIO.foreachPar` shouldn't be interfering. But there is a use-case (imho not a valid one) that would be affected if we changed that behaviour. Check the PR description for more info on that.
```scala
.pipeTo(self)
.pipeTo(enqueueCh)
.runScoped
.forkScoped
```
Is it necessary to have `runScoped` with `forkScoped`?
Yes. The channel is running in its own fiber, which is kept alive by some management code that binds the fiber's lifetime to the scope's lifetime. This is important, since the optimized code relies on its spawned fibers being bound to the channel's lifetime (specifically the enqueuer channel + fiber).
See this: the channel is executed within its own fiber, and this fiber is kept alive until the scope is closed. All fibers spawned by effects executed by the stream are descendants of this fiber (unless explicitly forked as daemons or explicitly `transplant`ed).
```diff
@@ -2131,4 +2263,11 @@ object ZChannel {
   ): ZChannel[Env1, InErr, InElem, InDone, OutErr, OutElem, OutDone] =
     self.provideSomeEnvironment(_.updateAt(key)(f))
 }
+
+private case class QRes[A](value: A)
```
Could this be an AnyVal?
`QRes` is used to distinguish between user-specified types (i.e. the channel's `OutElem`), about which we don't know anything, and 'control' types like `Cause`.

I guess it could be dropped if the code was migrated to Scala 3, using union types. The basic idea is to avoid wrapping the stream elements themselves (as they make up the vast majority of messages running through the queue) while wrapping the completion/failure message. This way the reader can do a very simple pattern match to identify the last (success/fail) message and quickly handle the data elements themselves.
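The pattern can be sketched in plain Scala (hypothetical names; the real code matches on the channel's queue messages). Only the rare terminal message is wrapped, so the hot path for data elements is a single pattern match and no per-element allocation:

```scala
final case class QRes[A](value: A)

// Drain a list of untyped messages: unwrapped values are data elements,
// a QRes-wrapped value is the terminal control message.
def drain(messages: List[Any]): (List[Int], Option[String]) = {
  var out    = List.empty[Int]
  var signal = Option.empty[String]
  messages.foreach {
    case QRes(s: String) => signal = Some(s)                    // rare control path
    case elem            => out = elem.asInstanceOf[Int] :: out // hot data path
  }
  (out.reverse, signal)
}
```

The reader pays one `instanceof` check per element instead of allocating and unwrapping a wrapper for each one.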
I think @paulpdaniels was referring to having `QRes` extend `AnyVal` to avoid allocation from wrapping, like:

```scala
private case class QRes[A](value: A) extends AnyVal
```
might be possible, checking
```scala
ZIO.asyncZIO[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { k =>
  ZIO.succeed {
    self.tell {
      FiberMessage.Stateful { case (fib, _) =>
```
Unfortunately this optimization will cause `.children` to block if the underlying fiber is blocked. This means that operations like fiber dump will not work properly on Loom.

I would like to eliminate the synchronization on the children set too, but I'm afraid it is probably going to require a new lock-free set implementation.
Can you please elaborate on this? Under the current implementation the fiber is 'waking up' on queued messages; what's going to happen in Loom? Are you concerned about scenarios where the fiber is blocking on something like promises? Does this mean `FiberMessage.Stateful` is going away? Will `FiberRuntime` even have a queue like it does today?

Furthermore, the current implementation relies on `FiberMessage.Stateful` for common operations like `tellAddChild` (used by `zio.internal.FiberScope.Local#add`), so I don't think this specific change introduces any new blocking risk.

Seems like going 'Loom' will require substantial changes to ZIO fibers...
```scala
// Now await all children to finish:
ZIO
  .whileLoop(iterator.hasNext)(body())(_ => ())(id.location)
// find the first operable child fiber
```
👍
@jdegoes, will you approve this PR once the children-set related changes are reverted (or agreed upon)?
…other fibers, but use the observation it is only mutated by the fiber itself in order to granularly control the required synchronization
@jdegoes I think I found a middle ground between making the children set 'fiber private' and your concerns about blocking in a futuristic loom based runtime.
since the …

@kyri-petrou I've also applied an optimization on the …
…otect from race with transferChildren
@jdegoes @kyri-petrou, I think this is now ready for another round of review
This implementation is much more readable 😍 I just thought about a few things, questions more or less. Let me know your thoughts.
```scala
queueReader   = ZChannel.fromInput(input)
queue        <- Queue.bounded[Fiber[Option[OutErr1], OutElem2]](bufferSize)
permits      <- zio.Semaphore.make(n)
failureSignal <- Promise.make[Option[OutErr1], Nothing]
```
Aren't we currently losing errors with this? If multiple fibers have failed we'd ideally see that in the `Cause`. But this signal seems only necessary to interrupt running fibers, which would yield a `Cause.Interrupt` for each of them. So I wonder if you could just turn it into `Promise[Nothing, Unit]`.
Not sure what you mean by 'losing errors'. AFAIK this preserves the same behavior as the original implementation, hence in case of failures the stream fails with the first noticeable failure (in case of a race, an arbitrary 'winner'). In fact, the original impl also used the `errorSignal` promise both for interrupting 'surviving' fibers and for signaling downstream of the error (the effect in the queue reflects that).

Re: changing the failure signal to `Promise[Nothing, Unit]`, this will fail some of the existing unit tests. Basically, in case of parallelism with N > 1, a case where the first effect is blocking/still running but another effect fails should result in the stream failing with the 'later' failure (with all pending fibers effectively interrupted). This means either the reader or the writers must race with the failure signal; doing this on the reader side is a bit simpler but resulted in a performance penalty (roughly speaking, you need to race this per record, so it's better to do it in parallel; backed by benchmark numbers).

Re: your concerns about the fibers failing with interrupt instead of the actual failure, there are two cases when fibers are interrupted:

- the entire stream is interrupted; I think we can't expect another failure in this scenario
- another fiber fails and manages to complete the failure signal promise; since the executing fibers are all raced with the failure signal, relying on `raceWith` behavior, I explicitly interrupt the loser and propagate the error from the winning side, so I can't see an interruption winning over an actual failure in this scenario
Yes, I can see errors can be lost in the current implementation too, so it's fine if you don't try to address that. You can see how multiple failing fibers are handled with `ZIO.foreachPar` here: https://scastie.scala-lang.org/HtsjbCtTSoCoT5UBuKw28Q and if you want to learn more about how this works: https://zio.dev/reference/core/cause#cause-internals

Update: I should think twice before saying something that stupid, of course you can't block emitting elements before all fibers are done 🤦‍♂️

I wonder if the performance hit could be overcome by changing the one-element-at-a-time nature of your solution. Even with your current implementation you could do:

```scala
queue.takeUpTo(bufferSize).flatMap(Fiber.collectAll(_)).map(ZChannel.writeChunk(_))
```
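The batching idea can be sketched in plain Scala (assumed names, not the ZIO queue API): draining up to `batch` elements per wakeup turns N per-element hand-offs into roughly N/batch chunk writes, which is where most of the per-element overhead goes.

```scala
import scala.collection.mutable

// Drain a queue in bounded chunks, analogous to repeated queue.takeUpTo(batch).
def drainBatched[A](q: mutable.Queue[A], batch: Int): List[List[A]] = {
  val chunks = mutable.ListBuffer.empty[List[A]]
  while (q.nonEmpty) {
    val chunk = mutable.ListBuffer.empty[A]
    // grab whatever is available, bounded by the batch size
    while (q.nonEmpty && chunk.size < batch) chunk += q.dequeue()
    chunks += chunk.toList // one downstream write per chunk, not per element
  }
  chunks.toList
}
```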
Also, the `SingleProducerAsyncInput` construct seems to have been designed for `ZChannel.mergeAllWith`, but here there is only one consumer. It wouldn't be too hard to create a `SingleConsumerAsyncInput` that internalises the queue; that would drastically simplify your implementation. One-element-at-a-time processing instead of by chunks is the top reason for poor performance with streams (here is a nice example: https://blog.pierre-ricadat.com/supporting-high-performance-response-streaming-in-shardcake#heading-chunking-express).
```scala
restore =>
  for {
    localScope <- zio.Scope.make
    _          <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope)))
```
It's unfortunate we don't seem to have access to `Semaphore.reserve` and `Semaphore.release` anymore, as with them you could ditch that local scope:

```scala
restore(permits.reserve(1) *> f(a)...).onExit(_ => permits.release(1)).fork
```

Perhaps we could resurrect them with the unsafe pattern.
Also had this thought; we can introduce them as `private[zio]` or as unsafe operations.
```scala
localScope <- zio.Scope.make
_          <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope)))
fib        <- restore {
                f(a).catchAllCause { c =>
```
We may have multiple fibers failing at the same time, so I wonder if we could handle that, but it won't work if you use the promise to carry the error. Ideally in the resulting cause we'd see all the failures and all the interruptions. Maybe you could do this instead?

```scala
f(a)
  .onError(_ => failureSignal.succeed(()))
  .onExit(localScope.close(_))
  .fork
```

and then you only need one race in the consumer:

```scala
val left = queue.take.flatMap(_.join)
val right = failureSignal.await
  // grab all the fibers in the queue
  .zipRight(queue.takeAll)
  // interrupt queue producers and consumers if any,
  // actually maybe a bit more complex if `n > bufferSize`
  .zipLeft(queue.shutdown)
  // interrupt all running fibers if any, then collect a single cause
  .flatMap(fs =>
    Fiber.interruptAll(fs)
      .zipRight(ZIO.foreach(fs)(_.await))
      // grab all failures
      .map(_.collect { case Exit.Failure(cause) => cause }.reduce(_ && _))
      .map(ZChannel.failCause(_))
  )

left.raceWith(right)(
  // error will be visible in right handling so it can be ignored here
  { case (exit, fiber) =>
    exit.fold(
      _ => ZChannel.unwrap(fiber.await),
      x => ZChannel.unwrap(right.interrupt.as(ZChannel.write(x)))
    )
  },
  { case (exit, fiber) => ZChannel.unwrap(fiber.interrupt *> exit) }
)
```
I think I've answered this in the previous comment:

- preserves existing behavior
- racing on the consumer was quite costly performance-wise
- the complexity of this block of code... not to mention the risks it introduces: how many messages are you reading from the queue after seeing a failure? How do you make sure all fibers terminate? How do you interrupt the rest?
I wouldn't rule it out based on apparent complexity; in ZIO we have never shied away from writing complex, accurate logic without compromise. The current implementation is 50 lines of very readable code; yours is almost double that amount with more features involved. This is not necessarily a bad thing. I take your point, though, but I would question the introduction of `bufferSize`; there is already a backpressure mechanism bounded by `n`. If you remove it, then there is a guarantee that all fibers are in the queue when `takeAll` happens.
```scala
val reader: ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = {
  lazy val reader0: ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = ZChannel
    .fromZIO(q.take)
```
I wonder if you could use `takeUpTo(n)` here?
Thought about it; I think it complicates the code a bit, but I decided not to do so since it violates the buffer size constraint and may effectively lead to double the amount of buffered elements.
Keep up the good work 💪 These were just my 2 cents; I haven't reviewed in a long time.
```diff
 if (_children eq null) {
-  _children = Platform.newConcurrentWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe)
+  _children = Platform.newWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe)
```
There's a big optimization potential here. Currently we're initializing the set with the default underlying size (16, which can accommodate 12 entries before resizing).

Since we can now add children in bulk, we know how many entries the set is meant to hold (at least initially). So we can instantiate it with a size large enough to accommodate all the entries without the need to dynamically resize.

Note that to calculate the size based on the number of expected entries you need to use this equation: `size = Math.ceil(nEntries / 0.75d)`.

I would also advise using 16 as the minimum size, as we don't get much benefit from sizing it smaller, but it can be really bad if we need to keep resizing later (since resizing happens in pow-2 increments).
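As a quick sanity check of that equation, here is a sketch (`initialCapacity` is a made-up helper, not a ZIO API): the needed capacity is `ceil(nEntries / 0.75)` rounded up to the next power of two, floored at 16.

```scala
def initialCapacity(nEntries: Int): Int = {
  // a hash table of capacity c holds c * 0.75 entries before resizing,
  // so fitting nEntries requires at least ceil(nEntries / 0.75) slots
  val needed = math.ceil(nEntries / 0.75d).toInt
  var capacity = 16 // advised minimum
  while (capacity < needed) capacity <<= 1 // pow-2 increments, as in HashMap
  capacity
}
```

For example, 12 expected children fit in the default capacity of 16, while a 13th forces the next power of two.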
I also thought about this potential; I think it goes beyond that, since some of the spawned fibers are very short-lived and may already be finished by the time they're added to the fiber scope.

Another thought I had: when forking multiple 'very similar' fibers (`foreachParXXX`), is it possible to come up with some kind of 'template' for the inherited fiber refs? They are all forked from the same parent, so all should have very similar fiber refs, except for the few which have specialized 'fork' logic, which perhaps can be pre-identified?
Btw, is it mandatory for `_children` to be a set? The `remove` operation is currently not used, and we also never add the same fiber twice, so the collection is effectively guaranteed to be a set.

I suspect an array-based impl with 'occasional' GC can be sufficient here.
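A sketch of what such an array-based collection could look like (entirely hypothetical plain Scala; liveness is modelled as a predicate instead of `Fiber.Runtime#isAlive`): additions append to an array, and dead entries are compacted away only when the array fills up.

```scala
final class ChildBuffer[A <: AnyRef](isAlive: A => Boolean) {
  private var elems = new Array[AnyRef](16)
  private var size  = 0

  def add(a: A): Unit = {
    if (size == elems.length) compact() // "occasional GC" under pressure
    if (size == elems.length)           // still full: everything alive, so grow
      elems = java.util.Arrays.copyOf(elems, elems.length * 2)
    elems(size) = a
    size += 1
  }

  // Drop entries that are no longer alive, in place.
  private def compact(): Unit = {
    var w = 0
    var r = 0
    while (r < size) {
      val a = elems(r).asInstanceOf[A]
      if (isAlive(a)) { elems(w) = a; w += 1 }
      r += 1
    }
    java.util.Arrays.fill(elems, w, size, null)
    size = w
  }

  def live: List[A] = {
    compact()
    (0 until size).map(i => elems(i).asInstanceOf[A]).toList
  }
}
```

Since fibers are added at most once and never removed individually, the write path needs no Set semantics at all; only the occasional compaction pays a linear scan.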
@kyri-petrou I suspect I may have to restore the synchronized weak set; it seems the set's `isEmpty` method effectively modifies it.

If there was a way to get the 'naive' set's size we'd still be ok, but given the current situation I think we must restore the synchronized weak set and seek alternatives (I still think this doesn't have to be a set at all).

Another alternative is to restore the message-based `children` impl and address @jdegoes' comment once ZIO actually goes Loom.
@eyalfa I would approve without the fiber children set changes. Under Loom, any ordinary … Such "blocked fibers", which cannot respond to any messages (because the fiber run loop is active), mean that any attempt to generate a full stack trace would block for an indefinite amount of time. That is definitely not acceptable. I think we need a true concurrent weak set, and that's a separate ticket that I will create. Until then, I don't see an alternative to always synchronizing on the children set.
Thanks @jdegoes, I'll revert the set type but leave the 'addChildren
@eyalfa I already implemented interrupt mapping in a different (discarded) PR, so I know how to do that and I will bring it back in a month or so.
@jdegoes I've restored the synchronized child-fibers set; I'd appreciate another review. I think the fact that the non-synchronized variant doesn't work for us is a bit coincidental: if there was a way to make its read-only methods actually non-mutating, it'd make the granular locking strategy feasible. I think it should be possible to derive a very limited interface (not the full-blown collection interface) and implement it specifically for this use case.
@jdegoes I think it's ready for your final review

@eyalfa Thank you for your work here!
initial benchmark results:

optimized benchmark results:

applied various optimizations:

- decouple `mapZIOParUnordered` from `ZChannel.mergeAllWith`; the latter seems to be far more complex
- use `FiberScope` rather than `forkScoped`; the latter seems to have some overhead related to adding and removing the finalizer from the scope, and this gets amplified when executed per message in the stream
- avoid effect allocations (use `zio.Exit` when possible)
- `FiberRuntime`'s children set: make it 'fiber local' and eliminate the synchronization layer
- extend the tests for `mapZIOParUnordered` to make it tested as much as `mapZIOPar`