Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resource computation block #2571

Merged
merged 13 commits into from Nov 11, 2021
10 changes: 10 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Expand Up @@ -354,7 +354,9 @@ public final class arrow/fx/coroutines/ResourceExtensionsKt {
}

public final class arrow/fx/coroutines/ResourceKt {
public static final fun release (Larrow/fx/coroutines/Resource;Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Resource;
public static final fun release-zgiIeyo (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Resource;
public static final fun releaseCase (Larrow/fx/coroutines/Resource;Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Resource;
public static final fun releaseCase-zgiIeyo (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Resource;
public static final fun resource (Lkotlin/jvm/functions/Function1;)Lkotlin/jvm/functions/Function1;
public static final fun sequence (Ljava/lang/Iterable;)Larrow/fx/coroutines/Resource;
Expand Down Expand Up @@ -488,3 +490,11 @@ public final class arrow/fx/coroutines/Use {
public final synthetic fun unbox-impl ()Lkotlin/jvm/functions/Function1;
}

public abstract interface class arrow/fx/coroutines/computations/ResourceEffect {
public abstract fun bind (Larrow/fx/coroutines/Resource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class arrow/fx/coroutines/computations/ResourceKt {
public static final fun resource (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Resource;
}

@@ -1,15 +1,11 @@
package arrow.fx.coroutines

import arrow.core.identity
import arrow.core.nonFatalOrThrow
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Deferred
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext


nomisRev marked this conversation as resolved.
Show resolved Hide resolved

/**
* [Resource] models resource allocation and releasing. It is especially useful when multiple resources that depend on each other
Expand Down Expand Up @@ -442,42 +438,14 @@ public sealed class Resource<out A> {
* }
* //sampleEnd
* ```
*
*
*/
public fun <B, C> parZip(
ctx: CoroutineContext = Dispatchers.Default,
fb: Resource<B>,
f: suspend (A, B) -> C
): Resource<C> =
Resource({
supervisorScope {
val faa = async(ctx) { allocate() }
val fbb = async(ctx) { fb.allocate() }
val a = awaitOrCancelOther(faa, fbb)
val b = awaitOrCancelOther(fbb, faa)
Pair(a, b)
}
}, { (ar, br), ex ->
val (_, releaseA) = ar
val (_, releaseB) = br
supervisorScope {
val faa = async(ctx) { releaseA(ex) }
val fbb = async(ctx) { releaseB(ex) }
try {
faa.await()
} catch (errorA: Throwable) {
try {
fbb.await()
} catch (errorB: Throwable) {
throw Platform.composeErrors(errorA, errorB)
}
throw errorA
}
fbb.await()
}
}).map { (ar, br) ->
f(ar.first, br.first)
arrow.fx.coroutines.computations.resource {
parZip(ctx, { this@Resource.bind() }, { fb.bind() }) { a, b -> f(a, b) }
}

public class Bind<A, B>(public val source: Resource<A>, public val f: (A) -> Resource<B>) : Resource<B>()
Expand Down Expand Up @@ -589,25 +557,39 @@ public sealed class Resource<out A> {
* }
* ```
*/
@Deprecated("Use the resource computation DSL instead")
public inline class Use<A>(internal val acquire: suspend () -> A)

/**
* Marks an [acquire] operation as the [Resource.use] step of a [Resource].
*/
@Deprecated("Use the resource computation DSL instead", ReplaceWith("resource(acquire)", "arrow.fx.coroutines.computation.resource"))
public fun <A> resource(acquire: suspend () -> A): Use<A> = Use(acquire)

/**
* Composes a [release] action to a [Resource.use] action creating a [Resource].
*/
@Deprecated("Use the resource computation DSL instead")
public infix fun <A> Use<A>.release(release: suspend (A) -> Unit): Resource<A> =
Resource(acquire) { a, _ -> release(a) }

/**
* Composes a [releaseCase] action to a [Resource.use] action creating a [Resource].
* Composes a [release] action to a [Resource.use] action creating a [Resource].
*/
public infix fun <A> Resource<A>.release(release: suspend (A) -> Unit): Resource<A> =
flatMap { a ->
Resource({ a }, { _, _ -> release(a) })
}

@Deprecated("Use the resource computation DSL instead")
public infix fun <A> Use<A>.releaseCase(release: suspend (A, ExitCase) -> Unit): Resource<A> =
Resource(acquire, release)

/**
* Composes a [releaseCase] action to a [Resource.use] action creating a [Resource].
*/
public infix fun <A> Resource<A>.releaseCase(release: suspend (A, ExitCase) -> Unit): Resource<A> =
flatMap { a ->
Resource({ a }, { _, ex -> release(a, ex) })
}

/**
* Traverse this [Iterable] and collects the resulting `Resource<B>` of [f] into a `Resource<List<B>>`.
*
Expand Down Expand Up @@ -689,78 +671,3 @@ public inline fun <A, B> Iterable<A>.traverseResource(crossinline f: (A) -> Reso
@Suppress("NOTHING_TO_INLINE")
public inline fun <A> Iterable<Resource<A>>.sequence(): Resource<List<A>> =
traverseResource(::identity)

// Interpreter that knows how to evaluate a Resource data structure
// Maintains its own stack for dealing with Bind chains
@Suppress("UNCHECKED_CAST")
private tailrec suspend fun useLoop(
current: Resource<Any?>,
stack: List<(Any?) -> Resource<Any?>>
): Pair<Any?, suspend (ExitCase) -> Unit> =
when (current) {
is Resource.Defer -> useLoop(current.resource.invoke(), stack)
is Resource.Bind<*, *> ->
useLoop(current.source, listOf(current.f as (Any?) -> Resource<Any?>) + stack)
is Resource.Allocate -> loadResourceAndReleaseHandler(
acquire = current.acquire,
use = { a ->
when {
stack.isEmpty() -> Pair(a) { ex -> current.release(a, ex) }
else -> useLoop(stack.first()(a), stack.drop(1))
}
},
release = { _, _ -> /*a, exitCase -> current.release(a, exitCase)*/ }
)
}

private suspend fun <A> Resource<A>.allocate(): Pair<A, suspend (ExitCase) -> Unit> =
useLoop(this, emptyList()) as Pair<A, suspend (ExitCase) -> Unit>

private suspend inline fun loadResourceAndReleaseHandler(
crossinline acquire: suspend () -> Any?,
crossinline use: suspend (Any?) -> Pair<Any?, suspend (ExitCase) -> Unit>,
crossinline release: suspend (Any?, ExitCase) -> Unit
): Pair<Any?, suspend (ExitCase) -> Unit> {
val acquired = withContext(NonCancellable) {
acquire()
}

return try { // Successfully loaded resource, pass it and its release f down
val (b, _release) = use(acquired)
Pair(b) { ex -> _release(ex); release(acquired, ex) }
} catch (e: CancellationException) { // Release when cancelled
runReleaseAndRethrow(e) { release(acquired, ExitCase.Cancelled(e)) }
} catch (t: Throwable) { // Release when failed to load resource
runReleaseAndRethrow(t.nonFatalOrThrow()) { release(acquired, ExitCase.Failure(t.nonFatalOrThrow())) }
}
}

private suspend fun <A, B> awaitOrCancelOther(
fa: Deferred<Pair<A, suspend (ExitCase) -> Unit>>,
fb: Deferred<Pair<B, suspend (ExitCase) -> Unit>>
): Pair<A, suspend (ExitCase) -> Unit> =
try {
fa.await()
} catch (e: Throwable) {
if (e is CancellationException) awaitAndAddSuppressed(fb, e, ExitCase.Cancelled(e))
else awaitAndAddSuppressed(fb, e, ExitCase.Failure(e))
}

private suspend fun awaitAndAddSuppressed(
fb: Deferred<Pair<*, suspend (ExitCase) -> Unit>>,
e: Throwable,
exitCase: ExitCase
): Nothing {
val cancellationException = try {
if (fb.isCancelled && fb.isCompleted) fb.getCompletionExceptionOrNull()
else fb.await().second.invoke(exitCase).let { null }
} catch (e2: Throwable) {
throw e.apply { addSuppressed(e2) }
}

val exception = cancellationException?.let {
e.apply { addSuppressed(it) }
} ?: e

throw exception
}
@@ -0,0 +1,128 @@
package arrow.fx.coroutines.computations

import arrow.continuations.generic.AtomicRef
import arrow.continuations.generic.update
import arrow.core.NonEmptyList
import arrow.core.ValidatedNel
import arrow.core.invalidNel
import arrow.core.nonFatalOrThrow
import arrow.core.traverseValidated
import arrow.core.valid
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.Platform
import arrow.fx.coroutines.Resource
import arrow.fx.coroutines.runReleaseAndRethrow
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext

public interface ResourceEffect {
public suspend fun <A> Resource<A>.bind(): A
}

public fun <A> resource(f: suspend ResourceEffect.() -> A): Resource<A> =
Resource({
val effect = ResourceEffectImpl()
val res = try {
f(effect)
} catch (e: Throwable) {
val ex = if (e is CancellationException) ExitCase.Cancelled(e)
else ExitCase.Failure(e)
val ee = effect.finalizers.get().traverseValidated { f ->
catchNel { f(ex) }
}.fold({
Platform.composeErrors(NonEmptyList(e, it))
}, { e })
throw ee
}
Pair(res, effect)
}) { (_, effect), ex ->
effect.finalizers.get().cancelAll(ex)
}.map { it.first }

private class ResourceEffectImpl : ResourceEffect {
val finalizers: AtomicRef<List<suspend (ExitCase) -> Unit>> = AtomicRef(emptyList())
override suspend fun <A> Resource<A>.bind(): A =
allocate { finalizer ->
finalizers.update { it + finalizer }
}
}

private suspend fun List<suspend (ExitCase) -> Unit>.cancelAll(
exitCase: ExitCase,
last: (suspend () -> Unit)? = null
): Unit {
val e = traverseValidated { finalizer ->
catchNel { finalizer(exitCase) }
}.fold({
Platform.composeErrors(it)
}, { null })
val e2 = runCatching { last?.invoke() }.exceptionOrNull()
Platform.composeErrors(e, e2)?.let { throw it }
}

// Interpreter that knows how to evaluate a Resource data structure
// Maintains its own stack for dealing with Bind chains
@Suppress("UNCHECKED_CAST")
private tailrec suspend fun useLoop(
current: Resource<Any?>,
stack: List<(Any?) -> Resource<Any?>>,
finalizers: List<suspend (ExitCase) -> Unit>,
handle: (Pair<Any?, suspend (ExitCase) -> Unit>) -> Unit
): Pair<Any?, suspend (ExitCase) -> Unit> =
when (current) {
is Resource.Defer -> useLoop(current.resource.invoke(), stack, finalizers, handle)
is Resource.Bind<*, *> ->
useLoop(current.source, listOf(current.f as (Any?) -> Resource<Any?>) + stack, finalizers, handle)
is Resource.Allocate -> loadResourceAndReleaseHandler(
acquire = current.acquire,
use = { a ->
when {
stack.isEmpty() -> Pair<Any?, suspend (ExitCase) -> Unit>(a) { ex ->
finalizers.cancelAll(ex) {
current.release(a, ex)
}
}.also(handle)
else -> useLoop(stack.first()(a), stack.drop(1), finalizers + { ex -> current.release(a, ex) }, handle)
}
},
release = { a, ex ->
if (ex != ExitCase.Completed) {
finalizers.cancelAll(ex) {
current.release(a, ex)
}
}
}
)
}

private suspend fun <A> Resource<A>.allocate(handle: (suspend (ExitCase) -> Unit) -> Unit): A =
useLoop(this, emptyList(), emptyList()) { (_, finalizer) ->
handle(finalizer)
}.first as A

private suspend inline fun loadResourceAndReleaseHandler(
crossinline acquire: suspend () -> Any?,
crossinline use: suspend (Any?) -> Pair<Any?, suspend (ExitCase) -> Unit>,
crossinline release: suspend (Any?, ExitCase) -> Unit
): Pair<Any?, suspend (ExitCase) -> Unit> {
val acquired = withContext(NonCancellable) {
acquire()
}

return try { // Successfully loaded resource, pass it and its release f down
val (b, _release) = use(acquired)
Pair(b) { ex -> _release(ex); release(acquired, ex) }
} catch (e: CancellationException) { // Release when cancelled
runReleaseAndRethrow(e) { release(acquired, ExitCase.Cancelled(e)) }
} catch (t: Throwable) { // Release when failed to load resource
runReleaseAndRethrow(t.nonFatalOrThrow()) { release(acquired, ExitCase.Failure(t.nonFatalOrThrow())) }
}
}

private inline fun <A> catchNel(f: () -> A): ValidatedNel<Throwable, A> =
try {
f().valid()
} catch (e: Throwable) {
e.invalidNel()
}