Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polymorphic async/await implementation #1924

Merged
merged 20 commits into from May 31, 2021

Conversation

Baccata
Copy link
Contributor

@Baccata Baccata commented Apr 22, 2021

This preliminary work adds an async/await implementation based off the now built-in mechanism in the Scala 2 compiler.

The grittiest details of the implementation are borrowed from :

Due to the reliance on Dispatcher#unsafeRunSync, the implementation currently only works on JVM.

Error propagation / cancellation seems to behave as it should.

NB : despite it compiling, using this with OptionT/EitherT/IorT is currently unsafe, for two reasons:

I've introduced a construct to deal with side-error-channels in what I currently believe is a safe manner.

Fixes #1907

This preliminary work adds an async/await implementation based off
the now built-in mechanism in the Scala 2 compiler.

The grittiest details of the implementation are borrowed from :

* scala/scala#8816
* https://github.com/retronym/monad-ui/tree/master/src/main/scala/monadui
* https://github.com/scala/scala-async

Due to the reliance on Dispatcher#unsafeRunSync, the implementation
currently only works on JVM.

Error propagation / cancellation seems to behave as it should.

NB : it is worth noting that despite it compiling, using this with
OptionT/EitherT/IorT is currently unsafe, for two reasons:

* what seems to be a bug in the MonadCancel instance tied to those
Error-able types: See https://gitter.im/typelevel/cats-effect-dev?at=60818cf4ae90f3684098c042
* The fact that calling `unsafeRunSync` is `F[A] => A`, which obviously
doesn't work for types that have an error channel that isn't accounted
for by the CE typeclasses.

"IOAsyncAwait" should {
object IOAsyncAwait extends cats.effect.std.AsyncAwaitDsl[IO]
import IOAsyncAwait.{await => ioAwait, _}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await collides with something in FutureMatchers.

@Baccata Baccata changed the title Polymorphic AsyncAwait implementation Polymorphic async/await implementation Apr 22, 2021
@Baccata
Copy link
Contributor Author

Baccata commented Apr 22, 2021

I haven't added a syntax object for IO in the main scope yet. If someone could make an executive decision over where to put things and what to call them, that'd be great.

The current UX (as you can see in tests) looks like this:

object IOAsyncAwait extends cats.effect.std.AsyncAwaitDsl[IO]
import IOAsyncAwait._ 

async(await(IO.println("hello"))) 

@vasilmkd
Copy link
Member

Since we're all very new to this, may I suggest that we mark this as experimental and iterate on this for a couple of versions without promising source or binary compatibility. Who knows, we may end up improving on some of the constraints. Hats off to @Baccata for the effort. I'm interested to look into this in more depth, I've never dabbled with macros before.

@djspiewak
Copy link
Member

Since we're all very new to this, may I suggest that we mark this as experimental and iterate on this for a couple of versions without promising source or binary compatibility. Who knows, we may end up improving on some of the constraints. Hats off to @Baccata for the effort. I'm interested to look into this in more depth, I've never dabbled with macros before.

I agree with this. It's worth noting though that we basically already have a built-in "experimental" flag on all of this, since none of it is accessible without using the -Xasync flag on scalac. So overall I think we're okay. :-)

djspiewak and others added 3 commits April 23, 2021 16:18
This aims at solving the problem of OptionT/EitherT/IorT traversing
dispatchers
* Leveraged DispatchCloak in the async/await implementation
* Added tests to show that it works for OptionT
@Baccata
Copy link
Contributor Author

Baccata commented Apr 24, 2021

