Child fiber interrupted while parent runs onInterrupt #8804

Open
erikvanoosten opened this issue Apr 29, 2024 · 6 comments

@erikvanoosten
Contributor

erikvanoosten commented Apr 29, 2024

In zio-kafka we're trying to add clean shutdown. The goal is to finish processing a stream, that is, to process it up to the end marker that we insert when interrupted. Unfortunately, the stream itself sometimes gets interrupted ahead of time as well.

Below is a minimized reproduction. Every few executions the stream is interrupted, as shown by the console output:

Stream end sent
Error joining fiber
stream interrupted, this should not happen

We expect the console output to always be:

Stream end sent
Fiber joined

Our expectation is that the stream is not interrupted until after fib.join.onInterrupt completes.
In other words, we expect that child fibers are not interrupted as long as the parent is running onInterrupt.

If this is expected behavior, is there another way to achieve the same effect?

import zio.*
import zio.stream.*

object ZStreamInterrupt extends ZIOAppDefault {
  override val bootstrap = Runtime.removeDefaultLoggers ++ Runtime.addLogger(ZLogger.simple(println))

  private val shutdownTimeout = 5.seconds

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = {
    val stream: ZStream[Any, Nothing, Take[Nothing, Long]] =
      ZStream
        .fromSchedule(Schedule.spaced(100.micros))
        .map(Take.single)

    val p = for {
      end <- Promise.make[Nothing, Unit]
      stoppableStream = stream.merge(ZStream.fromZIO(end.await).as(Take.end))
      fib <- stoppableStream
        .flattenTake
        .runDrain
        .onInterrupt(ZIO.logError("stream interrupted, this should not happen"))
        .forkScoped
      result <-
        fib.join.onInterrupt {
          end.succeed(()) *>
            ZIO.logError("Stream end sent") *>
            fib.join
              .timeout(shutdownTimeout)
              .tapErrorCause { cause =>
                ZIO.logErrorCause("Error joining fiber", cause)
              }
              .ignore *>
          ZIO.logError("Fiber joined")
        }
    } yield result

    // Interrupt after 1 second
    p.timeout(1.second)
  }
}

@erikvanoosten
Contributor Author

All our attempts to minimize this further (without using streams) failed. In other words, this could be a bug/feature of the streams implementation.

@jdegoes
Member

jdegoes commented Apr 30, 2024

In ZIO 2, transitive fiber interruption is "instantaneous". This means a great-grandchild can be interrupted before a parent finishes its wind-down process.

This is different behavior than earlier versions of ZIO. In general, it's a better default, but there are some cases where you might have to add explicit synchronization to prevent this behavior.

In this case, it could be a bug in streams (or it could be a feature, depending on your point of view). Interested in feedback.
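
A minimal sketch of what "explicit synchronization" could look like for a plain (non-stream) fiber, offered as an illustration only. It assumes ZIO 2 and uses a hypothetical `worker` that polls a `stop` promise. The worker is started with `forkDaemon`, so it is not a child of the interrupted fiber, and the parent's `onInterrupt` signals it and waits for it to exit on its own. Whether this avoids the behavior reported for the stream above has not been tested here.

```scala
import zio.*

object ExplicitShutdownSketch extends ZIOAppDefault {

  // Hypothetical worker (not from the report): polls every 10 ms and
  // returns normally once `stop` has been completed.
  private def worker(stop: Promise[Nothing, Unit]): UIO[Unit] =
    (ZIO.sleep(10.millis) *> stop.isDone)
      .repeatUntil(done => done)
      .unit
      .onInterrupt(ZIO.logError("worker interrupted, not expected in this sketch"))

  override def run: ZIO[Any, Nothing, Any] = {
    val p = for {
      stop <- Promise.make[Nothing, Unit]
      // forkDaemon attaches the fiber to the global scope, so interrupting
      // the parent does not transitively interrupt the worker.
      fib <- worker(stop).forkDaemon
      _ <- fib.join.onInterrupt {
             // Ask the worker to stop, then wait until it has actually exited.
             stop.succeed(()) *>
               ZIO.logInfo("stop signal sent") *>
               fib.await *>
               ZIO.logInfo("worker finished")
           }
    } yield ()

    // Interrupt the parent after 1 second, as in the report.
    p.timeout(1.second)
  }
}
```

The trade-off is that a daemon fiber is no longer cleaned up automatically, so the shutdown path has to guarantee that the worker terminates, for example by interrupting it explicitly if it does not stop in time.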

@erikvanoosten
Contributor Author

erikvanoosten commented Apr 30, 2024

> where you might have to add explicit synchronization to prevent this behavior

A workaround would be great, but we're not sure how. We can't run the stream as uninterruptible because that causes other problems (the stream's finalizers no longer run).

> Interested in feedback.

My first thought was that, since we're explicitly using .onInterrupt, this could be a signal that child fibers should not be interrupted yet. I have yet to find a serious scenario where this would go wrong (perhaps something where we would wait inside the .onInterrupt for child fibers to be interrupted?), so this could be a good solution.

Another thought is that we could introduce an interrupt strategy that governs how child fibers are interrupted. For example, a ZIO effect might get a method .withChildFiberInterrupts(Boolean). When false is given, any fiber that is spawned within the scope of that effect would not be interrupted when its parent is interrupted.

@erikvanoosten
Contributor Author

erikvanoosten commented May 8, 2024

Interesting, under ZIO 2.1 (2.1.0-RC5) the problem does not seem to occur.

Does ZIO 2.1 not interrupt all fibers at once like ZIO 2.0 does?

@kyri-petrou
Contributor

@erikvanoosten, would it be possible to test whether the issue is there with version 2.1-RC1? I changed a bunch of things after RC1 and want to know whether it's something I did or something that was done previously.

@erikvanoosten
Contributor Author

@kyri-petrou 2.1-RC1 also has the expected behavior.

BTW, I am testing this with the app listed above, but with the last line replaced by:

p.timeout(100.millis).repeatN(100000)
