interruptWhen and friends shouldn't cancel downstream processing #3202

Open
Jasper-M opened this issue Apr 7, 2023 · 7 comments

@Jasper-M
Contributor

Jasper-M commented Apr 7, 2023

Stream(1,2).covary[IO].interruptAfter(200.millis).evalMap( i =>
  IO.println(i).delayBy(300.millis).onCancel(IO.println("CANCELED"))
).compile.drain.unsafeRunSync()

This program prints CANCELED even though this is happening downstream from interruptAfter.
Only canceling upstream would give more control to the clients of this stream.

Jasper-M added the bug label Apr 7, 2023
Jasper-M changed the title from "interruptWhen shouldn't cancel downstream processing" to "interruptWhen and friends shouldn't cancel downstream processing" Apr 7, 2023
@diesalbla
Contributor

diesalbla commented Jul 10, 2023

@Jasper-M Good morning. Thanks for reaching out. I am sorry that we have not taken a look at this for a while.

Regarding the issue, I think that it is working as expected. The evalMap is just defined as a combination of flatMap and Stream.eval. The semantics of flatMap is that it is building a composed stream, a source of data, based on a previous stream. If that stream is interrupted (told to cut off), then the flatMap stream must too. Interrupting the stream implies cancelling the F actions it has started.
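
As a rough illustration of the point above, here is a minimal sketch that runs the same interrupted source through evalMap and through the flatMap plus Stream.eval spelling, recording events in a Ref instead of printing. It assumes fs2 3.x and cats-effect 3 are on the classpath; the object name, the slow helper, and the log messages are hypothetical and exist only for this example.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object EvalMapVsFlatMap {
  // Hypothetical helper: simulates slow work and records whether it
  // completed or was cancelled.
  def slow(log: Ref[IO, List[String]])(i: Int): IO[Unit] =
    (IO.sleep(300.millis) *> log.update(_ :+ s"done $i"))
      .onCancel(log.update(_ :+ "CANCELED"))

  // Same source as in the report: interrupted after 200 ms.
  private def source: Stream[IO, Int] =
    Stream(1, 2).covary[IO].interruptAfter(200.millis)

  // Downstream processing expressed with evalMap.
  def viaEvalMap: IO[List[String]] =
    Ref.of[IO, List[String]](Nil).flatMap { log =>
      source.evalMap(i => slow(log)(i)).compile.drain *> log.get
    }

  // The same processing spelled out as flatMap + Stream.eval.
  def viaFlatMap: IO[List[String]] =
    Ref.of[IO, List[String]](Nil).flatMap { log =>
      source.flatMap(i => Stream.eval(slow(log)(i))).compile.drain *> log.get
    }
}
```

Calling EvalMapVsFlatMap.viaEvalMap.unsafeRunSync() and EvalMapVsFlatMap.viaFlatMap.unsafeRunSync() should, consistent with the behaviour reported in this issue, both yield a log containing CANCELED.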

To obtain the desired behavior, where an action cannot be cancelled from outside once it has started, you can use the uncancelable method of IO (from cats-effect).

Stream(1,2).covary[IO].interruptAfter(200.millis).evalMap( i =>
  IO.println(i).delayBy(300.millis).onCancel(IO.println("CANCELED")).uncancelable
).compile.drain.unsafeRunSync()

@armanbilge
Member

If that stream is interrupted (told to cut off), then the flatMap stream must too.

I'm missing this part. Why must the flatMap stream be interrupted as well, rather than closing gracefully, e.g. by continuing to process the remaining elements from upstream?

The confusion here is that it seems that interruption applies to the entire stream, rather than specifically to the upstream. And maybe I'm missing it, but it's not obvious to me why this is fundamental. It does seem fundamental that when an upstream is interrupted, it will no longer emit elements, but why this forces the downstream to cancel processing of already-emitted elements is not clear.

@diesalbla
Contributor

diesalbla commented Jul 10, 2023

@armanbilge That is a good question that is not documented. Here is a vague guess, that I would like @SystemFw or @mpilquist to confirm:

Stream subsumes Resource. The time a stream is emitting elements is the lifetime of any resource acquired during that stream. Whenever you do seed.flatMap(bloom), each application of bloom is building a stream whose resource scope is a child-scope of that of the seed. For example:

  for {
/*a*/    r <- Stream.resource(myResource).interruptAfter(2.seconds)
/*b*/    x <- streamThatEmitsOutputs
/*c*/    _ <- r.doActionWith(x)
  } yield ()

Once the resource-acquiring stream in a is interrupted, it triggers resource cleanup, for which it has to cancel the use in c.
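
A minimal sketch of that scenario, assuming fs2 3.x and cats-effect 3 on the classpath. The resource, the "handle" value, and the log messages are hypothetical stand-ins for myResource and r.doActionWith, and the long-running use is simulated with a sleep.

```scala
import cats.effect.{IO, Ref, Resource}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object ResourceInterruptDemo {
  def run: IO[List[String]] =
    Ref.of[IO, List[String]](Nil).flatMap { log =>
      def note(msg: String): IO[Unit] = log.update(_ :+ msg)

      // Hypothetical resource that records its acquisition and release.
      val myResource: Resource[IO, String] =
        Resource.make(note("acquire").as("handle"))(_ => note("release"))

      Stream
        .resource(myResource)
        .interruptAfter(200.millis)
        // The "use" runs downstream of the interrupt and outlives the timeout.
        .evalMap(_ => IO.sleep(1.second).onCancel(note("use cancelled")))
        .compile
        .drain *> log.get
    }
}
```

If the explanation above holds, the recorded order should be acquire, then use cancelled, then release: the use is cancelled first so that the resource can be released.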

@armanbilge
Member

Thanks Diego! That was a nice explanation; it makes sense to me.

So in this case, "interrupt" not only means stop emitting elements, but also immediately release resources (even if this requires canceling ongoing users).

@Jasper-M
Contributor Author

Stream subsumes Resource.

Resource doesn't require the semantics described here, though, because Resource only allows cleaning up the resource when the use action stops, not cleaning up the use action when the resource stops.

But I do understand why you would like to have these "extended" resource semantics. In fact you probably want these semantics in almost all cases where you use Resource even though you don't actually get them...

So let's assume that we want these semantics and that the interrupt* methods behaving like they do is the way to get them. In that case perhaps we need another set of methods, e.g. takeByTimespan and takeUntil with the same signatures as interruptAfter and interruptWhen but with the difference that they only stop pulling and don't interrupt the downstream. After all if I have Stream.resource(myResource).take(1) (or even just Stream.resource(myResource)...) then that stream also "stops emitting elements" but that doesn't kill the scopes of the already emitted element(s).

@SystemFw
Collaborator

SystemFw commented Aug 1, 2023

A relevant discussion popped up on Discord, copying it here because I think there is useful insight.


@SystemFw:
Since this discussion pops up periodically, here's your periodic reminder that there is a massive difference between stream interruption and IO interruption, since stream interruption can be resumed.
And for this reason, I also don't really agree that stream interruption is not about cancelling ongoing effects:

Stream
.repeatEval(action)
.metered(8.minutes)
.interruptAfter(1.hour) ++ Stream.repeatEval(action2)

ofc, you can make a case for always having control flow in IO and have a streaming library purely for streaming data, but that's a different library in my mind.
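
To make the "can be resumed" point concrete, here is a scaled-down sketch of the snippet above, with milliseconds instead of hours. It assumes fs2 3.x and cats-effect 3 on the classpath; the object name and the logging actions standing in for action and action2 are hypothetical, and the second stream runs once rather than repeating so that the program terminates.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object ResumeAfterInterrupt {
  def run: IO[List[String]] =
    Ref.of[IO, List[String]](Nil).flatMap { log =>
      // First "process": repeats an action until it is interrupted.
      val first: Stream[IO, Unit] =
        Stream
          .repeatEval(log.update(_ :+ "action"))
          .metered(50.millis)
          .interruptAfter(220.millis)

      // Second "process": appended with ++, so it starts once the first one ends.
      val second: Stream[IO, Unit] =
        Stream.eval(log.update(_ :+ "action2"))

      (first ++ second).compile.drain *> log.get
    }
}
```

The interruption ends the first stream without failing the whole program, so the stream appended with ++ still runs and action2 is the last entry in the log.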


@Jasper-M:

Stream interruption cancelling ongoing effects is perfectly fine IMO, but why should logActionResult get cancelled in this case?

Stream
.repeatEval(action)
.metered(8.minutes)
.interruptAfter(1.hour)
.evalMap(logActionResult) ++ Stream.repeatEval(action2)

Especially since that interrupt could be hidden behind some interface

def runAnActionFor1Hour: Stream[IO, ActionResult] =
  Stream
  .repeatEval(action)
  .metered(8.minutes)
  .interruptAfter(1.hour)

runAnActionFor1Hour.evalMap(logActionResult) ++ Stream.repeatEval(action2)

IMHO wanting to cancel logActionResult is the corner case, specific to those instances where your stream is actually a resource. But in the general case you don't want that.


@SystemFw:
Well, it might be because I'm very familiar with it, but to me that code behaves correctly in interrupting logActionResult. Each stream, as delimited by ++, defines a "process", and you're saying that that process should stop after 1.hour. It may seem strange that you are able to control future consumers, so to speak, but that's exactly the ability that makes Stream more powerful than Resource, for example it's what makes concurrently capable of surfacing background errors to the foreground, whereas Resource.background can only do the reverse.
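
A small sketch of the concurrently behaviour mentioned here, assuming fs2 3.x and cats-effect 3 on the classpath. The object name and the "boom" error are made up for the example.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object BackgroundErrorDemo {
  // The foreground just sleeps; the background stream fails immediately.
  def run: IO[Either[Throwable, Unit]] =
    Stream
      .sleep[IO](1.second)
      .concurrently(Stream.raiseError[IO](new RuntimeException("boom")))
      .compile
      .drain
      .attempt
}
```

BackgroundErrorDemo.run.unsafeRunSync() comes back as a Left: the failure of the background stream is surfaced in the foreground stream, which is the direction that, per the comment above, Resource.background does not cover.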

If you want graceful stopping semantics, there are other tools: you could merge with a timeout Stream and use takeWhile, for example.
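
One possible reading of that suggestion, as a sketch only: it assumes fs2 3.x and cats-effect 3 on the classpath, takeFor is a hypothetical helper that is not part of fs2, and it uses noneTerminate / unNoneTerminate as a variant of the takeWhile idea.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration._

object GracefulTimeout {
  // Hypothetical helper (not an fs2 combinator): stop taking elements once
  // `d` has elapsed, without placing downstream in an interruption scope.
  def takeFor[A](src: Stream[IO, A], d: FiniteDuration): Stream[IO, A] =
    src.noneTerminate                                  // Some(a) per element, None at the end
      .merge(Stream.sleep[IO](d).as(Option.empty[A]))  // None once the timeout fires
      .unNoneTerminate                                 // halt at the first None

  def run: IO[List[String]] =
    Ref.of[IO, List[String]](Nil).flatMap { log =>
      takeFor(Stream.iterate(1)(_ + 1).covary[IO], 200.millis)
        .evalMap { i =>
          (IO.sleep(300.millis) *> log.update(_ :+ s"done $i"))
            .onCancel(log.update(_ :+ "CANCELED"))
        }
        .compile
        .drain *> log.get
    }
}
```

In this sketch the downstream evalMap that is already processing element 1 when the timeout fires runs to completion, and the stream only halts on a later pull. Exactly how many further elements get through, and how promptly the upstream is stopped, depends on chunking and on how merge finalizes its inputs, so treat those details as assumptions to verify.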

In that case perhaps we need another set of methods, e.g. takeByTimespan and takeUntil with the same signatures as interruptAfter and interruptWhen but with the difference that they only stop pulling and don't interrupt the downstream

If we think this type of semantics is commonly useful, I wouldn't be opposed to facilitating it with more combinators.
I'm going to copy some of this discussion in the issue @Jasper-M (quoting you), it seems useful context.

@Jasper-M
Contributor Author

Jasper-M commented Aug 1, 2023

you could merge with a timeout Stream and use takeWhile, for example.

That sounds like it wouldn't cancel the upstream processing either (action in our previous example). I think that ideally a takeByTimespan combinator would still do a best effort to cancel all upstream processing.