@vasilmkd, @djspiewak, my latest commits on this introduce a construct (that I named DispatchCloak, feel free to propose an alternative) that solves the following problems :

  • prevent reliance on unsafeRunSync for javascript interop (btw, why doesn't the build reference shared/scala-2 as an unmanagedSourceDirectory ? Should I add it manually ?)
  • allows to account for side-error-channels (which as far as I know is the reason why successful outcomes expose an F[A]), by encoding these channels as ad-hoc, path-dependant Throwable classes (hence the "cloak" name). In the bigger picture, this would theoretically allow for Dispatcher#unsafeRunSync to be called "safely" (ie without hanging) on monad transformer stacks that have such error channels. I was thinking it felt a bit ad-hoc at first, but it does embrace the same "Throwable is the de-facto channel" paradigm as Dispatcher does.

@vasilmkd
Copy link
Member

vasilmkd commented Apr 24, 2021

Just to confirm, you're still currently hitting the problems with transformers that we became aware of a couple days ago right? I just want to make sure this is not somehow hiding those or relying on behavior that will be changed/fixed in the next version.

Btw, I'm not sure about the scala-2 source directory, you're going to have to fiddle around with it unfortunately. 😕

@Baccata
Copy link
Contributor Author

Baccata commented Apr 24, 2021

Just to confirm, you're still currently hitting the problems with transformers that we became aware of a couple days ago right? I just want to make sure this is not somehow hiding those or relying on behavior that will be changed/fixed in the next version.

Let's put it this way : the "DispatchCloak" construct is solving two problems by applying a solution which coincidentally prevents the problematic codepath (the one we became aware of) from being exercised.

So even when the MonadCancel#bracketFull implementation is fixed for monad transformers, this solution (or something similar) would still have to be applied, in order to :

  • avoid reliance on Dispatcher#unsafeRunSync which doesn't exist in javascript world
  • communicate non-Throwable errors through the async/await dispatcher-loop (the scala one, not the CE one), because that loops relies on what is analogous to F[A] => A, which even CE's dispatcher cannot provide safely for OptionT/EitherT and the likes.

In other words, the fixing of MonadCancel#bracketFull would not have been enough to prevent async/await from hanging when it encounters OptionT.none or EitherT.left

@vasilmkd
Copy link
Member

I'm very very happy with this outcome.

@Baccata
Copy link
Contributor Author

Baccata commented Apr 24, 2021

I'm very very happy with this outcome.

Wait, actually I'm very dumb 🤦 see here. Yeah it'll need to be rewritten in terms of Deferred + guarantee ...

@vasilmkd
Copy link
Member

We'll fix the issue, that's definitely a priority for the next release.

@djspiewak
Copy link
Member

Spent some time thinking about this today. Do you think it would be better to just define this for IO for the time being? I'm pretty leery about DispatchCloak, but at the same time, just deadlocking whenever someone uses a transformer with alternative zeros is… undesirable.

I'm interested in @retronym's perspective on this. It seems that the current -Xasync mechanism is just not capable of handling any monadic context more complicated than errors and callbacks (which, staring at the API, is actually kind of obvious). It should be possible to get this kind of functionality, but it would require a more complex state machine which no longer makes assumptions about the kinds of state which needs to be threaded between awaits.

@Baccata
Copy link
Contributor Author

Baccata commented Apr 24, 2021

Do you think it would be better to just define this for IO for the time being

Considering it works for Kleisli / WriterT, I think it'd be silly not to provide the feature for a combination of those types. Or to prevent other IO implementations from tapping into this mechanism.

It should be possible to get this kind of functionality, but it would require a more complex state machine which no longer makes assumptions about the kinds of state which needs to be threaded between awaits.

TBH, the state machine makes few assumptions. The problem can be summarised as this : "how do you generically decide whether a computation has been successful, according to all its monadic effects ?" Outside of the -XAsync mechanism, CE3 does suffer from not having a solution to this very problem : following program will happily compile and hang forever.

import cats.effect.IOApp
import cats.effect.IO
import cats.data.OptionT
import cats.effect.kernel.Sync
import cats.effect.std.Dispatcher

object Main extends IOApp.Simple {

  type Foo[A] = OptionT[IO, A]

  def run = {
    IO.println("Hello") >>
      Dispatcher[Foo].use { dispatcher =>
        Sync[Foo].delay(dispatcher.unsafeRunSync(OptionT.none[IO, Int]))
      }.value >> IO.println("Goodbye")
  }
}

I'm pretty leery about DispatchCloak

In the case of Async/Await, a more elegant encoding could be applied: The call to CE's async_ could be made against Either[F[A], A] instead of A, so the CE callback would be Either[Throwable, Either[F[A], A]] => Unit, following the steps of Outcome, and that might be more acceptable than trying to encode stuff in terms of Throwable. However, a typeclass-like construct will still be needed to encode the F[A] => F[Either[F[A], A]].

The value of encoding things in terms of Throwable (even if the current DispatchCloak construct could do with polishing) is that it offers a start of solution for the very present problem showcased in the snippet above. Throwables are ubiquitous in all runtimes Scala compiles against, and a lot of libraries ,that people are gonna use Dispatcher to interop, with will likely propagate throwables.

To summarise, the possible solutions, as I see them are :

  1. Hardcode cats.effect.IO. It's the solution that I'd find the most disappointing.
  2. Come up with a no-method marker typeclass ("awaitable" or whatever) that'd encode the fact that a type can be safely used through things like async/await and Dispatcher#unsafeRunSync
  3. Encode side-error-channels with Throwable types (current state of the PR at the time of writing this), which might be meh but would allow to make Dispatcher#unsafeRunSync work for EitherT/OptionT.
  4. Come up with a F[A] => F[Either[F[A], A]] typeclass which encodes the notion of "can I continue in the monadic sequence ?" (I guess it's conceptually similar to Free#resume now that I think about it), that'd solve the problem for Async/Await, but does not offer a solution to Dispatcher#unsafeRunSync... Or does it ?

@Baccata
Copy link
Contributor Author

Baccata commented Apr 25, 2021

Considering it works for [...] WriterT

That was a stretch, my bad. It does work for Kleisli, but in the case of WriterT, the logs get lost through in the async/await loop through each callback. I'm gonna try out something to address it (in a less ad-hoc fashion than the DispatchCloak thing).

This allows Async/Await to work with any monadic effect.
@Baccata
Copy link
Contributor Author

Baccata commented Apr 25, 2021

@djspiewak, I've nuked the "DispatchCloak" in favour of a much less ad-hoc "Resume" construct that allows for decomposing a monadic computation into its potential product and coproduct components.

Using a mutable variable in the async/await state machine (which I presume is thread-safe since the dispatch loop requires the FSM to hold a mutable integer, but it'd be good to have @retronym's confirmation), the intermediate monadic results can be stored when the async/await loop retrieves the value to fulfil the current "await", and re-composed with the effect indexed by the next "await" call.

The CE async_ call is now performed against F[A], following's the design of Outcome, to account for any monadic effects unknown to the CE typeclasses (as well as potential self-cancellation of await blocks)

protected def tryGet(tr: AsyncAwaitDsl.ResumeOutcome[F]): AnyRef =
tr match {
case Right((newEffect, value)) =>
recordedEffect = newEffect
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to inline some comments there.

Comment on lines 146 to 150
resume.resume((recordedEffect *> f)).start.flatMap(_.join).flatMap {
case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]])))
case Errored(e) => F.delay(this(Left(F.raiseError(e))))
case Succeeded(resumed) => resumed.flatMap(r => F.delay(this(r)))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I spent a ton of time thinking about this, and it occurs to me that we should be able to get this same kind of "peak into the control layer" effect by using guaranteeCase. In particular, the Succeeded case should contain the F[A] even when that F[A] is some coproduct case. Unfortunately, this is precisely what isn't working right now because of the bracketCase issue. I think if I fix that issue, then we don't need Resume here anymore.

