New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Polymorphic async/await implementation #1924
Conversation
This preliminary work adds an async/await implementation based off the now built-in mechanism in the Scala 2 compiler. The grittiest details of the implementation are borrowed from : * scala/scala#8816 * https://github.com/retronym/monad-ui/tree/master/src/main/scala/monadui * https://github.com/scala/scala-async Due to the reliance on Dispatcher#unsafeRunSync, the implementation currently only works on JVM. Error propagation / cancellation seems to behave as it should. NB : it is worth noting that despite it compiling, using this with OptionT/EitherT/IorT is currently unsafe, for two reasons: * what seems to be a bug in the MonadCancel instance tied to those Error-able types: See https://gitter.im/typelevel/cats-effect-dev?at=60818cf4ae90f3684098c042 * The fact that calling `unsafeRunSync` is `F[A] => A`, which obviously doesn't work for types that have an error channel that isn't accounted for by the CE typeclasses.
|
||
"IOAsyncAwait" should { | ||
object IOAsyncAwait extends cats.effect.std.AsyncAwaitDsl[IO] | ||
import IOAsyncAwait.{await => ioAwait, _} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
await
collides with something in FutureMatchers.
I haven't added a syntax object for IO in the main scope yet. If someone could make an executive decision over where to put things and what to call them, that'd be great. The current UX (as you can see in tests) looks like this: object IOAsyncAwait extends cats.effect.std.AsyncAwaitDsl[IO]
import IOAsyncAwait._
async(await(IO.println("hello"))) |
Since we're all very new to this, may I suggest that we mark this as experimental and iterate on this for a couple of versions without promising source or binary compatibility. Who knows, we may end up improving on some of the constraints. Hats off to @Baccata for the effort. I'm interested to look into this in more depth, I've never dabbled with macros before. |
I agree with this. It's worth noting though that we basically already have a built-in "experimental" flag on all of this, since none of it is accessible without using the |
This aims at solving the problem of OptionT/EitherT/IorT traversing dispatchers
* Leveraged DispatchCloak in the async/await implementation * Added tests to show that it works for OptionT
@vasilmkd, @djspiewak, my latest commits on this introduce a construct (that I named
|
Just to confirm, you're still currently hitting the problems with transformers that we became aware of a couple days ago right? I just want to make sure this is not somehow hiding those or relying on behavior that will be changed/fixed in the next version. Btw, I'm not sure about the scala-2 source directory, you're going to have to fiddle around with it unfortunately. 😕 |
tests/jvm/src/test/scala-2/cats/effect/std/AsyncAwaitSpec.scala
Outdated
Show resolved
Hide resolved
Let's put it this way : the "DispatchCloak" construct is solving two problems by applying a solution which coincidentally prevents the problematic codepath (the one we became aware of) from being exercised. So even when the MonadCancel#bracketFull implementation is fixed for monad transformers, this solution (or something similar) would still have to be applied, in order to :
In other words, the fixing of MonadCancel#bracketFull would not have been enough to prevent async/await from hanging when it encounters |
I'm very very happy with this outcome. |
Wait, actually I'm very dumb 🤦 see here. Yeah it'll need to be rewritten in terms of Deferred + guarantee ... |
We'll fix the issue, that's definitely a priority for the next release. |
Spent some time thinking about this today. Do you think it would be better to just define this for I'm interested in @retronym's perspective on this. It seems that the current |
Considering it works for Kleisli / WriterT, I think it'd be silly not to provide the feature for a combination of those types. Or to prevent other IO implementations from tapping into this mechanism.
TBH, the state machine makes few assumptions. The problem can be summarised as this : "how do you generically decide whether a computation has been successful, according to all its monadic effects ?" Outside of the import cats.effect.IOApp
import cats.effect.IO
import cats.data.OptionT
import cats.effect.kernel.Sync
import cats.effect.std.Dispatcher
object Main extends IOApp.Simple {
type Foo[A] = OptionT[IO, A]
def run = {
IO.println("Hello") >>
Dispatcher[Foo].use { dispatcher =>
Sync[Foo].delay(dispatcher.unsafeRunSync(OptionT.none[IO, Int]))
}.value >> IO.println("Goodbye")
}
}
In the case of Async/Await, a more elegant encoding could be applied: The call to CE's The value of encoding things in terms of Throwable (even if the current DispatchCloak construct could do with polishing) is that it offers a start of solution for the very present problem showcased in the snippet above. Throwables are ubiquitous in all runtimes Scala compiles against, and a lot of libraries ,that people are gonna use Dispatcher to interop, with will likely propagate throwables. To summarise, the possible solutions, as I see them are :
|
That was a stretch, my bad. It does work for Kleisli, but in the case of WriterT, the logs get lost through in the async/await loop through each callback. I'm gonna try out something to address it (in a less ad-hoc fashion than the DispatchCloak thing). |
This allows Async/Await to work with any monadic effect.
@djspiewak, I've nuked the "DispatchCloak" in favour of a much less ad-hoc "Resume" construct that allows for decomposing a monadic computation into its potential product and coproduct components. Using a mutable variable in the async/await state machine (which I presume is thread-safe since the dispatch loop requires the FSM to hold a mutable integer, but it'd be good to have @retronym's confirmation), the intermediate monadic results can be stored when the async/await loop retrieves the value to fulfil the current "await", and re-composed with the effect indexed by the next "await" call. The CE |
protected def tryGet(tr: AsyncAwaitDsl.ResumeOutcome[F]): AnyRef = | ||
tr match { | ||
case Right((newEffect, value)) => | ||
recordedEffect = newEffect |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to inline some comments there.
resume.resume((recordedEffect *> f)).start.flatMap(_.join).flatMap { | ||
case Canceled() => F.delay(this(Left(F.canceled.asInstanceOf[F[AnyRef]]))) | ||
case Errored(e) => F.delay(this(Left(F.raiseError(e)))) | ||
case Succeeded(resumed) => resumed.flatMap(r => F.delay(this(r))) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I spent a ton of time thinking about this, and it occurs to me that we should be able to get this same kind of "peak into the control layer" effect by using guaranteeCase
. In particular, the Succeeded
case should contain the F[A]
even when that F[A]
is some coproduct case. Unfortunately, this is precisely what isn't working right now because of the bracketCase
issue. I think if I fix that issue, then we don't need Resume
here anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if I fix that issue, then we don't need Resume here anymore.
I don't think that'll suffice, unfortunately, though I'd be really happy to be proven wrong. One thing that the structural interface (which XAsync
works with) requires is the implementation of the tryGet
method, which, if you squint really hard, is akin to F[A] => Either[Stop, A]
(the Stop is signalled by returning this
... ).
For any await block, the state machine will call this tryGet
method to unpack the value from whatever structure you elect to work with, and substitutes the await block with that value. So the structure you choose needs to contain enough information to decide whether to signal to the state machine whether it should stop, or continue.
So as far as I'm aware, XAsync
cannot be implemented in a polymorphic manner without some abstraction letting you decide whether your F[A]
actually "contains" the A
. Fixing guaranteeCase
will not give us that, it will however mean that resume
could be called later than it currently is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not across the particular context of this PR but wanted to mention that the structural interface of async does support using an identity type alias as the "Either" type.
This makes sense if the Future-ish data type + dispatcher are responsible for propagating failures to dependant tasks. That isn't the case with Scala Futures, but is true in some other task abstractions, e.g.:
trait NodeStateMachine extends AsyncStateMachine[Node[AnyRef], AnyRef] with (AnyRef => Unit) {
def node: Node[AnyRef]
private[this] var state$async: Int = 0
protected def state: Int = state$async
protected def state_=(s: Int): Unit = state$async = s
final def apply(tr$async: R[AnyRef]): Unit = fsm(tr$async)
def fsm(tr$async: R[AnyRef]): Unit
type F[A] = Node[A]
type R[A] = A
// Adapter methods
protected def completeFailure(t: Throwable): Unit = throw new UnsupportedOperationException()
protected def completeSuccess(value: AnyRef): Unit = node.completeWithValue(value)
protected def onComplete(f: F[AnyRef]): Unit = f.onComplete(this)
protected def getCompleted(f: F[AnyRef]): R[AnyRef] = f.getCompleted
protected def tryGet(tr: R[AnyRef]): tr.type = tr
}
sealed class Node[T] {
private[this] val ec: Executor = Node.DefaultThreadPool
private[this] final val result: CompletableFuture[T @uncheckedVariance] = new CompletableFuture()
def address: NodeAddress[T] = NodeAddress.NoAddress
protected def runAsync(f: () => Unit): Unit = {
val childContext = NodeContext.childContext(this)
CompletableFuture.runAsync ({ () =>
NodeContext.pushContext(childContext)
try f()
catch {
case t: Throwable =>
completeWithFailure(t)
} finally NodeContext.popContext()
}, ec)
}
def onComplete[U](f: T => U): Unit = {
val childContext = NodeContext.childContext(this)
result.whenCompleteAsync((t: T, throwable: Throwable) => {
NodeContext.pushContext(childContext)
try {
if (throwable != null) {
completeWithFailure(throwable)
} else {
f(t)
}
} catch {
case t: Throwable =>
completeWithFailure(t)
} finally NodeContext.popContext()
}, ec)
}
def completeWithValue(t: T): Unit = result.complete(t)
def completeWithFailure(t: Throwable): Unit = {
result.completeExceptionally(t)
var context = NodeContext.current()
while (context ne RootContext) {
context.node.completeWithFailureDirect(t)
context = context.parent
}
}
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixing guaranteeCase will not give us that, it will however mean that resume could be called later than it currently is.
It might if we cheat. So some things we can hang our hat on:
guaranteeCase
will run regardless of whether or not we "contain" theA
. For example, anOptionT.empty
will result in aSucceeded
withNone
. This is what thebracketCase
fix buys us- We can peak into the result that's inside the
Succeeded
byflatMap
andDispatcher
- The result, in practice, is going to be determined by
F.pure
for someF
, meaning that we can safelyunsafeRunSync
on it. 😈 - This means that we can figure out whether or not there's an
A
by simplyflatMap
ping with something that captures thatA
and re-pure
s it intoF
in a localvar
- If the
var
isn't populated by theunsafeRunSync
, then we know we're in a coproduct case, so we populate a differentvar
with… exactly theF[A]
we just ran. Then short-circuit the whole rest of the expression and return it, casted to the appropriate type
I'm pretty sure this works in all cases that the control layer solution (Resume
) would have worked in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
F.pure for some F, meaning that we can safely unsafeRunSync on it
That is not quite accurate... Funnily enough, that's what I was doing in the first iteration.
If the var isn't populated by the unsafeRunSync, then we know we're in a coproduct case
What you seem to forget is that the behaviour of running unsafeRunSync when you're in the coproduct case is non-termination ... what you suggest would be possible if there was a flavour of unsafeRun
that returned Either[F[A], A]
or something, which is partially what I've tried to achieve with Resume
.
Here's a repro of such non-termination.
import cats.effect.IOApp
import cats.effect.IO
import cats.effect.std._
import cats.data.OptionT
import cats.effect.kernel.Sync
object Main extends IOApp.Simple {
type Foo[A] = OptionT[IO, A]
// this could be returned in the success case of `guaranteeCase`
val prog: Foo[Int] = OptionT[IO, Int](IO.pure(None))
def run: IO[Unit] = {
val dispatched =
Dispatcher[Foo].use(d => Sync[Foo].delay(d.unsafeRunSync(prog))).value
dispatched >> IO.println("completed")
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might work :) . I'll try it out when guaranteeCase is fixed for transformers. Is there an open issue for that btw ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there isn't an open issue yet. Would you mind opening one if you have the example immediately at hand? I'm drowning in a couple branches right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@djspiewak the mutation trick works 👍 , and the guaranteeCase fix isn't blocking this (I think forking a fiber before dispatch is still desirable to avoid the non-termination problem, though in the case of unsafeRunAndForget I do not know whether a thread would end up being monopolised in the background).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fibers are forked regardless when you do an unsafeRun
of any sort on Dispatcher
, so the secondary forking is unneeded. I do see how it's working around the bracket issue though.
Allows to decide whether we're in a happy path or not, without knowing anything about the underlying monad. Knowning whether we're still in the happy path is crucial to signal to the Async/Await state machine whether it should continue or have the async block yield.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Giving a more proper review now that we're in the home stretch! I'm really excited about this, tbh.
type CallbackTarget[F[_]] = F[AnyRef] | ||
type Callback[F[_]] = Either[Throwable, CallbackTarget[F]] => Unit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this have to be public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does, because it's used in macro expansion, so the client code needs to have visibility over those types.
These aliases have been really valuable during iteration as otherwise, any change to the types lead to having to amend the quasiquote expression in several places which is really, really annoying due to not getting compile errors until the macro expansion. I'd rather keep them for maintainability reasons but will obviously abide if you make an executive call to remove them.
In the meantime, I removed CallbackTarget
as it's useless in this iteration and renamed Callback
to AwaitCallback
(8971556)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm cool with keeping them. Just wanted to sanity check.
import cats.effect.kernel.Outcome.Errored | ||
import cats.effect.kernel.Outcome.Succeeded | ||
|
||
class AsyncAwaitDsl[F[_]](implicit F: Async[F]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add some scaladoc being like "uh, this is really really unstable, only works on Scala 2.x, and will likely change and/or be deprecated in the future"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love it.
@@ -0,0 +1,189 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor thing, but can we rename this file in lower-case to reflect the fact that it contains multiple top-level members? Like asyncAwait.scala
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
object IOAsyncAwait extends cats.effect.std.AsyncAwaitDsl[IO] | ||
import IOAsyncAwait.{await => ioAwait, _} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that we're whitebox macroing here, I wonder if it's possible to have an API which doesn't require this kind of "dsl partial application" pattern. I'm totally fine going with this for now, especially since we aren't guaranteeing compatibility, but it feels possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Turned into a blackbox macro in 259b723.
What you suggest might be possible but it would require a lot more metaprogramming skills than what I can offer. Like, you'd have to collect all the await blocks (which would then be typed def await[F[_], A](fa: F[A]) : A
which would likely mess with inference), extract the F from each of them, ensure they're all consistent, implicitly resolve the Async typeclass instance using macro magic ...
Looking at this library, which provides the same feature for Scala 3, the UX is bound to improve in the future, but in the current state, I think "dsl partial application" is the least bad alternative. People who want to use async/await are likely gonna be coding against a specific effect (as opposed to an abstract one) and will be able to define a global instance of the dsl for their effect, after which type inference is solid.
In the meantime, should I add partially applied instance for the specific case of IO ? If so, can you suggest a location / name ?
|
||
val program = for { | ||
ref <- Ref[IO].of(0) | ||
_ <- async { ioAwait(IO.sleep(100.millis) *> ref.update(_ + 1)) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we got a little fancier?
_ <- async { ioAwait(IO.sleep(100.millis) *> ref.update(_ + 1)) } | |
_ <- async { | |
ioAwait(IO.never) | |
ioAwait(ref.update(_ + 1)) | |
} |
This should also make the test more deterministic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_ <- program | ||
after <- IO(x must beEqualTo(1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also worth testing this sequenced a second time, ensuring that x must beEqualTo(2)
afterward.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
before <- IO(x must beEqualTo(0)) | ||
_ <- program | ||
after <- IO(x must beEqualTo(1)) | ||
} yield before && after |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} yield before && after | |
} yield ok |
You already did the assertion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this magic ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this magic ?
LOL. The must
syntax throws, so you don't really need the return results.
// as inspecting the Succeeded outcome using dispatcher is risky on algebraic sums, | ||
// such as OptionT, EitherT, ... | ||
var awaitedValue: Option[AnyRef] = None | ||
(summary *> f).flatTap(r => F.delay{awaitedValue = Some(r)}).start.flatMap(_.join).flatMap { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It actually surprises me that scalafmt gives this line a pass.
(summary *> f).flatTap(r => F.delay{awaitedValue = Some(r)}).start.flatMap(_.join).flatMap { | |
(summary *> f).flatTap(r => F.delay { awaitedValue = Some(r) }).start.flatMap(_.join).flatMap { |
Also, why is the .start.flatMap(_.join)
necessary? Why not just guaranteeCase
? (is this just working around #2013?) At the very least we should use uncancelable
to ensure the Fiber
doesn't leak.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It actually surprises me that scalafmt gives this line a pass.
I had disabled formatting before the quasiquotes and forgotten to re-enable it. Fixed in 8971556
Also, why is the .start.flatMap(_.join) necessary?
2 birds, one stone :
- working around Override MonadCancel#bracketFull in monad transformers instances #2013 as you guessed
- prevent cases like OptionT.none to be run through the Dispatcher, which makes me nervous (even when calling unsafeRunAndForget). This is what I was pointing here. When Override MonadCancel#bracketFull in monad transformers instances #2013 is fixed, could be replaced by
.forceR(F.void)
I suppose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the very least we should use uncancelable to ensure the Fiber doesn't leak.
Is this correct : 0220ebf ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost! Commented on the commit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what I was pointing here. When #2013 is fixed, could be replaced by .forceR(F.void) I suppose
I'm not sure I understand why the forceR
? Asynchronous non-terminating fibers don't take any resources and are eventually GC'd when no longer referenced, so they aren't really a problem per se.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Asynchronous non-terminating fibers don't take any resources
From the point of view of someone who doesn't know the innards of Dispatcher and how it relates to the effect's runtime, protecting against potential non-terminating behaviour seemed like a good reflex 😄. Also Dispatcher being polymorphic, can you guarantee that it'll be the case for all effect types ? (I reckon it'd be pretty bad if it wasn't the case, but still).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you guarantee that it'll be the case for all effect types ? (I reckon it'd be pretty bad if it wasn't the case, but still).
I think that any F
for which non-termination in Dispatcher[F]
is problematic would also be an F
for which fibers are problematic. I think.
protected def completeSuccess(value: AnyRef): Unit = { | ||
callback(Right(F.as(summary, value))) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protected def completeSuccess(value: AnyRef): Unit = { | |
callback(Right(F.as(summary, value))) | |
} | |
protected def completeSuccess(value: AnyRef): Unit = | |
callback(Right(F.as(summary, value))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is ready for merge once we address the |
Looks like dead code elimination becomes very annoyed by what we're doing. Interesting… |
Scalac's DCE? What went wrong? |
|
||
case object Boom extends Throwable | ||
|
||
def boom: Unit = throw Boom |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fatal warnings don't like this line. It needs a def boom(): Unit
because it is a side effecting method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sigh 7cf2ad5
Addressed in 133094b. It is interesting that it became an error only after the macro turned into a blackbox. |
CI looks great. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's go! A huge achievement.
@Baccata hats off. |
We should write up a page on this in the main section of the docs. |
This preliminary work adds an async/await implementation based off the now built-in mechanism in the Scala 2 compiler.
The grittiest details of the implementation are borrowed from :
Due to the reliance on Dispatcher#unsafeRunSync, the implementation currently only works on JVM.
Error propagation / cancellation seems to behave as it should.
NB : despite it compiling, using this with OptionT/EitherT/IorT is currently unsafe, for two reasons:what seems to be a bug in the MonadCancel instance tied to those Error-able types: See https://gitter.im/typelevel/cats-effect-dev?at=60818cf4ae90f3684098c042The fact that callingunsafeRunSync
isF[A] => A
, which obviously doesn't work for types that have an error channel that isn't accounted for by the CE typeclasses.I've introduced a construct to deal with side-error-channels in what I currently believe is a safe manner.
Fixes #1907