Usage with other effect systems besides Future #149
I am also very interested in this. Up +1 |
See: https://github.com/scala/async/blob/master/src/main/scala/scala/async/Async.scala#L45 For the spots you would need to replicate to add an adapter for |
would someone like to turn this into a pull request with a small addition to the doc? |
I think the major problem here would be that the However, there's https://github.com/pelotom/effectful, which seems to do the same thing for scalaz, or is it somehow different? |
@adamw effectful is effectively abandoned: pelotom/effectful#15 |
@dsilvasc True, though there's also https://github.com/monadless/monadless (not sure if it's maintained, but it usually works well enough) |
@adamw There's a question about monadless support for Scala 2.13 from over 2 months ago with no response: Don't know about maintenance plans for Stateless Future: ThoughtWorks Each doesn't seem to have a Scala 2.13 version published to Maven Central, but they might actively use it in production with their customers. ThoughtWorks seem to be moving on to a compiler plugin though -- same primary author from Each: Sorry for the tangent -- I think that's all of the related projects :) |
I would be more concerned about unmerged PRs than issues without comment ;) Anyway, all of these seem rather abandoned. Would be nice of course to combine all these efforts into a single one, working in any Cats/Scalaz monad (like Kotlin's coroutines), but ... there's a finite amount of time ;) |
I took a look at ScalaConcurrentAsync and AsyncId, then took a stab at implementing an adapter for a Task-like abstraction where
import scala.async.internal.{AsyncBase, FutureSystem}
import scala.concurrent.{Future, Promise}
import scala.reflect.macros.whitebox
import scala.language.experimental.macros
object TaskAsync extends AsyncBase {
lazy val futureSystem = TaskFutureSystem
type FS = TaskFutureSystem.type
def async[T](body: => T): Task[T] = macro asyncTaskImpl[T]
def asyncTaskImpl[T: c.WeakTypeTag](c: whitebox.Context)(body: c.Expr[T]): c.Expr[Task[T]] = {
val u: c.Expr[Unit] = c.Expr[Unit](c.parse("()"))
asyncImpl[T](c)(body)(u)
}
}
object TaskFutureSystem extends FutureSystem {
override type Prom[A] = Promise[A]
override type Fut[A] = Task[A]
override type ExecContext = Unit
override type Tryy[A] = scala.util.Try[A]
override def mkOps(c0: whitebox.Context): Ops { val c: c0.type } = new Ops {
val c: c0.type = c0
import c.universe._
def promType[A](implicit evidence: c.universe.WeakTypeTag[A]): c.universe.Type = weakTypeOf[Prom[A]]
def tryType[A](implicit evidence: c.universe.WeakTypeTag[A]): c.universe.Type = weakTypeOf[Tryy[A]]
def execContextType: Type = weakTypeOf[ExecContext]
def createProm[A](implicit evidence: c.universe.WeakTypeTag[A]): c.universe.Expr[Promise[A]] = reify {
Promise[A]()
}
def promiseToFuture[A: WeakTypeTag](prom: Expr[Prom[A]]): c.universe.Expr[Task[A]] = reify {
new Task(_ => prom.splice.future)
}
def future[A: WeakTypeTag](a: Expr[A])(execContext: Expr[ExecContext]): c.universe.Expr[Task[A]] = reify {
new Task(implicit ec => Future { a.splice })
}
def onComplete[A, B](
task: Expr[Fut[A]],
fun: Expr[scala.util.Try[A] => B],
execContext: Expr[ExecContext]): Expr[Unit] = reify {
task.splice.onComplete(fun.splice)
}
override def continueCompletedFutureOnSameThread: Boolean = true
override def getCompleted[A: WeakTypeTag](task: Expr[Fut[A]]): Expr[Tryy[A]] = reify {
if (task.splice.isCompleted) task.splice.value.get else null
}
def completeProm[A](prom: Expr[Prom[A]], value: Expr[scala.util.Try[A]]): Expr[Unit] = reify {
prom.splice.complete(value.splice)
c.Expr[Unit](Literal(Constant(()))).splice
}
def tryyIsFailure[A](tryy: Expr[scala.util.Try[A]]): Expr[Boolean] = reify {
tryy.splice.isFailure
}
def tryyGet[A](tryy: Expr[Tryy[A]]): Expr[A] = reify {
tryy.splice.get
}
def tryySuccess[A: WeakTypeTag](a: Expr[A]): Expr[Tryy[A]] = reify {
scala.util.Success[A](a.splice)
}
def tryyFailure[A: WeakTypeTag](a: Expr[Throwable]): Expr[Tryy[A]] = reify {
scala.util.Failure[A](a.splice)
}
}
}
This doesn't work because the async macro expects the implementation of the To clarify, here's an example:
val runCount = new AtomicInteger(0)
val t = async {
val task = async {
runCount.incrementAndGet()
5
}
assert(runCount.get == 0)
val ten = await(task) * 2
assert(ten == 10)
assert(runCount.get == 1)
ten
}
val future = t.run()(ExecutionContext.global)
val result = Await.result(future, Duration.Inf)
assert(runCount.get == 1)
assert(result == 10)
and the macro expansion printed by
val runCount = new AtomicInteger(0)
class StateMachine extends AnyRef with (snapchat.concurrent.TaskFutureSystem.Tryy[Any] => Unit) with (() => Unit) {
private[this] var await$macro$4$macro$6: Int = _
private[this] var state: Int = 0
/*private[this] */
val result: snapchat.concurrent.TaskFutureSystem.Prom[Int] = Promise.apply[Int]()
// def result: snapchat.concurrent.TaskFutureSystem.Prom[Int] = result
private[this] val execContext: Unit = ()
// def execContext: Unit = execContext
def apply(tr: snapchat.concurrent.TaskFutureSystem.Tryy[Int]): Unit = while$macro$8() {
try {
state match {
case 0 =>
val task = new Task(implicit ec => Future {
runCount.incrementAndGet()
5
})
assert(runCount.get().==(0))
val awaitable$macro$3
: snapchat.concurrent.Task[Int] /* @scala.reflect.internal.annotations.uncheckedBounds */ =
task
state = 1
val completed$macro$7: snapchat.concurrent.TaskFutureSystem.Tryy[Int] =
if (awaitable$macro$3.isCompleted)
awaitable$macro$3.value.get
else
null
if (null.ne(completed$macro$7)) {
if (completed$macro$7.isFailure) {
/*stateMachine$macro$2.this.*/
result.complete(completed$macro$7)
return ()
} else {
await$macro$4$macro$6 = completed$macro$7.get
state = 2
}
()
} else {
awaitable$macro$3.onComplete(this)
return ()
}
()
case 2 => {
/*stateMachine$macro$2.this.*/
result.complete(Success({
val x$macro$5: Int = 2
val ten: Int = await$macro$4$macro$6.*(x$macro$5)
assert(ten.==(10))
assert(runCount.get().==(0))
ten
}))
()
}
return ()
case 1 =>
if (tr.isFailure) {
/*stateMachine$macro$2.this.*/
result
.complete(tr)
return ()
} else {
await$macro$4$macro$6 = tr.get
state = 2
}
()
case _ => throw new IllegalStateException()
}
} catch {
case throwable @ (_: Throwable) =>
result.complete(Failure[Int](throwable))
return ()
}
while$macro$8()
}
def apply(): Unit = StateMachine.this.apply(null)
}
val stateMachine = new StateMachine
new Task(implicit ec => Future { stateMachine.apply() })
new Task(_ => stateMachine$macro$2.result.future)
Note the last two lines. The call to |
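One reading of the last two lines of the expansion is that the task wrapping `stateMachine.apply()` is constructed and then discarded. That is harmless for an eager `Future`, whose body is scheduled on construction, but a lazy task that is never started never runs. A minimal sketch of that difference follows; `LazyTask` is a hypothetical stand-in, not the Task type used in the comment above.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical lazy wrapper: the Future is only created when `start` is applied.
final class LazyTask[A](val start: ExecutionContext => Future[A])

object EagerVsLazy {
  def demo(): (Int, Int) = {
    val global = ExecutionContext.global
    val runCount = new AtomicInteger(0)

    // Eager: the body is scheduled as soon as the Future is constructed.
    // The handle is kept only so that we can wait for it to finish.
    val eager = Future { runCount.incrementAndGet() }(global)
    Await.ready(eager, 10.seconds)
    val afterEager = runCount.get

    // Lazy: constructing the task and never starting it runs nothing.
    val discarded = new LazyTask[Int](e => Future { runCount.incrementAndGet() }(e))
    val afterDiscard = runCount.get

    (afterEager, afterDiscard)
  }
}
```

Under these assumptions `demo()` reports the same count before and after the lazy task is built, which is the behaviour a discarded state-machine starter would show.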
@dsilva I ended up using https://github.com/monadless/monadless, which has the same functionality, but is general and works for any monad. Unfortunately, it's also unmaintained. |
@adamw Thanks, that worked for me too. Looks like it rewrites code into calls to map and flatMap instead of state machines. |
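To illustrate the map/flatMap style of rewriting mentioned here, the sketch below shows roughly what a direct-style block adding two awaited values could be turned into. The `Task` class is a hypothetical, synchronous stand-in defined only for this example, and the rewrite is written by hand rather than produced by any macro.

```scala
// Hypothetical lazy Task: nothing runs until `run()` is called.
final class Task[A](thunk: () => A) {
  def run(): A = thunk()
  def map[B](f: A => B): Task[B] = new Task(() => f(thunk()))
  def flatMap[B](f: A => Task[B]): Task[B] = new Task(() => f(thunk()).run())
}

object Task {
  def apply[A](a: => A): Task[A] = new Task(() => a)
}

object FlatMapRewrite {
  // Direct style one would like to write (pseudo-code, not compiled here):
  //   async { await(taskA) + await(taskB) }
  // Roughly what a map/flatMap-based rewrite would produce instead:
  def sum(taskA: Task[Int], taskB: Task[Int]): Task[Int] =
    taskA.flatMap(a => taskB.map(b => a + b))
}
```

Because the result is built purely from `map` and `flatMap`, laziness is preserved: neither input task runs until the combined task does.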
For a complete example of extending async/await, we have a version using Twitter
@SethTisue I could take a stab at the documentation update |
I'm open to refactoring the internals of async to support abstractions like |
Dsl.scala is not abandoned. |
Dsl.scala is not only more general than
|
The integration story for other awaitable types should get easier with scala/scala#8816. The test case in that PR includes this integration for
object CompletableFutureAwait {
def async[T](executor: Executor)(body: T): CompletableFuture[T] = macro impl
@compileTimeOnly("[async] `await` must be enclosed in `async`")
def await[T](completableFuture: CompletableFuture[T]): T = ???
def impl(c: blackbox.Context)(executor: c.Tree)(body: c.Tree): c.Tree = {
import c.universe._
val awaitSym = typeOf[CompletableFutureAwait.type].decl(TermName("await"))
def mark(t: DefDef): Tree = c.internal.markForAsyncTransform(c.internal.enclosingOwner, t, awaitSym, Map.empty)
val name = TypeName("stateMachine$$async_" + body.pos.line)
q"""
final class $name extends _root_.scala.tools.nsc.async.CompletableFutureStateMachine($executor) {
${mark(q"""override def apply(tr$$async: _root_.scala.util.Try[_root_.scala.AnyRef]) = ${body}""")}
}
new $name().start().asInstanceOf[${c.macroApplication.tpe}]
"""
}
}
abstract class CompletableFutureStateMachine(executor: Executor) extends AsyncStateMachine[CompletableFuture[AnyRef], Try[AnyRef]] with Runnable with BiConsumer[AnyRef, Throwable] {
Objects.requireNonNull(executor)
protected var result$async: CompletableFuture[AnyRef] = new CompletableFuture[AnyRef]();
// Adapters
def accept(value: AnyRef, throwable: Throwable): Unit = {
this(if (throwable != null) Failure(throwable) else Success(value))
}
def run(): Unit = {
apply(null)
}
// FSM translated method
def apply(tr$async: Try[AnyRef]): Unit
// Required methods
protected var state$async: Int = StateAssigner.Initial
protected def completeFailure(t: Throwable): Unit = result$async.completeExceptionally(t)
protected def completeSuccess(value: AnyRef): Unit = result$async.complete(value)
protected def onComplete(f: CompletableFuture[AnyRef]): Unit = f.whenCompleteAsync(this)
protected def getCompleted(f: CompletableFuture[AnyRef]): Try[AnyRef] = try {
val r = f.getNow(this)
if (r == this) null
else Success(r)
} catch {
case t: Throwable => Failure(t)
}
protected def tryGet(tr: Try[AnyRef]): AnyRef = tr match {
case Success(value) =>
value.asInstanceOf[AnyRef]
case Failure(throwable) =>
result$async.completeExceptionally(throwable)
this // sentinel value to indicate the dispatch loop should exit.
}
def start(): CompletableFuture[AnyRef] = {
executor.execute(this)
result$async
}
} |
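The `getCompleted` method above polls a `CompletableFuture` without blocking by passing a sentinel to `getNow` and checking whether the sentinel comes back. The standalone sketch below isolates that trick; the `NotYet` marker and the `poll` helper are names invented for this example, while the state machine above uses `this` as its sentinel.

```scala
import java.util.concurrent.CompletableFuture
import scala.util.{Failure, Success, Try}

object GetNowSentinel {
  // Private marker object: any reference that can never be a real result works.
  private val NotYet: AnyRef = new Object

  // Returns null when `f` has not completed, mirroring the convention used above.
  def poll(f: CompletableFuture[AnyRef]): Try[AnyRef] =
    try {
      val r = f.getNow(NotYet) // yields NotYet only if `f` is still pending
      if (r eq NotYet) null else Success(r)
    } catch {
      // getNow throws if the future completed exceptionally
      case t: Throwable => Failure(t)
    }
}
```

Reference equality (`eq`) is used for the comparison so that a legitimate result can never be mistaken for the marker.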
@retronym With that change, do you have in mind what an example integration without an executor would look like? It would be excellent if moving scala-async into the compiler can be done in a way that makes it possible to integrate with task types like monix Task, scalaz Task, ZIO, and cats IO. |
Here's one way to integrate a cats-eval like type.
package scala.tools.nsc
package async
import scala.language.experimental.macros
import scala.reflect.macros.blackbox
import scala.annotation.compileTimeOnly
import scala.tools.partest.async.AsyncStateMachine
object EvalAwait {
def evaluating[T](body: T): Eval[T] = macro impl
@compileTimeOnly("[async] `value` must be enclosed in `evaluating`")
def value[T](output: Eval[T]): T = ???
def impl(c: blackbox.Context)(body: c.Tree): c.Tree = {
import c.universe._
val awaitSym = typeOf[EvalAwait.type].decl(TermName("value"))
def mark(t: DefDef): Tree = c.internal.markForAsyncTransform(c.internal.enclosingOwner, t, awaitSym, Map.empty)
val name = TypeName("stateMachine$$async_" + body.pos.line)
q"""
final class $name extends _root_.scala.tools.nsc.async.EvalStateMachine {
${mark(q"""override def apply(tr$$async: _root_.scala.AnyRef) = ${body}""")}
}
new $name().start().asInstanceOf[${c.macroApplication.tpe}]
"""
}
}
abstract class Eval[A] {
def value: A
}
object Eval {
def now[T](t: T): Eval[T] = Now(t)
def later[T](t: => T): Eval[T] = new Later(() => t)
def always[T](t: => T): Eval[T] = new Always(() => t)
}
final case class Now[A](value: A) extends Eval[A] {
def memoize: Eval[A] = this
}
final class Later[A](f: () => A) extends Eval[A] {
private[this] var thunk: () => A = f
lazy val value: A = {
try thunk() finally thunk = null
}
}
final class Always[A](f: () => A) extends Eval[A] {
def value: A = f()
def memoize: Eval[A] = new Later(f)
}
abstract class EvalStateMachine extends AsyncStateMachine[Eval[AnyRef], AnyRef] {
var result$async: AnyRef = _
// FSM translated method
def apply(tr$async: AnyRef): Unit
// Required methods
protected var state$async: Int = 0
protected def completeFailure(t: Throwable): Unit = throw t
protected def completeSuccess(value: AnyRef): Unit = result$async = value
protected def onComplete(f: Eval[AnyRef]): Unit = throw new UnsupportedOperationException()
protected def getCompleted(f: Eval[AnyRef]): AnyRef = f.value
protected def tryGet(tr: AnyRef): AnyRef = tr
def start(): Eval[AnyRef] = {
apply(null)
Eval.now(result$async)
}
}
The following runs in a stack-safe manner. It wouldn't integrate with the trampoline-style evaluation in cats-eval flatMap etc., but all the composition of values in the
import scala.tools.nsc.async._
import EvalAwait._
object Test {
def v1 = Eval.now("v1")
def v2 = Eval.now("v2")
def v3 = Eval.later("later")
def test: String = test0.value
def test0 = evaluating[String] {
var r1 = ""
var i = 0
while (i < 10000) {
r1 = value(v1) + value(v2) + value(v3)
i += 1
}
r1
}
} |
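Since the loop above calls `value` on the same `Eval` instances many times, it may help to see how the `Later` and `Always` variants differ under repeated evaluation. The sketch below reuses trimmed copies of the definitions from the comment above and counts how often each thunk runs; the `EvalSemantics` object and its counters are added only for illustration.

```scala
// Trimmed copies of the Eval definitions from the comment above.
abstract class Eval[A] { def value: A }
final case class Now[A](value: A) extends Eval[A]
final class Later[A](f: () => A) extends Eval[A] {
  private[this] var thunk: () => A = f
  // Evaluated at most once; the thunk is released afterwards.
  lazy val value: A = try thunk() finally thunk = null
}
final class Always[A](f: () => A) extends Eval[A] {
  // Re-evaluated on every access.
  def value: A = f()
}

object EvalSemantics {
  // Returns (number of Later evaluations, number of Always evaluations).
  def demo(): (Int, Int) = {
    var laterRuns = 0
    var alwaysRuns = 0
    val later = new Later(() => { laterRuns += 1; "later" })
    val always = new Always(() => { alwaysRuns += 1; "always" })
    var i = 0
    while (i < 3) {
      later.value
      always.value
      i += 1
    }
    (laterRuns, alwaysRuns)
  }
}
```

In this sketch the `Later` thunk is memoized after its first access, while the `Always` thunk runs once per loop iteration.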
For completeness, I'll also do a prototype integration with |
Here's a possible monix-task integration: retronym/monad-ui@3b146ec A new instance of the state machine is created for each execution of the As an external library, without using the built in run loop in Monix, there doesn't seem to be an API for exploiting the fact that Review by @alexandru, perhaps? |
Oh wow, looking good. Loved |
Has a cats integration been considered? It wouldn't require a lot of knowledge from the plugin about the constructors of the effect, as long as only the interface of |
I definitely think that doing this more generally for cats/cats-effect types would make a lot of sense. Most of the machinery it looks like would only require |
The async/await DSL has two selling points:
The difference between single-shot vs multi-shot effects is a bit subtle. I thought we'd have trouble modelling Take a look at https://github.com/retronym/monad-ui/ for some experiments I did with different DSLs atop the writer Monad. So, does an async-like DSL still make sense for effects that are really multi-shot (like
Scalac's |
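The single-shot versus multi-shot distinction can be made concrete by counting how often the function passed to `flatMap` is invoked. In the sketch below, which uses only standard library types and a hypothetical `ShotCount` helper, `Option` calls its continuation at most once while `List` calls it once per element. A single mutable state machine resumed once per await presumably fits the first case more naturally than the second.

```scala
object ShotCount {
  // Option: the continuation runs at most once (single-shot).
  def optionShots(): Int = {
    var calls = 0
    val ignored = Option(1).flatMap { x => calls += 1; Option(x + 1) }
    calls
  }

  // List: the continuation runs once per element (multi-shot).
  def listShots(): Int = {
    var calls = 0
    val ignored = List(1, 2, 3).flatMap { x => calls += 1; List(x, x * 10) }
    calls
  }
}
```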
I don't think this DSL for Lists looks appealing to use. It's best to focus on Here's a first cut at integration. Caveat: I haven't used I only needed |
As a comparison, here's all the code necessary to support cats with Monadless: https://github.com/monadless/monadless/blob/master/monadless-cats/src/main/scala/io/monadless/cats/MonadlessApplicative.scala It can depend only on |
The README indicates that this library can be used with other implementations of the future pattern - however I couldn't find documentation on how to do this.
In particular I'm interested in using it with scalaz Task. Is this possible?