Copy link
Contributor Author

@Baccata Baccata May 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if I fix that issue, then we don't need Resume here anymore.

I don't think that'll suffice, unfortunately, though I'd be really happy to be proven wrong. One thing that the structural interface (which XAsync works with) requires is the implementation of the tryGet method, which, if you squint really hard, is akin to F[A] => Either[Stop, A] (the Stop is signalled by returning this ... ).

For any await block, the state machine will call this tryGet method to unpack the value from whatever structure you elect to work with, and substitutes the await block with that value. So the structure you choose needs to contain enough information to decide whether to signal to the state machine whether it should stop, or continue.

So as far as I'm aware, XAsync cannot be implemented in a polymorphic manner without some abstraction letting you decide whether your F[A] actually "contains" the A. Fixing guaranteeCase will not give us that, it will however mean that resume could be called later than it currently is.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not across the particular context of this PR but wanted to mention that the structural interface of async does support using an identity type alias as the "Either" type.

This makes sense if the Future-ish data type + dispatcher are responsible for propagating failures to dependant tasks. That isn't the case with Scala Futures, but is true in some other task abstractions, e.g.:

trait NodeStateMachine extends AsyncStateMachine[Node[AnyRef], AnyRef] with (AnyRef => Unit) {
  def node: Node[AnyRef]
  private[this] var state$async: Int = 0

