
optimize ZStream's mapZIOPar and mapZIOParUnordered #8819

Merged
merged 36 commits into from May 15, 2024

Conversation

eyalfa
Contributor

@eyalfa eyalfa commented May 6, 2024

initial benchmark results:

Benchmark                              (chunkCount)  (chunkSize)  (parChunkSize)   Mode  Cnt  Score   Error  Units
StreamParBenchmark.zioMapPar                  10000         5000              50  thrpt   15  0.194 ± 0.031  ops/s
StreamParBenchmark.zioMapParUnordered         10000         5000              50  thrpt   15  0.202 ± 0.002  ops/s

optimized benchmark results:

Benchmark                              (chunkCount)  (chunkSize)  (parChunkSize)   Mode  Cnt  Score   Error  Units
StreamParBenchmark.zioMapPar                  10000         5000              50  thrpt   15  0.458 ± 0.008  ops/s
StreamParBenchmark.zioMapParUnordered         10000         5000              50  thrpt   15  0.763 ± 0.009  ops/s

applied various optimizations:

  1. Separate mapZIOParUnordered from ZChannel.mergeAllWith; the latter seems to be far more complex.
  2. Rely on FiberScope rather than forkScoped; the latter seems to have some overhead related to adding and removing the finalizer from the scope, which gets amplified when executed per message in the stream.
  3. Carefully choose the data representation for data in the queue: happy-case data (stream elements) are much more common than failures or completion, so make sure those are as 'naked' and accessible as possible (i.e. avoid zio.Exit when possible); see the sketch after this list.
  4. Slightly refactor FiberRuntime's children set: make it 'fiber local' and eliminate the synchronization layer.
  5. Add tests for mapZIOParUnordered so it is tested as thoroughly as mapZIOPar.
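A minimal sketch of the idea behind (3), assuming a hypothetical reader loop (QRes is the wrapper actually introduced by this PR, but the Queue[Any] and readLoop shapes here are illustrative only): stream elements travel through the queue unwrapped, and only the rare completion/failure message is boxed, so the hot path is a cheap type test instead of unwrapping an Exit per element.

import zio._

// hypothetical sketch: only the terminal message is wrapped
private case class QRes[A](value: A)

def readLoop[Elem](queue: Queue[Any])(emit: Elem => UIO[Unit]): UIO[Unit] =
  queue.take.flatMap {
    case QRes(_) => ZIO.unit                                            // control message: stop reading
    case elem    => emit(elem.asInstanceOf[Elem]) *> readLoop(queue)(emit)
  }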

… executor so we can guarantee child fibers are not interrupted too soon.
…iber-accessed only, this allows lifting the synchronized layer on top of it
…withPermits(n) and a simple Left(()) message to signal EOF
…ation, this falls 8-9% short of the direct stream.mapZIOPar impl.
… mapZIOParUnordered with bufferSize parameter
@eyalfa
Contributor Author

eyalfa commented May 6, 2024

@jdegoes @kyri-petrou @ghostdogpr I'd appreciate your reviews

Contributor

@kyri-petrou kyri-petrou left a comment


The ZStream changes are a bit beyond my understanding so I haven't reviewed them; I might give it a go later. For now I've just reviewed the FiberRuntime changes, which I understand a bit better.

core/shared/src/main/scala/zio/internal/FiberRuntime.scala (outdated; resolved)
core/shared/src/main/scala/zio/internal/FiberRuntime.scala (outdated; resolved)
Comment on lines 660 to 698
-      val body = () => {
-        val next = iterator.next()
-
-        if (next != null) next.await(id.location) else Exit.unit
-      }
-
-      // Now await all children to finish:
-      ZIO
-        .whileLoop(iterator.hasNext)(body())(_ => ())(id.location)
+      var curr: Fiber.Runtime[_, _] = null
+      def skip() = {
+        curr = null
+        while (iterator.hasNext && (curr eq null)) {
+          curr = iterator.next()
+          if ((curr ne null) && !curr.isAlive())
+            curr = null
+        }
+      }
+      skip()
+
+      if (null ne curr) {
+        val body = () => {
+          val c = curr
+          skip()
+          c.await(id.location)
+        }
+
+        // Now await all children to finish:
+        ZIO
+          .whileLoop(null ne curr)(body())(_ => ())(id.location)
+      } else null
Contributor

I was struggling to follow the logic here. I think the previous code could have been adjusted slightly to make it more performant, as:

      val body = () => {
        var next: Fiber.Runtime[_, _] = null
        while ((next eq null) && iterator.hasNext) {
          val next0 = iterator.next()
          if ((next0 ne null) && next0.isAlive()) next = next0
        }

        if (next ne null) next.await(id.location) else Exit.unit
      }

What do you think?

Contributor Author

Might work, I'll take a deeper look at it.
The basic idea is to efficiently implement a kind of filtering iterator here without actually paying for one (Scala's filtering iterator is effectively a buffered iterator, which adds some cost to its hasNext implementation).

Contributor Author

@kyri-petrou I was actually able to drop body altogether and reduce the whileLoop to this:

ZIO
          .whileLoop(null ne curr)(curr.await(id.location))(_ => skip())(id.location)

In fact, there's another possible optimization here:
by the time we return the whileLoop effect we already have the first child fiber, so we can separate the first iteration from the rest by doing something like:
ZIO.suspend { val effect = curr.await; skip(); effect } *> whileLoop(...)

it'd also be nice if there was a doWhile effect...
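For illustration, a hypothetical doWhile-shaped helper (not part of ZIO; just a sketch of what the comment wishes for) could be built from existing combinators: run the body once, then keep looping while the check holds.

import zio._

// hypothetical helper, sketched on top of ZIO.suspendSucceed and ZIO.whileLoop
def doWhileLoop[R, E, A](check: => Boolean)(body: => ZIO[R, E, A])(process: A => Any)(implicit
  trace: Trace
): ZIO[R, E, Unit] =
  ZIO.suspendSucceed(body).flatMap { a =>
    process(a)
    ZIO.whileLoop(check)(body)(process)
  }

With something like this, the loop above could read as doWhileLoop(curr ne null)(curr.await(id.location))(_ => skip()), assuming it runs inside the existing branch where curr is already populated.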

Contributor

This is a Java iterator though (_children is a Java Set).

Either way, do we know how many of the children here are expected to still be alive? What use-case do you have where the children are alive by the time the fiber exits? Does this relate to anything in this PR, and if yes, are we better off waiting for it to be merged first?

Contributor

ZIO
          .whileLoop(null ne curr)(curr.await(id.location))(_ => skip())(id.location)

Ah nice, I like this. Just maybe write it as curr ne null cause it's a bit uncommon the other way around 😅

in fact, there's another possible additional optimization here: by the time we return the whileLoop effect we already have the first child fiber, so we can separate the first iteration from the rest by doing something like: ZIO.suspend(val effect = curr.await; skip(); effect) *> whileLoop(...)

I think that's a very micro micro-optimization at the cost of readability. I don't think we're saving anything more here other than 1 curr ne null check as far as I can tell

Contributor Author

@kyri-petrou I don't think this is related to #8816. Furthermore, when a fiber terminates, all of its child fibers (and grandchildren, and all descendants in general) are interrupted by definition (structured concurrency and such...), so I'm not 100% sure why ZIO.foreachPar has to interfere with this behavior in any way... but again, irrelevant to this PR.

Regarding the two operations optimized by this PR: in the happy case I expect exactly zero running child fibers when the stream completes, hence there should be no fibers to interrupt when the parent fiber completes. This is of course not 100% true... streams may fail, and for the mapZIOParXXX operators this means some operations may still be running and have to be interrupted. There are also cases where the stream itself doesn't fail but downstream cancels by applying an operator like takeN.

Summing this up: in the context of these two stream operators, and probably for most correct fiber-aware code, I'd expect no pending fibers, or really just a few, on successful completion. In case of failure/interruption there might be more, and some complicated code may actually spawn children expecting them to be terminated on completion.

So the motivation behind this micro(scopic) optimization was the expectation of very few pending child fibers.

Contributor Author

@kyri-petrou once a career starts with ~15 years of writing C++, you put the nulls first on equality checks 😎

Contributor

Regarding #8816, I agree with you, ZIO.foreachPar shouldn't be interfering. But there is a use-case (imho not a valid one) that would be affected if we changed that behaviour. Check the PR description for more info on that.

core/shared/src/main/scala/zio/internal/FiberRuntime.scala (outdated; resolved)
streams/shared/src/main/scala/zio/stream/ZChannel.scala (outdated; resolved)
.pipeTo(self)
.pipeTo(enqueueCh)
.runScoped
.forkScoped
Contributor

Is it necessary to have runScoped with forkScoped?

Contributor Author

Yes, the channel is running in its own fiber, which is kept alive by some management code that binds the fiber's lifetime to the scope's lifetime. This is important since the optimized code relies on its spawned fibers being bound to the channel's lifetime (specifically the enqueuer channel + fiber).

Contributor Author

@eyalfa eyalfa May 7, 2024

See this: the channel is executed within its own fiber, and this fiber is kept alive until the scope is closed.
All fibers spawned by effects executed by the stream are descendants of this fiber (unless explicitly forked as daemons or explicitly transplanted).

streams/shared/src/main/scala/zio/stream/ZChannel.scala (outdated; resolved)
@@ -2131,4 +2263,11 @@ object ZChannel {
): ZChannel[Env1, InErr, InElem, InDone, OutErr, OutElem, OutDone] =
self.provideSomeEnvironment(_.updateAt(key)(f))
}

private case class QRes[A](value: A)
Contributor

Could this be an AnyVal?

Contributor Author

@eyalfa eyalfa May 7, 2024

QRes is used to distinguish between a user-specified type (i.e. the channel's OutElem), which we don't know anything about, and 'control' types like Cause.
I guess it could be dropped if the code were migrated to Scala 3 using union types. The basic idea is to avoid wrapping the stream elements themselves (as they make up the vast majority of messages running through the queue) while wrapping the completion/failure message. This way the reader can do a very simple pattern match to identify the last (success/fail) message and quickly handle the data elements themselves.

Contributor

I think @paulpdaniels was referring to having QRes extend AnyVal to avoid allocation from wrapping, like:

private case class QRes[A](value: A) extends AnyVal
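A hedged aside on the AnyVal suggestion, based on general Scala value-class semantics rather than on anything in this PR: a value class only avoids allocation while it stays statically typed as QRes[A]; once it is widened to Any (for example when offered to a queue typed more broadly) or pattern-matched through Any, the compiler boxes it anyway, so extends AnyVal may not remove the wrapper allocation here.

object AnyValBoxingNote {
  private case class QRes[A](value: A) extends AnyVal

  // widening to Any boxes the value class: a real QRes instance is allocated here
  val widened: Any = QRes(42)
}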

Contributor Author

might be possible, checking

streams/shared/src/main/scala/zio/stream/ZStream.scala (outdated; resolved)
@kyri-petrou
Contributor

@eyalfa #8816 got merged in, you probably want to rebase and check whether everything's still OK

@eyalfa eyalfa requested a review from kyri-petrou May 8, 2024 10:34
ZIO.asyncZIO[Any, Nothing, Chunk[Fiber.Runtime[_, _]]] { k =>
ZIO.succeed {
self.tell {
FiberMessage.Stateful { case (fib, _) =>
Member

Unfortunately this optimization will cause .children to block if the underlying fiber is blocked. This means that operations like fiber dump will not work properly on Loom.

I would like to eliminate the synchronization on the children set, too, but I'm afraid it is probably going to require a new lock-free set implementation.

Contributor Author

Can you please elaborate on this? Under the current implementation the fiber 'wakes up' on queued messages; what's going to happen in Loom? Are you concerned about scenarios where the fiber is blocking on something like promises? Does this mean FiberMessage.Stateful is going away? Will FiberRuntime even have a queue like it does today?
Furthermore, the current implementation relies on FiberMessage.Stateful for common operations like tellAddChild (used by zio.internal.FiberScope.Local#add), so I don't think this specific change introduces any new blocking risk.

Seems like going 'Loom' will require substantial changes to ZIO fibers...

-      // Now await all children to finish:
-      ZIO
-        .whileLoop(iterator.hasNext)(body())(_ => ())(id.location)
+      //find the first operable child fiber
Member

👍

@eyalfa
Contributor Author

eyalfa commented May 8, 2024

@jdegoes, will you approve this PR once the children-set-related changes are reverted (or agreed upon)?
Please let me know if/how this can be integrated.

…other fibers, but use the observation it is only mutated by the fiber itself in order to granularly control the required synchronization
@eyalfa
Contributor Author

eyalfa commented May 10, 2024

@jdegoes I think I found a middle ground between making the children set 'fiber private' and your concerns about blocking in a futuristic Loom-based runtime.
Relying on the observation that this set is only updated by the fiber itself, the synchronization can be relaxed such that:

  1. updates are synchronized
  2. 'external' reads are synchronized
  3. local reads are not synchronized

Since the children method is the only external read, it's the only non-mutating method that requires synchronization (a sketch of this scheme follows below).
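A minimal sketch of that relaxed scheme, assuming a hypothetical ChildrenSet holder (not the PR's actual FiberRuntime code): the owning fiber is the only writer, so mutations and external reads go through the monitor, while the owner's own reads skip it.

import java.util.{HashSet => JHashSet}

// hypothetical illustration of the relaxed synchronization described above
final class ChildrenSet[A <: AnyRef] {
  private[this] val underlying = new JHashSet[A]()

  // writes happen only on the owning fiber, but still synchronize so that
  // external readers observe a consistent set
  def add(child: A): Unit = underlying.synchronized { underlying.add(child); () }

  // called only from the owning fiber: no synchronization needed
  def isEmptyLocal: Boolean = underlying.isEmpty

  // may be called from any fiber (e.g. Fiber#children): must synchronize
  def snapshotExternal(): List[A] = underlying.synchronized {
    val it  = underlying.iterator()
    var acc = List.empty[A]
    while (it.hasNext) acc = it.next() :: acc
    acc
  }
}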

@kyri-petrou I've also applied an optimization to the transferChildren method:

  1. Batch the add operations; this allows checking the parent fiber and its interruptibility status just once, and also saves enqueued messages when the current fiber is not the parent.
  2. Since this method effectively clears the children set, I just null out the member variable, effectively avoiding any mutation of the set, hence no need to synchronize on it.

@eyalfa eyalfa requested a review from jdegoes May 10, 2024 08:35
@eyalfa
Contributor Author

eyalfa commented May 10, 2024

@jdegoes @kyri-petrou , I think this is now ready for another round of review

Member

@regiskuckaertz regiskuckaertz left a comment


This implementation is much more readable 😍 I just thought about a few things, questions more or less. Let me know your thoughts.

queueReader = ZChannel.fromInput(input)
queue <- Queue.bounded[Fiber[Option[OutErr1], OutElem2]](bufferSize)
permits <- zio.Semaphore.make(n)
failureSignal <- Promise.make[Option[OutErr1], Nothing]
Member

Aren't we currently losing errors with this? If multiple fibers have failed, we'd ideally see that in the Cause. But this signal seems only necessary to interrupt running fibers, which would yield a Cause.Interrupt for each of them. So I wonder if you could just turn it into Promise[Nothing, Unit].

Contributor Author

Not sure what you mean by 'losing errors'; AFAIK this preserves the same behavior as the original implementation, hence in case of failures the stream fails with the first noticeable failure (in case of a race, an arbitrary 'winner'). In fact the original impl also used the errorSignal promise both for interrupting 'surviving' fibers and for signaling downstream of the error (the effect in the queue reflects that).

Re. changing the failure signal to Promise[Nothing, Unit]: this will fail some of the existing unit tests. Basically, in case of parallelism with N>1, a case where the first effect is blocking/still running but another effect fails should result in the stream failing with the 'later' failure (with all pending fibers effectively interrupted). This means either the reader or the writers must race with the failure signal; doing this on the reader side is a bit simpler but resulted in a performance penalty (roughly speaking you'd need to race this per record, so it's better to do it in parallel on the writer side; backed by benchmark numbers).

Re. your concerns about the fibers failing with interrupt instead of the actual failure, there are two cases when fibers are interrupted (see the sketch after this list):

  1. The entire stream is interrupted; I think we can't expect another failure in this scenario.
  2. When another fiber fails and manages to complete the failure signal promise: since the executing fibers are all raced with the failure signal, relying on raceWith behavior, I explicitly interrupt the loser and propagate the error from the winning side, so I can't see an interruption winning over an actual failure in this scenario.
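A hedged sketch of that racing shape (the helper name is hypothetical; the PR's real code differs): each forked effect races with the shared failure signal, the losing side is interrupted explicitly, and the winner's exit is propagated.

import zio._

// hypothetical sketch of racing an effect with a shared failure signal
def racedWithSignal[R, E, A](effect: ZIO[R, E, A], failureSignal: Promise[E, Nothing]): ZIO[R, E, A] =
  effect.raceWith(failureSignal.await)(
    (exit, loser) => loser.interrupt *> exit, // the effect finished first: interrupt the signal waiter, keep the exit
    (exit, loser) => loser.interrupt *> exit  // the signal fired first: interrupt the effect, propagate the failure
  )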

Member

Yes, I can see errors can be lost in the current implementation too, so it's fine if you don't try to address that. You can see how multiple failing fibers are handled with ZIO.foreachPar here: https://scastie.scala-lang.org/HtsjbCtTSoCoT5UBuKw28Q and, if you want to learn more about how this works: https://zio.dev/reference/core/cause#cause-internals

Update: I should think twice before saying something that stupid, of course you can't block emitting elements before all fibers are done 🤦‍♂️

I wonder if the performance hit could be overcome by changing the one-element-at-a-time nature of your solution. Even with your current implementation you could do:

queue.takeUpTo(bufferSize).flatMap(fibers => Fiber.collectAll(fibers).join).map(ZChannel.writeChunk(_))

Member

Also, the SingleProducerAsyncInput construct seems to have been designed for ZChannel.mergeAllWith, but here there is only one consumer. It wouldn't be too hard to create a SingleConsumerAsyncInput that internalises the queue, which would drastically simplify your implementation. One-element-at-a-time processing instead of by chunks is the top reason for poor performance with streams (here is a nice example: https://blog.pierre-ricadat.com/supporting-high-performance-response-streaming-in-shardcake#heading-chunking-express)

restore =>
for {
localScope <- zio.Scope.make
_ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope)))
Member

It's unfortunate we don't seem to have access to Semaphore.reserve and Semaphore.release anymore, as with them you could ditch that local scope:

restore(permits.reserve(1) *> f(a)...).onExit(_ => permits.release(1)).fork

Perhaps we could resurrect them with the unsafe pattern.

Contributor Author

Also had this thought; we could introduce them as private[zio] or as unsafe operations.

localScope <- zio.Scope.make
_ <- restore(permits.withPermitScoped.provideEnvironment(ZEnvironment(localScope)))
fib <- restore {
f(a).catchAllCause { c =>
Member

We may have multiple fibers failing at the same time, so I wonder if we could handle that, but it won't work if you use the promise to carry the error. Ideally in the resulting cause we'd see all the failures and all the interruptions. Maybe you could do this instead?

f(a)
  .onError(_ => failureSignal.succeed(()))
  .onExit(localScope.close(_))
  .fork

and then you only need one race in the consumer:

val left = queue.take.flatMap(_.join)
val right = failureSignal.await
  // grab all the fibers in the queue
  .zipRight(queue.takeAll)
  // interrupt queue producers and consumers if any,
  // actually maybe a bit more complex if `n > bufferSize`
  .zipLeft(queue.shutdown)
  // interrupt all running fibers if any, then collect single cause
  .flatMap(fs => Fiber.interruptAll(fs)
    .zipRight(ZIO.foreach(fs)(_.await))
    // grab all failures
    .map(_.collect { case Exit.Failure(cause) => cause }.reduce(_ && _))
    .map(ZChannel.failCause(_)))

left.raceWith(right)(
    // error will be visible in right handling so it can be ignored here
    { case (exit, fiber) =>
      exit.fold(
        _ => ZChannel.unwrap(fiber.await), 
        x => ZChannel.unwrap(right.interrupt.as(ZChannel.write(x)))) },
    { case (exit, fiber) => ZChannel.unwrap(fiber.interrupt *> exit) }
)

Contributor Author

I think I've answered this in the previous comment:

  1. It preserves existing behavior.
  2. Racing on the consumer was quite costly performance-wise.
  3. The complexity of this block of code... not to mention the risks it introduces: how many messages are you reading from the queue after seeing a failure? How do you make sure all fibers terminate? How do you interrupt the rest?

Member

I wouldn't rule it out based on apparent complexity; in ZIO we have never shied away from writing complex, accurate logic without compromise. The current implementation is 50 lines of very readable code, yours is almost double that amount with more features involved; this is not necessarily a bad thing. I take your point, though I would question the introduction of bufferSize: there is already a backpressure mechanism bounded by n. If you remove it, then there is a guarantee that all fibers are in the queue when takeAll happens.


val reader: ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = {
lazy val reader0: ZChannel[Any, Any, Any, Any, OutErr1, OutElem2, OutDone] = ZChannel
.fromZIO(q.take)
Member

I wonder if you could use takeUpTo(n) here?

Contributor Author

Thought about it; I think it complicates the code a bit...
but I decided not to do so since it violates the buffer size constraint and may effectively lead to double the amount of buffered elements.

@regiskuckaertz
Member

Keep up the good work 💪 these were just my 2 cents, I haven't reviewed in a long time.

      if (_children eq null) {
-       _children = Platform.newConcurrentWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe)
+       _children = Platform.newWeakSet[Fiber.Runtime[_, _]]()(Unsafe.unsafe)
Contributor

There's a big optimization potential here. Currently we're initializing the Set using the default underlying size (size 16, which can accommodate 12 entries before resizing).

Since we can now add children in bulk, we know how many entries the set is meant to hold (at least initially). So we can instantiate it with a size large enough to accommodate all the entries without the need to dynamically resize (see the sketch below).

Note that to calculate the size based on the number of expected entries you need to use this equation: size = Math.ceil(nEntries / 0.75d)

I would also advise using 16 as the minimum size, as we don't get much benefit from sizing it smaller, but it can be really bad if we need to keep resizing later (since resizing happens in Pow2 increments).
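A small sketch of that sizing rule (the helper name is hypothetical): choose the initial capacity so nEntries fit under the default 0.75 load factor, with 16 as the floor.

// hypothetical helper illustrating the sizing rule above
def initialCapacity(nEntries: Int): Int =
  math.max(16, math.ceil(nEntries / 0.75d).toInt)

// e.g. initialCapacity(12) == 16, initialCapacity(100) == 134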

Contributor Author

I also thought about this potential; I think it goes beyond that, since some of the spawned fibers are very short-lived and may already be finished by the time they're added to the fiber scope.
Another thought I had: when forking multiple 'very similar' fibers (foreachParXXX), is it possible to come up with some kind of 'template' for the inherited fiber refs? They are all forked from the same parent, so all should have very similar fiber refs except for the few which have specialized 'fork' logic, which perhaps can be pre-identified?

Contributor Author

Btw, is it mandatory for _children to be a set? The remove operation is currently not used, and we also never add the same fiber twice, so the collection is effectively guaranteed to be a set.
I suspect an array-based impl with 'occasional' GC could be sufficient here (sketched below).
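A hedged sketch of that array-based alternative (hypothetical, with an isAlive predicate supplied by the caller; nothing like this exists in the PR): appends are cheap, and dead entries are only swept out when the buffer fills up.

// hypothetical sketch of the "array with occasional GC" idea (not ZIO code)
final class ChildArray[A <: AnyRef](isAlive: A => Boolean) {
  private[this] var buf  = new Array[AnyRef](16)
  private[this] var size = 0

  def add(child: A): Unit = {
    if (size == buf.length) compactOrGrow()
    buf(size) = child
    size += 1
  }

  def foreachAlive(f: A => Unit): Unit = {
    var i = 0
    while (i < size) {
      val a = buf(i).asInstanceOf[A]
      if ((a ne null) && isAlive(a)) f(a)
      i += 1
    }
  }

  // sweep dead entries; grow only if the sweep didn't free any slots
  private def compactOrGrow(): Unit = {
    var w = 0
    var r = 0
    while (r < size) {
      val a = buf(r).asInstanceOf[A]
      if ((a ne null) && isAlive(a)) { buf(w) = a; w += 1 }
      r += 1
    }
    var i = w
    while (i < size) { buf(i) = null; i += 1 } // clear the tail so dropped refs can be collected
    size = w
    if (size == buf.length) {
      val bigger = new Array[AnyRef](buf.length * 2)
      System.arraycopy(buf, 0, bigger, 0, size)
      buf = bigger
    }
  }
}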

Contributor Author

@eyalfa eyalfa May 12, 2024

@kyri-petrou I suspect I may have to restore the synchronized weak set; it seems the set's isEmpty method effectively modifies it.

If there was a way to get the 'naive' set's size we'd still be OK, but given the current situation I think we must restore the synchronized weak set and seek an alternative (I still think this doesn't have to be a set at all).

Another alternative is to restore the message-based children impl and address @jdegoes' comment once ZIO actually goes Loom.

@jdegoes
Member

jdegoes commented May 12, 2024

@eyalfa I would approve without the fiber children set changes.

Under Loom, any ordinary ZIO.succeed(...) or ZIO.attempt(...) can become blocking IO. This means that the runLoop will be legitimately blocked indefinitely pending I/O. During this time, the fiber is not responsive to messages. Because of how many fibers there are (10's of thousands or more) in most applications, it means that there is always at least one (and likely many) that are blocked on I/O.

Such "blocked fibers" which cannot respond to any messages (because the fiber run loop is active) mean that any attempt to generate a full stack trace would block for an indefinite amount of time.

That is definitely not acceptable. I think we need a true concurrent weak set, and that's a separate ticket that I will create. Until then, I don't see an alternative to always synchronizing on the children set.

@eyalfa
Contributor Author

eyalfa commented May 12, 2024

Thanks @jdegoes, I'll revert the set type but leave the `addChildren` changes. Must say this is a real bummer, as the `children` functionality is only used for debugging purposes. I'm also not 100% sure how interrupts would look in a fully Loom-enabled ZIO; guess we'll see and learn 😎

@jdegoes
Member

jdegoes commented May 12, 2024

@eyalfa I already implemented interrupt mapping in a different (discarded) PR, so I know how to do that and I will bring it back in a month or so.

@eyalfa
Contributor Author

eyalfa commented May 12, 2024

@jdegoes I've restored the synchronized child fibers set; I'd appreciate another review.

I think the fact that the non-synchronized variant doesn't work for us is a bit coincidental: if there was a way to make its read-only methods actually non-mutating, it'd make the granular locking strategy feasible... I think it should be possible to derive a very limited interface (not the full-blown collections interface) and implement that specifically for this use case.

@eyalfa
Contributor Author

eyalfa commented May 15, 2024

@jdegoes I think it's ready for your final review

@jdegoes jdegoes merged commit a6b26fa into zio:series/2.x May 15, 2024
21 checks passed
@jdegoes
Member

jdegoes commented May 15, 2024

@eyalfa Thank you for your work here!
