Errors are not always caught with .handleErrorWith #3045

Open
nikiforo opened this issue Nov 7, 2022 · 11 comments

@nikiforo
Contributor

nikiforo commented Nov 7, 2022

Take a look at https://github.com/nikiforo/fs21107

The project can be run with sbt "test:runMain ru.delimobil.FS2Example"

I expect the execution to continue indefinitely. However, it ends with Errored(java.lang.IllegalArgumentException: TheException) within tens of seconds.

I can further minimize the example. I've checked fs2 3.2.4 and 3.3.0; both versions end with the Errored message.

@nikiforo nikiforo added the bug label Nov 7, 2022
@armanbilge
Member

Here's the minimizer with scala-cli.

//> using scala "3.2.1"
//> using lib "co.fs2::fs2-core::3.3.0"

package ru.delimobil

import cats.MonadThrow
import cats.effect.IO
import cats.effect.IOApp
import cats.syntax.applicative._
import cats.syntax.option._
import fs2.Stream

import scala.concurrent.duration._

object FS2Example extends IOApp.Simple {

  case class MaybeInt(int: Option[Int])

  private val empty: MaybeInt = MaybeInt(none)

  private val big = 2

  private val small = 1

  private val track = empty.copy(int = big.some)

  def run: IO[Unit] = {
    val singleRun =
      Stream(track, track)
        .repeat
        .through(assemble)
        .evalTap(checkDateValidity)
        .parEvalMap(10)(_ => IO.unit)
        .handleErrorWith(ex => Stream.exec(IO.println(s"Caught ${ex.getMessage}")))
        .groupWithin(512, 1.second)
        .compile
        .drain

    Stream.repeatEval(singleRun).compile.drain.guaranteeCase(IO.println)
  }

  private def assemble(tracks: Stream[IO, MaybeInt]): Stream[IO, MaybeInt] =
    (Stream(empty) ++ tracks)
      .sliding(2)
      .map { chunk => MaybeInt(chunk.head.get.int) }
      .chunks
      .flatMap(chunk => Stream.fromOption(chunk.last))

  private def checkDateValidity(updateAction: MaybeInt): IO[Unit] = {
    val action = MonadThrow[IO].raiseError(new IllegalArgumentException("TheException"))
    action.whenA(updateAction.int.exists(_ > small))
  }
}

@scott-thomson239

If no one else is currently having a look at this, can I try and take this on?

@armanbilge
Copy link
Member

Go for it! I played with it a little bit this morning. All I discovered was that removing either the parEvalMap or the groupWithin seemed to fix it, if I remember correctly.

@nikiforo
Contributor Author

nikiforo commented Nov 8, 2022

I think I managed to exclude parEvalMap:

val err = new IllegalArgumentException("TheException")

def run: IO[Unit] = {
  val singleRun =
    Stream(()).concurrently((Stream(()) ++ Stream.raiseError[IO](err)).repeat)
      .handleErrorWith(ex => Stream.exec(IO.println(s"Caught ${ex.getMessage}")))
      .groupWithin(512, 1.second)

  Stream.repeatEval(singleRun.compile.drain).compile.drain.guaranteeCase(IO.println)
}

@nikiforo
Contributor Author

nikiforo commented Nov 8, 2022

The exception persists in 3.3.0, 3.2.4, 3.0.0, 2.5.4, 2.5.0, and 2.1.0, but doesn't appear in 2.0.0.
Git bisect might help to understand the cause of the error.

UPD: I also checked 2.2.0, 2.3.0, 2.4.0, and 3.1.0; the error persists in those versions as well.

@nikiforo
Contributor Author

nikiforo commented Nov 8, 2022

This is CE2-compliant code:

  def run(args: List[String]): IO[ExitCode] = {
    val singleRun =
      Stream(()).concurrently((Stream(()) ++ Stream.raiseError[IO](err)).repeat)
        .handleErrorWith(ex => Stream.eval_(IO.delay(println(s"Caught ${ex.getMessage}"))))
        .groupWithin(512, 1.second)

    Stream.repeatEval(singleRun.compile.drain).compile.drain.guaranteeCase(c => IO.delay(println(c))).map(_ => ExitCode.Success)
  }

@nikiforo
Contributor Author

nikiforo commented Nov 8, 2022

I bisected between 2.0.0 and 2.1.0:

659791b0fd3eef85824fbff39f8280e9931281f8 is the first bad commit
commit 659791b0fd3eef85824fbff39f8280e9931281f8
Author: Diego E. Alonso-Blas <diesalbla@gmail.com>
Date:   Sat Sep 21 01:55:06 2019 +0100

    Solves memory Leak
    
    Reverts changes in commit 36b1279cda415ebe8bb3faa33cb9aa3e6c6697d9.
    
    This commit changed the Compilation Loop was modified in a recent PR
    to use a continuation and a stack of continuations. However, this
    introduced a memory leak that could be seen running a simple:
    `Stream.range(0, 2800934).fold(0L)(_ + _).compile.last`,
    as described in Issue 1625.
    
    This commit reverts those commits, and thus fixes that memory leak.

:040000 040000 aadfd4614a28b579a24d0faef1c7af8fc3f394d8 e59aad789baf548716d094f07aaba388029dd02f M	core

That's the link to the commit: 659791b

@nikiforo nikiforo changed the title Errors are not always catched with .handleErrorWith Errors are not always caught with .handleErrorWith Nov 8, 2022
@scott-thomson239

scott-thomson239 commented Nov 27, 2022

Sorry this is taking so long; I haven't had much luck in the little time I have had available to look at this. For now I will unassign myself so other people can have a go as well.

@scott-thomson239 scott-thomson239 removed their assignment Nov 27, 2022
@diesalbla
Contributor

diesalbla commented Nov 27, 2022

@nikiforo Some small minimisations: the .repeat of the background process is not needed. This seems correct, since the first failure should cut the execution short. Furthermore, the leading Stream(()) of the background stream is also not needed. So instead of Stream(()).concurrently((Stream(()) ++ Stream.raiseError[IO](err)).repeat), we can also reproduce the bug with just: Stream(()).concurrently(Stream.raiseError[IO](err)). On the other hand, replacing the foreground stream Stream(()) with Stream.empty prevents the error, apparently by interrupting the background execution of the error stream before it can start.

(this was based on the master branch of https://github.com/nikiforo/fs21107)
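For readers who want to compare the three variants described above side by side, here is a minimal sketch, assuming scala-cli and fs2 3.4.0. The object name `Variants` and the helpers `singleRun` and `forever` are hypothetical names introduced only for this illustration; the pipeline shape (`concurrently`, `handleErrorWith`, `groupWithin(512, 1.second)`) is copied from the earlier comments. The outcomes noted in the comments are the ones reported in this thread, not something this sketch guarantees.

```scala
//> using lib "co.fs2::fs2-core::3.4.0"

import cats.effect.{IO, IOApp}
import fs2.Stream

import scala.concurrent.duration._

object Variants extends IOApp.Simple {

  val err = new IllegalArgumentException("TheException")

  // Same handler as in the earlier reproducers.
  def handler(ex: Throwable): Stream[IO, Unit] =
    Stream.exec(IO.println(s"Caught ${ex.getMessage}"))

  // Hypothetical helper: one pass of the pipeline shape used in this thread.
  def singleRun(foreground: Stream[IO, Unit], background: Stream[IO, Unit]): IO[Unit] =
    foreground
      .concurrently(background)
      .handleErrorWith(handler)
      .groupWithin(512, 1.second)
      .compile
      .drain

  // Variant 1: the earlier reduction, with a repeating background stream.
  val withRepeat: IO[Unit] =
    singleRun(Stream(()), (Stream(()) ++ Stream.raiseError[IO](err)).repeat)

  // Variant 2: background reduced to a bare error (reported above to still reproduce).
  val withoutRepeat: IO[Unit] =
    singleRun(Stream(()), Stream.raiseError[IO](err))

  // Variant 3: empty foreground (reported above to prevent the error).
  val emptyForeground: IO[Unit] =
    singleRun(Stream.empty, Stream.raiseError[IO](err))

  // Hypothetical helper: rerun one pass until it fails or the program is stopped.
  def forever(step: IO[Unit]): IO[Unit] = step >> forever(step)

  // Swap in withRepeat or emptyForeground to try the other variants.
  def run: IO[Unit] =
    forever(withoutRepeat).guaranteeCase(outcome => IO.println(outcome))
}
```

The program is expected to run until interrupted unless an error escapes the handler, so it has to be stopped manually when the bug does not show up.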

@armanbilge
Member

> we can also reproduce the bug with just

@diesalbla would you mind posting a complete reproducer? I tried as you said and it does not reproduce for me: the error is caught.

//> using lib "co.fs2::fs2-core::3.4.0"

import cats.effect._
import fs2._

object App extends IOApp.Simple {
  def run = Stream(())
    .concurrently(Stream.raiseError[IO](new Exception("ruh roh")))
    .handleErrorWith(ex => Stream.exec(IO.println(s"Caught ${ex.getMessage}")))
    .compile
    .drain
}

@diesalbla
Contributor

diesalbla commented Nov 28, 2022

@armanbilge Here is the modified code:

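  // Imports this fragment relies on (the enclosing IOApp.Simple object is omitted):
  import cats.effect.IO
  import fs2.Stream
  import scala.concurrent.duration._
  import scala.util.control.NoStackTrace

  object err extends IllegalArgumentException("TheException") with NoStackTrace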

  val bullet: Stream[IO, Unit] = Stream.raiseError[IO](err)

  def handler(ex: Throwable) = Stream.exec(IO.println(s"Caught TheException"))

  def run: IO[Unit] = {
    val singleRun =
      Stream(()).concurrently(bullet)
        .handleErrorWith(handler)
        .groupWithin(100, 100.millis)

    def aux: IO[Unit] = singleRun.compile.drain >> aux

    aux.guaranteeCase(IO.println)
  }

This was done using fs2 version 3.4.0. Here are the details of the JVM I used:

fs21107 % java -version
java version "19" 2022-09-20
Java(TM) SE Runtime Environment (build 19+36-2238)
Java HotSpot(TM) 64-Bit Server VM (build 19+36-2238, mixed mode, sharing)

EDIT: As a further minimisation, we can write the outer loop without streams, using recursion in IO.