  protected def state: Int = state$async

  protected def state_=(s: Int): Unit = state$async = s

  final def apply(tr$async: R[AnyRef]): Unit = fsm(tr$async)
  def fsm(tr$async: R[AnyRef]): Unit

  type F[A] = Node[A]
  type R[A] = A

  // Adapter methods
  protected def completeFailure(t: Throwable): Unit = throw new UnsupportedOperationException()

  protected def completeSuccess(value: AnyRef): Unit = node.completeWithValue(value)

  protected def onComplete(f: F[AnyRef]): Unit = f.onComplete(this)

  protected def getCompleted(f: F[AnyRef]): R[AnyRef] = f.getCompleted

  protected def tryGet(tr: R[AnyRef]): tr.type = tr
}

sealed class Node[T] {
  private[this] val ec: Executor = Node.DefaultThreadPool
  private[this] final val result: CompletableFuture[T @uncheckedVariance] = new CompletableFuture()

  def address: NodeAddress[T] = NodeAddress.NoAddress

  protected def runAsync(f: () => Unit): Unit = {
    val childContext = NodeContext.childContext(this)
    CompletableFuture.runAsync ({ () =>
      NodeContext.pushContext(childContext)
      try f()
      catch {
        case t: Throwable =>
          completeWithFailure(t)
      } finally NodeContext.popContext()
    }, ec)
  }
  def onComplete[U](f: T => U): Unit = {
    val childContext = NodeContext.childContext(this)
    result.whenCompleteAsync((t: T, throwable: Throwable) => {
      NodeContext.pushContext(childContext)
      try {
        if (throwable != null) {
          completeWithFailure(throwable)
        } else {
          f(t)
        }
      } catch {
        case t: Throwable =>
          completeWithFailure(t)
      } finally NodeContext.popContext()
    }, ec)
  }

  def completeWithValue(t: T): Unit = result.complete(t)
  def completeWithFailure(t: Throwable): Unit = {
    result.completeExceptionally(t)
    var context = NodeContext.current()
    while (context ne RootContext) {
      context.node.completeWithFailureDirect(t)
      context = context.parent
    }
  }
  ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing guaranteeCase will not give us that, it will however mean that resume could be called later than it currently is.

It might if we cheat. So some things we can hang our hat on:

  • guaranteeCase will run regardless of whether or not we "contain" the A. For example, an OptionT.empty will result in a Succeeded with None. This is what the bracketCase fix buys us
  • We can peak into the result that's inside the Succeeded by flatMap and Dispatcher
  • The result, in practice, is going to be determined by F.pure for some F, meaning that we can safely unsafeRunSync on it. 😈
  • This means that we can figure out whether or not there's an A by simply flatMapping with something that captures that A and re-pures it into F in a local var
  • If the var isn't populated by the unsafeRunSync, then we know we're in a coproduct case, so we populate a different var with… exactly the F[A] we just ran. Then short-circuit the whole rest of the expression and return it, casted to the appropriate type

I'm pretty sure this works in all cases that the control layer solution (Resume) would have worked in.

Copy link
Contributor Author

@Baccata Baccata May 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

F.pure for some F, meaning that we can safely unsafeRunSync on it

That is not quite accurate... Funnily enough, that's what I was doing in the first iteration.

If the var isn't populated by the unsafeRunSync, then we know we're in a coproduct case

What you seem to forget is that the behaviour of running unsafeRunSync when you're in the coproduct case is non-termination ... what you suggest would be possible if there was a flavour of unsafeRun that returned Either[F[A], A] or something, which is partially what I've tried to achieve with Resume.

Here's a repro of such non-termination.

import cats.effect.IOApp
import cats.effect.IO
import cats.effect.std._
import cats.data.OptionT
import cats.effect.kernel.Sync

object Main extends IOApp.Simple {

  type Foo[A] = OptionT[IO, A]

  // this could be returned in the success case of `guaranteeCase`
  val prog: Foo[Int] = OptionT[IO, Int](IO.pure(None))

  def run: IO[Unit] = {
    val dispatched =
      Dispatcher[Foo].use(d => Sync[Foo].delay(d.unsafeRunSync(prog))).value

    dispatched >> IO.println("completed")
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might work :) . I'll try it out when guaranteeCase is fixed for transformers. Is there an open issue for that btw ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there isn't an open issue yet. Would you mind opening one if you have the example immediately at hand? I'm drowning in a couple branches right now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djspiewak the mutation trick works 👍 , and the guaranteeCase fix isn't blocking this (I think forking a fiber before dispatch is still desirable to avoid the non-termination problem, though in the case of unsafeRunAndForget I do not know whether a thread would end up being monopolised in the background).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fibers are forked regardless when you do an unsafeRun of any sort on Dispatcher, so the secondary forking is unneeded. I do see how it's working around the bracket issue though.

Allows to decide whether we're in a happy path or not, without knowing
anything about the underlying monad. Knowning whether we're still in
the happy path is crucial to signal to the Async/Await state machine
whether it should continue or have the async block yield.
Copy link
Member

@djspiewak djspiewak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Giving a more proper review now that we're in the home stretch! I'm really excited about this, tbh.

Comment on lines 61 to 62
type CallbackTarget[F[_]] = F[AnyRef]
type Callback[F[_]] = Either[Throwable, CallbackTarget[F]] => Unit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this have to be public?

Copy link
Contributor Author

@Baccata Baccata May 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does, because it's used in macro expansion, so the client code needs to have visibility over those types.

These aliases have been really valuable during iteration as otherwise, any change to the types lead to having to amend the quasiquote expression in several places which is really, really annoying due to not getting compile errors until the macro expansion. I'd rather keep them for maintainability reasons but will obviously abide if you make an executive call to remove them.

In the meantime, I removed CallbackTarget as it's useless in this iteration and renamed Callback to AwaitCallback (8971556)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm cool with keeping them. Just wanted to sanity check.

std/shared/src/main/scala-2/AsyncAwait.scala Show resolved Hide resolved
import cats.effect.kernel.Outcome.Errored
import cats.effect.kernel.Outcome.Succeeded

class AsyncAwaitDsl[F[_]](implicit F: Async[F]) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add some scaladoc being like "uh, this is really really unstable, only works on Scala 2.x, and will likely change and/or be deprecated in the future"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love it.

@@ -0,0 +1,189 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor thing, but can we rename this file in lower-case to reflect the fact that it contains multiple top-level members? Like asyncAwait.scala.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +29 to +30
object IOAsyncAwait extends cats.effect.std.AsyncAwaitDsl[IO]
import IOAsyncAwait.{await => ioAwait, _}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we're whitebox macroing here, I wonder if it's possible to have an API which doesn't require this kind of "dsl partial application" pattern. I'm totally fine going with this for now, especially since we aren't guaranteeing compatibility, but it feels possible.

Copy link
Contributor Author

@Baccata Baccata May 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turned into a blackbox macro in 259b723.

What you suggest might be possible but it would require a lot more metaprogramming skills than what I can offer. Like, you'd have to collect all the await blocks (which would then be typed def await[F[_], A](fa: F[A]) : A which would likely mess with inference), extract the F from each of them, ensure they're all consistent, implicitly resolve the Async typeclass instance using macro magic ...

Looking at this library, which provides the same feature for Scala 3, the UX is bound to improve in the future, but in the current state, I think "dsl partial application" is the least bad alternative. People who want to use async/await are likely gonna be coding against a specific effect (as opposed to an abstract one) and will be able to define a global instance of the dsl for their effect, after which type inference is solid.

In the meantime, should I add partially applied instance for the specific case of IO ? If so, can you suggest a location / name ?


val program = for {
ref <- Ref[IO].of(0)
_ <- async { ioAwait(IO.sleep(100.millis) *> ref.update(_ + 1)) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we got a little fancier?

Suggested change
_ <- async { ioAwait(IO.sleep(100.millis) *> ref.update(_ + 1)) }
_ <- async {
ioAwait(IO.never)
ioAwait(ref.update(_ + 1))
}

This should also make the test more deterministic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 99 to 100
_ <- program
after <- IO(x must beEqualTo(1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also worth testing this sequenced a second time, ensuring that x must beEqualTo(2) afterward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before <- IO(x must beEqualTo(0))
_ <- program
after <- IO(x must beEqualTo(1))
} yield before && after
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} yield before && after
} yield ok

You already did the assertion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this magic ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this magic ?

LOL. The must syntax throws, so you don't really need the return results.

// as inspecting the Succeeded outcome using dispatcher is risky on algebraic sums,
// such as OptionT, EitherT, ...
var awaitedValue: Option[AnyRef] = None
(summary *> f).flatTap(r => F.delay{awaitedValue = Some(r)}).start.flatMap(_.join).flatMap {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually surprises me that scalafmt gives this line a pass.

Suggested change
(summary *> f).flatTap(r => F.delay{awaitedValue = Some(r)}).start.flatMap(_.join).flatMap {
(summary *> f).flatTap(r => F.delay { awaitedValue = Some(r) }).start.flatMap(_.join).flatMap {

Also, why is the .start.flatMap(_.join) necessary? Why not just guaranteeCase? (is this just working around #2013?) At the very least we should use uncancelable to ensure the Fiber doesn't leak.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually surprises me that scalafmt gives this line a pass.

I had disabled formatting before the quasiquotes and forgotten to re-enable it. Fixed in 8971556

Also, why is the .start.flatMap(_.join) necessary?

2 birds, one stone :

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least we should use uncancelable to ensure the Fiber doesn't leak.

Is this correct : 0220ebf ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost! Commented on the commit

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I was pointing here. When #2013 is fixed, could be replaced by .forceR(F.void) I suppose

I'm not sure I understand why the forceR? Asynchronous non-terminating fibers don't take any resources and are eventually GC'd when no longer referenced, so they aren't really a problem per se.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asynchronous non-terminating fibers don't take any resources

From the point of view of someone who doesn't know the innards of Dispatcher and how it relates to the effect's runtime, protecting against potential non-terminating behaviour seemed like a good reflex 😄. Also Dispatcher being polymorphic, can you guarantee that it'll be the case for all effect types ? (I reckon it'd be pretty bad if it wasn't the case, but still).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you guarantee that it'll be the case for all effect types ? (I reckon it'd be pretty bad if it wasn't the case, but still).

I think that any F for which non-termination in Dispatcher[F] is problematic would also be an F for which fibers are problematic. I think.

Comment on lines 147 to 149
protected def completeSuccess(value: AnyRef): Unit = {
callback(Right(F.as(summary, value)))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
protected def completeSuccess(value: AnyRef): Unit = {
callback(Right(F.as(summary, value)))
}
protected def completeSuccess(value: AnyRef): Unit =
callback(Right(F.as(summary, value)))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djspiewak
Copy link
Member

This is ready for merge once we address the poll around the join.

@djspiewak
Copy link
Member

Looks like dead code elimination becomes very annoyed by what we're doing. Interesting…

@lrytz
Copy link

lrytz commented May 31, 2021

Looks like dead code elimination becomes very annoyed by what we're doing

Scalac's DCE? What went wrong?


case object Boom extends Throwable

def boom: Unit = throw Boom
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fatal warnings don't like this line. It needs a def boom(): Unit because it is a side effecting method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sigh 7cf2ad5

@Baccata
Copy link
Contributor Author

Baccata commented May 31, 2021

Looks like dead code elimination becomes very annoyed by what we're doing. Interesting…

Addressed in 133094b. It is interesting that it became an error only after the macro turned into a blackbox.

@vasilmkd
Copy link
Member

CI looks great.

Copy link
Member

@vasilmkd vasilmkd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's go! A huge achievement.

@vasilmkd
Copy link
Member

@Baccata hats off.

@djspiewak
Copy link
Member

We should write up a page on this in the main section of the docs.

@djspiewak djspiewak merged commit 2a20bdb into typelevel:series/3.x May 31, 2021
@Baccata Baccata deleted the async-await branch June 4, 2021 08:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Experiment with -Xasync
6 participants