From f8fc45a6bcb07b32f7902d15fdd2994f90df22e6 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 4 Nov 2021 20:35:10 +0100 Subject: [PATCH 01/13] Add Resource computation block --- .../kotlin/arrow/fx/coroutines/Resource.kt | 139 +++--------------- .../fx/coroutines/computations/resource.kt | 128 ++++++++++++++++ 2 files changed, 151 insertions(+), 116 deletions(-) create mode 100644 arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt index 4b4f6901330..41144292c1d 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt @@ -1,15 +1,11 @@ package arrow.fx.coroutines import arrow.core.identity -import arrow.core.nonFatalOrThrow -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.Deferred +import kotlin.coroutines.CoroutineContext import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.async -import kotlinx.coroutines.supervisorScope -import kotlinx.coroutines.withContext -import kotlin.coroutines.CoroutineContext + + /** * [Resource] models resource allocation and releasing. It is especially useful when multiple resources that depend on each other @@ -442,42 +438,14 @@ public sealed class Resource { * } * //sampleEnd * ``` - * - * */ public fun parZip( ctx: CoroutineContext = Dispatchers.Default, fb: Resource, f: suspend (A, B) -> C ): Resource = - Resource({ - supervisorScope { - val faa = async(ctx) { allocate() } - val fbb = async(ctx) { fb.allocate() } - val a = awaitOrCancelOther(faa, fbb) - val b = awaitOrCancelOther(fbb, faa) - Pair(a, b) - } - }, { (ar, br), ex -> - val (_, releaseA) = ar - val (_, releaseB) = br - supervisorScope { - val faa = async(ctx) { releaseA(ex) } - val fbb = async(ctx) { releaseB(ex) } - try { - faa.await() - } catch (errorA: Throwable) { - try { - fbb.await() - } catch (errorB: Throwable) { - throw Platform.composeErrors(errorA, errorB) - } - throw errorA - } - fbb.await() - } - }).map { (ar, br) -> - f(ar.first, br.first) + arrow.fx.coroutines.computations.resource { + parZip(ctx, { this@Resource.bind() }, { fb.bind() }) { a, b -> f(a, b) } } public class Bind(public val source: Resource, public val f: (A) -> Resource) : Resource() @@ -589,25 +557,39 @@ public sealed class Resource { * } * ``` */ +@Deprecated("Use the resource computation DSL instead") public inline class Use(internal val acquire: suspend () -> A) /** * Marks an [acquire] operation as the [Resource.use] step of a [Resource]. */ +@Deprecated("Use the resource computation DSL instead", ReplaceWith("resource(acquire)", "arrow.fx.coroutines.computation.resource")) public fun resource(acquire: suspend () -> A): Use = Use(acquire) -/** - * Composes a [release] action to a [Resource.use] action creating a [Resource]. - */ +@Deprecated("Use the resource computation DSL instead") public infix fun Use.release(release: suspend (A) -> Unit): Resource = Resource(acquire) { a, _ -> release(a) } /** - * Composes a [releaseCase] action to a [Resource.use] action creating a [Resource]. + * Composes a [release] action to a [Resource.use] action creating a [Resource]. */ +public infix fun Resource.release(release: suspend (A) -> Unit): Resource = + flatMap { a -> + Resource({ a }, { _, _ -> release(a) }) + } + +@Deprecated("Use the resource computation DSL instead") public infix fun Use.releaseCase(release: suspend (A, ExitCase) -> Unit): Resource = Resource(acquire, release) +/** + * Composes a [releaseCase] action to a [Resource.use] action creating a [Resource]. + */ +public infix fun Resource.releaseCase(release: suspend (A, ExitCase) -> Unit): Resource = + flatMap { a -> + Resource({ a }, { _, ex -> release(a, ex) }) + } + /** * Traverse this [Iterable] and collects the resulting `Resource` of [f] into a `Resource>`. * @@ -689,78 +671,3 @@ public inline fun Iterable.traverseResource(crossinline f: (A) -> Reso @Suppress("NOTHING_TO_INLINE") public inline fun Iterable>.sequence(): Resource> = traverseResource(::identity) - -// Interpreter that knows how to evaluate a Resource data structure -// Maintains its own stack for dealing with Bind chains -@Suppress("UNCHECKED_CAST") -private tailrec suspend fun useLoop( - current: Resource, - stack: List<(Any?) -> Resource> -): Pair Unit> = - when (current) { - is Resource.Defer -> useLoop(current.resource.invoke(), stack) - is Resource.Bind<*, *> -> - useLoop(current.source, listOf(current.f as (Any?) -> Resource) + stack) - is Resource.Allocate -> loadResourceAndReleaseHandler( - acquire = current.acquire, - use = { a -> - when { - stack.isEmpty() -> Pair(a) { ex -> current.release(a, ex) } - else -> useLoop(stack.first()(a), stack.drop(1)) - } - }, - release = { _, _ -> /*a, exitCase -> current.release(a, exitCase)*/ } - ) - } - -private suspend fun Resource.allocate(): Pair Unit> = - useLoop(this, emptyList()) as Pair Unit> - -private suspend inline fun loadResourceAndReleaseHandler( - crossinline acquire: suspend () -> Any?, - crossinline use: suspend (Any?) -> Pair Unit>, - crossinline release: suspend (Any?, ExitCase) -> Unit -): Pair Unit> { - val acquired = withContext(NonCancellable) { - acquire() - } - - return try { // Successfully loaded resource, pass it and its release f down - val (b, _release) = use(acquired) - Pair(b) { ex -> _release(ex); release(acquired, ex) } - } catch (e: CancellationException) { // Release when cancelled - runReleaseAndRethrow(e) { release(acquired, ExitCase.Cancelled(e)) } - } catch (t: Throwable) { // Release when failed to load resource - runReleaseAndRethrow(t.nonFatalOrThrow()) { release(acquired, ExitCase.Failure(t.nonFatalOrThrow())) } - } -} - -private suspend fun awaitOrCancelOther( - fa: Deferred Unit>>, - fb: Deferred Unit>> -): Pair Unit> = - try { - fa.await() - } catch (e: Throwable) { - if (e is CancellationException) awaitAndAddSuppressed(fb, e, ExitCase.Cancelled(e)) - else awaitAndAddSuppressed(fb, e, ExitCase.Failure(e)) - } - -private suspend fun awaitAndAddSuppressed( - fb: Deferred Unit>>, - e: Throwable, - exitCase: ExitCase -): Nothing { - val cancellationException = try { - if (fb.isCancelled && fb.isCompleted) fb.getCompletionExceptionOrNull() - else fb.await().second.invoke(exitCase).let { null } - } catch (e2: Throwable) { - throw e.apply { addSuppressed(e2) } - } - - val exception = cancellationException?.let { - e.apply { addSuppressed(it) } - } ?: e - - throw exception -} diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt new file mode 100644 index 00000000000..3706fbe7dea --- /dev/null +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt @@ -0,0 +1,128 @@ +package arrow.fx.coroutines.computations + +import arrow.continuations.generic.AtomicRef +import arrow.continuations.generic.update +import arrow.core.NonEmptyList +import arrow.core.ValidatedNel +import arrow.core.invalidNel +import arrow.core.nonFatalOrThrow +import arrow.core.traverseValidated +import arrow.core.valid +import arrow.fx.coroutines.ExitCase +import arrow.fx.coroutines.Platform +import arrow.fx.coroutines.Resource +import arrow.fx.coroutines.runReleaseAndRethrow +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.withContext + +public interface ResourceEffect { + public suspend fun Resource.bind(): A +} + +public fun resource(f: suspend ResourceEffect.() -> A): Resource = + Resource({ + val effect = ResourceEffectImpl() + val res = try { + f(effect) + } catch (e: Throwable) { + val ex = if (e is CancellationException) ExitCase.Cancelled(e) + else ExitCase.Failure(e) + val ee = effect.finalizers.get().traverseValidated { f -> + catchNel { f(ex) } + }.fold({ + Platform.composeErrors(NonEmptyList(e, it)) + }, { e }) + throw ee + } + Pair(res, effect) + }) { (_, effect), ex -> + effect.finalizers.get().cancelAll(ex) + }.map { it.first } + +private class ResourceEffectImpl : ResourceEffect { + val finalizers: AtomicRef Unit>> = AtomicRef(emptyList()) + override suspend fun Resource.bind(): A = + allocate { finalizer -> + finalizers.update { it + finalizer } + } +} + +private suspend fun List Unit>.cancelAll( + exitCase: ExitCase, + last: (suspend () -> Unit)? = null +): Unit { + val e = traverseValidated { finalizer -> + catchNel { finalizer(exitCase) } + }.fold({ + Platform.composeErrors(it) + }, { null }) + val e2 = runCatching { last?.invoke() }.exceptionOrNull() + Platform.composeErrors(e, e2)?.let { throw it } +} + +// Interpreter that knows how to evaluate a Resource data structure +// Maintains its own stack for dealing with Bind chains +@Suppress("UNCHECKED_CAST") +private tailrec suspend fun useLoop( + current: Resource, + stack: List<(Any?) -> Resource>, + finalizers: List Unit>, + handle: (Pair Unit>) -> Unit +): Pair Unit> = + when (current) { + is Resource.Defer -> useLoop(current.resource.invoke(), stack, finalizers, handle) + is Resource.Bind<*, *> -> + useLoop(current.source, listOf(current.f as (Any?) -> Resource) + stack, finalizers, handle) + is Resource.Allocate -> loadResourceAndReleaseHandler( + acquire = current.acquire, + use = { a -> + when { + stack.isEmpty() -> Pair Unit>(a) { ex -> + finalizers.cancelAll(ex) { + current.release(a, ex) + } + }.also(handle) + else -> useLoop(stack.first()(a), stack.drop(1), finalizers + { ex -> current.release(a, ex) }, handle) + } + }, + release = { a, ex -> + if (ex != ExitCase.Completed) { + finalizers.cancelAll(ex) { + current.release(a, ex) + } + } + } + ) + } + +private suspend fun Resource.allocate(handle: (suspend (ExitCase) -> Unit) -> Unit): A = + useLoop(this, emptyList(), emptyList()) { (_, finalizer) -> + handle(finalizer) + }.first as A + +private suspend inline fun loadResourceAndReleaseHandler( + crossinline acquire: suspend () -> Any?, + crossinline use: suspend (Any?) -> Pair Unit>, + crossinline release: suspend (Any?, ExitCase) -> Unit +): Pair Unit> { + val acquired = withContext(NonCancellable) { + acquire() + } + + return try { // Successfully loaded resource, pass it and its release f down + val (b, _release) = use(acquired) + Pair(b) { ex -> _release(ex); release(acquired, ex) } + } catch (e: CancellationException) { // Release when cancelled + runReleaseAndRethrow(e) { release(acquired, ExitCase.Cancelled(e)) } + } catch (t: Throwable) { // Release when failed to load resource + runReleaseAndRethrow(t.nonFatalOrThrow()) { release(acquired, ExitCase.Failure(t.nonFatalOrThrow())) } + } +} + +private inline fun catchNel(f: () -> A): ValidatedNel = + try { + f().valid() + } catch (e: Throwable) { + e.invalidNel() + } From da8d2b0da64009560c8b1d49c526dc343b74afc8 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 4 Nov 2021 21:02:41 +0100 Subject: [PATCH 02/13] Add tests --- .../arrow/fx/coroutines/ResourceTest.kt | 114 +++++++++++++++++- 1 file changed, 113 insertions(+), 1 deletion(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt index 3d42ae4314d..3153524faf9 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt @@ -1,14 +1,15 @@ package arrow.fx.coroutines import arrow.core.Either +import arrow.core.left import io.kotest.assertions.fail import io.kotest.matchers.should import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeInstanceOf import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb +import io.kotest.property.arbitrary.bool import io.kotest.property.arbitrary.int -import io.kotest.property.arbitrary.list import io.kotest.property.arbitrary.map import io.kotest.property.arbitrary.string import kotlinx.coroutines.CancellationException @@ -109,6 +110,117 @@ class ResourceTest : ArrowFxSpec( } } + "Resource can close from either" { + val exit = CompletableDeferred() + arrow.core.computations.either { + arrow.fx.coroutines.computations.resource { + Resource({ 1 }) { _, ex -> exit.complete(ex) }.bind() + "error".left().bind() + 1 + }.use { it } + } shouldBe "error".left() + // Should be ExitCase.Cancelled but still Failure due to ShortCircuit + // Cont will fix this issue by properly shifting and cancelling + exit.await().shouldBeTypeOf() + } + + val depth: Int = 100 + + class CheckableAutoClose { + var started = true + fun close() { + started = false + } + } + + fun closeable(): Resource = + Resource({ CheckableAutoClose() }) { a, _ -> a.close() } + + "parZip - success" { + val all = (1..depth).traverseResource { closeable() }.parZip( + (1..depth).traverseResource { closeable() } + ) { a, b -> a + b }.use { all -> + all.also { all.forEach { it.started shouldBe true } } + } + all.forEach { it.started shouldBe false } + } + + fun generate(): Pair>, Resource> { + val promises = (1..depth).map { Pair(it, CompletableDeferred()) } + val res = promises.fold(Resource({ 0 }, { _, _ -> })) { acc, (i, promise) -> + acc.flatMap { ii: Int -> + Resource({ ii + i }) { _, _ -> + promise.complete(i) + } + } + } + return Pair(promises.map { it.second }, res) + } + + "parZip - deep finalizers are called when final one blows" { + io.kotest.property.checkAll(3, Arb.int(10..100)) { + val (promises, resource) = generate() + assertThrowable { + resource.flatMap { + Resource({ throw RuntimeException() }) { _, _ -> } + }.parZip(Resource({ }) { _, _ -> }) { _, _ -> } + .use { fail("It should never reach here") } + }.shouldBeTypeOf() + + (1..depth).zip(promises) { i, promise -> + println(promise.isCompleted) + promise.await() shouldBe i + } + } + } + + "parZip - deep finalizers are called when final one cancels" { + io.kotest.property.checkAll(3, Arb.int(10..100)) { + val cancel = CancellationException(null, null) + val (promises, resource) = generate() + assertThrowable { + resource.flatMap { + Resource({ throw cancel }) { _, _ -> } + }.parZip(Resource({ }) { _, _ -> }) { _, _ -> } + .use { fail("It should never reach here") } + }.shouldBeTypeOf() + + (1..depth).zip(promises) { i, promise -> + println(promise.isCompleted) + promise.await() shouldBe i + } + } + } + + // Test multiple release triggers on acquire fail. + "parZip - Deep finalizers get called on left or right cancellation" { + checkAll(Arb.bool()) { isLeft -> + val cancel = CancellationException(null, null) + val (promises, resource) = generate() + val latch = CompletableDeferred() + assertThrowable { + val res = if (isLeft) Resource({ + latch.await() shouldBe (1..depth).sum() + throw cancel + }) { _, _ -> }.parZip(resource.flatMap { + Resource({ latch.complete(it) }) { _, _ -> } + }) { _, _ -> } + else resource.flatMap { + Resource({ latch.complete(it) }) { _, _ -> } + }.parZip(Resource({ + latch.await() shouldBe (1..depth).sum() + throw cancel + }) { _, _ -> }) { _, _ -> } + + res.use { fail("It should never reach here") } + }.shouldBeTypeOf() + + (1..depth).zip(promises) { i, promise -> + promise.await() shouldBe i + } + } + } + "parZip - Right CancellationException on acquire" { checkAll(Arb.int()) { i -> val cancel = CancellationException(null, null) From 64f60ed7eee55d2b740fa410ab04a0c7707bb2c5 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 4 Nov 2021 21:10:33 +0100 Subject: [PATCH 03/13] apiDump --- .../fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index d601cba41a3..a56209fb408 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -354,7 +354,9 @@ public final class arrow/fx/coroutines/ResourceExtensionsKt { } public final class arrow/fx/coroutines/ResourceKt { + public static final fun release (Larrow/fx/coroutines/Resource;Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Resource; public static final fun release-zgiIeyo (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Resource; + public static final fun releaseCase (Larrow/fx/coroutines/Resource;Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Resource; public static final fun releaseCase-zgiIeyo (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Resource; public static final fun resource (Lkotlin/jvm/functions/Function1;)Lkotlin/jvm/functions/Function1; public static final fun sequence (Ljava/lang/Iterable;)Larrow/fx/coroutines/Resource; @@ -488,3 +490,11 @@ public final class arrow/fx/coroutines/Use { public final synthetic fun unbox-impl ()Lkotlin/jvm/functions/Function1; } +public abstract interface class arrow/fx/coroutines/computations/ResourceEffect { + public abstract fun bind (Larrow/fx/coroutines/Resource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public final class arrow/fx/coroutines/computations/ResourceKt { + public static final fun resource (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Resource; +} + From c8aadb0fee5150ec4457407c8f920fa7ca42ccec Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 4 Nov 2021 21:27:04 +0100 Subject: [PATCH 04/13] KtLintFormat --- .../src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt index 41144292c1d..feca0423484 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt @@ -5,8 +5,6 @@ import kotlin.coroutines.CoroutineContext import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.NonCancellable - - /** * [Resource] models resource allocation and releasing. It is especially useful when multiple resources that depend on each other * need to be acquired and later released in reverse order. From 6bfdd271344808b833677a7fe7e6f083c532daa5 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 4 Nov 2021 22:00:57 +0100 Subject: [PATCH 05/13] Bump timeout jvm task --- .github/workflows/pull_request.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index a661a8a4f68..08b0d907062 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -57,7 +57,7 @@ jobs: linux_jvm_and_js: runs-on: ubuntu-latest - timeout-minutes: 25 + timeout-minutes: 60 steps: - uses: actions/checkout@v2 From f45d5604d2765c634b5bc904f6386ad55fe063e4 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 4 Nov 2021 23:13:24 +0100 Subject: [PATCH 06/13] Add latch to await both resource starting in parZip --- .../kotlin/arrow/fx/coroutines/ResourceTest.kt | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt index 3153524faf9..79d64369021 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt @@ -243,10 +243,14 @@ class ResourceTest : ArrowFxSpec( checkAll(Arb.int()) { i -> val cancel = CancellationException(null, null) val released = CompletableDeferred>() + val started = CompletableDeferred() assertThrowable { - Resource({ throw cancel }) { _, _ -> } - .parZip(Resource({ i }, { ii, ex -> + Resource({ + started.await() + throw cancel + }) { _, _ -> } + .parZip(Resource({ started.complete(Unit); i }, { ii, ex -> released.complete(ii to ex) })) { _, _ -> } .use { fail("It should never reach here") } @@ -279,11 +283,14 @@ class ResourceTest : ArrowFxSpec( "parZip - Left error on acquire" { checkAll(Arb.int(), Arb.throwable()) { i, throwable -> val released = CompletableDeferred>() - + val started = CompletableDeferred() assertThrowable { - Resource({ throw throwable }) { _, _ -> } + Resource({ + started.await() + throw throwable + }) { _, _ -> } .parZip( - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + Resource({ started.complete(Unit); i }, { ii, ex -> released.complete(ii to ex) }) ) { _, _ -> } .use { fail("It should never reach here") } } shouldBe throwable From 310bfe417027596fc567065f24f956a167b5d5bd Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 4 Nov 2021 23:38:24 +0100 Subject: [PATCH 07/13] Add start latches right error acquire --- .../kotlin/arrow/fx/coroutines/ResourceTest.kt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt index 79d64369021..20d3736ca19 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt @@ -225,11 +225,11 @@ class ResourceTest : ArrowFxSpec( checkAll(Arb.int()) { i -> val cancel = CancellationException(null, null) val released = CompletableDeferred>() - + val started = CompletableDeferred() assertThrowable { - Resource({ i }, { ii, ex -> + Resource({ started.complete(Unit); i }, { ii, ex -> released.complete(ii to ex) - }).parZip(Resource({ throw cancel }) { _, _ -> }) { _, _ -> } + }).parZip(Resource({ started.await(); throw cancel }) { _, _ -> }) { _, _ -> } .use { fail("It should never reach here") } }.shouldBeTypeOf() @@ -265,11 +265,11 @@ class ResourceTest : ArrowFxSpec( "parZip - Right error on acquire" { checkAll(Arb.int(), Arb.throwable()) { i, throwable -> val released = CompletableDeferred>() - + val started = CompletableDeferred() assertThrowable { - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + Resource({ started.complete(Unit); i }, { ii, ex -> released.complete(ii to ex) }) .parZip( - Resource({ throw throwable }) { _, _ -> } + Resource({ started.await(); throw throwable }) { _, _ -> } ) { _, _ -> } .use { fail("It should never reach here") } } shouldBe throwable From a640591dd30ba8e392be1d67eb0ca0ad39712416 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Fri, 5 Nov 2021 13:12:58 +0100 Subject: [PATCH 08/13] handle is part of resource acquisition, so saving the finalizer is part of it --- .../fx/coroutines/computations/resource.kt | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt index 3706fbe7dea..1bcdc77a51c 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt @@ -75,17 +75,23 @@ private tailrec suspend fun useLoop( is Resource.Bind<*, *> -> useLoop(current.source, listOf(current.f as (Any?) -> Resource) + stack, finalizers, handle) is Resource.Allocate -> loadResourceAndReleaseHandler( - acquire = current.acquire, - use = { a -> - when { - stack.isEmpty() -> Pair Unit>(a) { ex -> - finalizers.cancelAll(ex) { - current.release(a, ex) - } - }.also(handle) - else -> useLoop(stack.first()(a), stack.drop(1), finalizers + { ex -> current.release(a, ex) }, handle) + acquire = { + current.acquire().let { a -> + if (stack.isEmpty()) { + Pair Unit>(a) { ex -> + finalizers.cancelAll(ex) { + current.release(a, ex) + } + }.also(handle) + } else { + a + } } }, + use = { a -> + if (stack.isEmpty()) a as Pair Unit> + else useLoop(stack.first()(a), stack.drop(1), finalizers + { ex -> current.release(a, ex) }, handle) + }, release = { a, ex -> if (ex != ExitCase.Completed) { finalizers.cancelAll(ex) { From fa98ded3c7aec5b6f2b176a6d5254bc9e7b80aa4 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Fri, 5 Nov 2021 19:38:56 +0100 Subject: [PATCH 09/13] Deprecate Resource ADT, so we can use new runtime --- .../src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt index feca0423484..18adf37faab 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt @@ -446,13 +446,16 @@ public sealed class Resource { parZip(ctx, { this@Resource.bind() }, { fb.bind() }) { a, b -> f(a, b) } } + @Deprecated("Internals of Resource are changing, do not rely on the Bind class.") public class Bind(public val source: Resource, public val f: (A) -> Resource) : Resource() + @Deprecated("Internals of Resource are changing, do not rely on the Allocate class.") public class Allocate( public val acquire: suspend () -> A, public val release: suspend (A, ExitCase) -> Unit ) : Resource() + @Deprecated("Internals of Resource are changing, do not rely on the Defer class.") public class Defer(public val resource: suspend () -> Resource) : Resource() public companion object { From 07a1a15e64baee42b8d4b04dc493853241212743 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Fri, 5 Nov 2021 23:40:49 +0100 Subject: [PATCH 10/13] Rewrite runloop Resource --- .../kotlin/arrow/fx/coroutines/Resource.kt | 236 +++++++++++------- .../fx/coroutines/computations/resource.kt | 126 +--------- 2 files changed, 153 insertions(+), 209 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt index 18adf37faab..c18bfffe4d1 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt @@ -1,9 +1,19 @@ package arrow.fx.coroutines +import arrow.continuations.generic.AtomicRef +import arrow.continuations.generic.update +import arrow.core.NonEmptyList +import arrow.core.ValidatedNel import arrow.core.identity +import arrow.core.invalidNel +import arrow.core.traverseValidated +import arrow.core.valid +import arrow.fx.coroutines.computations.ResourceEffect import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.withContext /** * [Resource] models resource allocation and releasing. It is especially useful when multiple resources that depend on each other @@ -158,18 +168,46 @@ public sealed class Resource { * ``` */ @Suppress("UNCHECKED_CAST") - public suspend infix fun use(f: suspend (A) -> B): B = - useLoop(this as Resource, f as suspend (Any?) -> Any?, emptyList()) as B + public tailrec suspend infix fun use(f: suspend (A) -> B): B = + when (this) { + is Dsl -> { + val effect = ResEffectImpl() + val b = try { + val a = dsl(effect) + f(a) + } catch (e: Throwable) { + val ex = if (e is CancellationException) ExitCase.Cancelled(e) else ExitCase.Failure(e) + val ee = withContext(NonCancellable) { + effect.finalizers.get().cancelAll(ex, e) ?: e + } + throw ee + } + withContext(NonCancellable) { + effect.finalizers.get().cancelAll(ExitCase.Completed)?.let { throw it } + } + b + } + is Allocate -> bracketCase(acquire, f, release) + is Bind<*, *> -> Dsl { + val any = source.bind() + val ff = this@Resource.f as (Any?) -> Resource + ff(any).bind() + }.use(f) + is Defer -> resource().use(f) + } public fun map(f: suspend (A) -> B): Resource = - flatMap { a -> Resource({ f(a) }) { _, _ -> } } + arrow.fx.coroutines.computations.resource { f(bind()) } /** Useful for setting up/configuring an acquired resource */ public fun tap(f: suspend (A) -> Unit): Resource = - map { f(it); it } + arrow.fx.coroutines.computations.resource { bind().also { f(it) } } public fun ap(ff: Resource<(A) -> B>): Resource = - flatMap { res -> ff.map { it(res) } } + arrow.fx.coroutines.computations.resource { + val a = bind() + ff.bind()(a) + } /** * Create a resource value of [B] from a resource [A] by mapping [f]. @@ -212,12 +250,13 @@ public sealed class Resource { * @see zip to combine independent resources together * @see parZip for combining independent resources in parallel */ - public fun flatMap(f: (A) -> Resource): Resource = - Bind(this, f) + public fun flatMap(f: (A) -> Resource): Resource = arrow.fx.coroutines.computations.resource { + f(bind()).bind() + } public inline fun zip(other: Resource, crossinline combine: (A, B) -> C): Resource = - flatMap { r -> - other.map { r2 -> combine(r, r2) } + arrow.fx.coroutines.computations.resource { + combine(bind(), other.bind()) } public fun zip(other: Resource): Resource> = @@ -277,8 +316,8 @@ public sealed class Resource { c: Resource, crossinline map: (A, B, C) -> D ): Resource = - zip(b, c, unit, unit, unit, unit, unit, unit, unit) { a, b, c, _, _, _, _, _, _, _ -> - map(a, b, c) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind()) } public inline fun zip( @@ -287,8 +326,8 @@ public sealed class Resource { d: Resource, crossinline map: (A, B, C, D) -> E ): Resource = - zip(b, c, d, unit, unit, unit, unit, unit, unit) { a, b, c, d, e, _, _, _, _, _ -> - map(a, b, c, d) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind()) } public inline fun zip( @@ -298,8 +337,8 @@ public sealed class Resource { e: Resource, crossinline map: (A, B, C, D, E) -> G ): Resource = - zip(b, c, d, e, unit, unit, unit, unit, unit) { a, b, c, d, e, _, _, _, _, _ -> - map(a, b, c, d, e) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind()) } public inline fun zip( @@ -310,8 +349,8 @@ public sealed class Resource { f: Resource, crossinline map: (A, B, C, D, E, F) -> G ): Resource = - zip(b, c, d, e, f, unit, unit, unit, unit) { b, c, d, e, f, g, _, _, _, _ -> - map(b, c, d, e, f, g) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind()) } public inline fun zip( @@ -323,8 +362,8 @@ public sealed class Resource { g: Resource, crossinline map: (A, B, C, D, E, F, G) -> H ): Resource = - zip(b, c, d, e, f, g, unit, unit, unit) { a, b, c, d, e, f, g, _, _, _ -> - map(a, b, c, d, e, f, g) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind()) } public inline fun zip( @@ -337,8 +376,8 @@ public sealed class Resource { h: Resource, crossinline map: (A, B, C, D, E, F, G, H) -> I ): Resource = - zip(b, c, d, e, f, g, h, unit, unit) { a, b, c, d, e, f, g, h, _, _ -> - map(a, b, c, d, e, f, g, h) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind(), h.bind()) } public inline fun zip( @@ -352,8 +391,8 @@ public sealed class Resource { i: Resource, crossinline map: (A, B, C, D, E, F, G, H, I) -> J ): Resource = - zip(b, c, d, e, f, g, h, i, unit) { a, b, c, d, e, f, g, h, i, _ -> - map(a, b, c, d, e, f, g, h, i) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind(), h.bind(), i.bind()) } public inline fun zip( @@ -368,26 +407,8 @@ public sealed class Resource { j: Resource, crossinline map: (A, B, C, D, E, F, G, H, I, J) -> K ): Resource = - flatMap { aa -> - b.flatMap { bb -> - c.flatMap { cc -> - d.flatMap { dd -> - e.flatMap { ee -> - f.flatMap { ff -> - g.flatMap { gg -> - h.flatMap { hh -> - i.flatMap { ii -> - j.map { jj -> - map(aa, bb, cc, dd, ee, ff, gg, hh, ii, jj) - } - } - } - } - } - } - } - } - } + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind(), h.bind(), i.bind(), j.bind()) } public fun parZip(fb: Resource, f: suspend (A, B) -> C): Resource = @@ -446,18 +467,31 @@ public sealed class Resource { parZip(ctx, { this@Resource.bind() }, { fb.bind() }) { a, b -> f(a, b) } } - @Deprecated("Internals of Resource are changing, do not rely on the Bind class.") + @Deprecated( + "Bind is being deprecated. Use resource DSL instead", + ReplaceWith( + "resource { f(source.bind()) }", + "arrow.fx.coroutines.computations.resource" + ) + ) public class Bind(public val source: Resource, public val f: (A) -> Resource) : Resource() - @Deprecated("Internals of Resource are changing, do not rely on the Allocate class.") public class Allocate( public val acquire: suspend () -> A, public val release: suspend (A, ExitCase) -> Unit ) : Resource() - @Deprecated("Internals of Resource are changing, do not rely on the Defer class.") + @Deprecated( + "Defer is being deprecated. Use resource DSL instead", + ReplaceWith( + "resource { resource.invoke().bind() }", + "arrow.fx.coroutines.computations.resource" + ) + ) public class Defer(public val resource: suspend () -> Resource) : Resource() + internal data class Dsl(public val dsl: suspend ResourceEffect.() -> A) : Resource() + public companion object { @PublishedApi @@ -496,36 +530,6 @@ public sealed class Resource { public fun defer(f: suspend () -> Resource): Resource = Resource.Defer(f) } - - private suspend fun continueLoop( - current: Resource, - use: suspend (Any?) -> Any?, - stack: List<(Any?) -> Resource> - ): Any? = useLoop(current, use, stack) - - // Interpreter that knows how to evaluate a Resource data structure - // Maintains its own stack for dealing with Bind chains - @Suppress("UNCHECKED_CAST") - private tailrec suspend fun useLoop( - current: Resource, - use: suspend (Any?) -> Any?, - stack: List<(Any?) -> Resource> - ): Any? = - when (current) { - is Defer -> useLoop(current.resource.invoke(), use, stack) - is Bind<*, *> -> - useLoop(current.source as Resource, use, listOf(current.f as (Any?) -> Resource) + stack) - is Allocate -> bracketCase( - acquire = current.acquire, - use = { a -> - when { - stack.isEmpty() -> use(a) - else -> continueLoop(stack.first()(a), use, stack.drop(1)) - } - }, - release = { a, exitCase -> current.release(a, exitCase) } - ) - } } /** @@ -558,13 +562,25 @@ public sealed class Resource { * } * ``` */ -@Deprecated("Use the resource computation DSL instead") +@Deprecated( + "Use the resource computation DSL instead", + ReplaceWith( + "resource { acquire() }", + "arrow.fx.coroutines.computations.resource" + ) +) public inline class Use(internal val acquire: suspend () -> A) /** * Marks an [acquire] operation as the [Resource.use] step of a [Resource]. */ -@Deprecated("Use the resource computation DSL instead", ReplaceWith("resource(acquire)", "arrow.fx.coroutines.computation.resource")) +@Deprecated( + "Use the resource computation DSL instead", + ReplaceWith( + "resource { acquire() }", + "arrow.fx.coroutines.computations.resource" + ) +) public fun resource(acquire: suspend () -> A): Use = Use(acquire) @Deprecated("Use the resource computation DSL instead") @@ -575,8 +591,9 @@ public infix fun Use.release(release: suspend (A) -> Unit): Resource = * Composes a [release] action to a [Resource.use] action creating a [Resource]. */ public infix fun Resource.release(release: suspend (A) -> Unit): Resource = - flatMap { a -> - Resource({ a }, { _, _ -> release(a) }) + arrow.fx.coroutines.computations.resource { + val a = bind() + Resource({ a }, { _, _ -> release(a) }).bind() } @Deprecated("Use the resource computation DSL instead") @@ -587,8 +604,9 @@ public infix fun Use.releaseCase(release: suspend (A, ExitCase) -> Unit): * Composes a [releaseCase] action to a [Resource.use] action creating a [Resource]. */ public infix fun Resource.releaseCase(release: suspend (A, ExitCase) -> Unit): Resource = - flatMap { a -> - Resource({ a }, { _, ex -> release(a, ex) }) + arrow.fx.coroutines.computations.resource { + val a = bind() + Resource({ a }, { _, ex -> release(a, ex) }).bind() } /** @@ -628,8 +646,10 @@ public infix fun Resource.releaseCase(release: suspend (A, ExitCase) -> U * ``` */ public inline fun Iterable.traverseResource(crossinline f: (A) -> Resource): Resource> = - fold(Resource.just(emptyList())) { acc: Resource>, a: A -> - f(a).ap(acc.map { { b: B -> it + b } }) + arrow.fx.coroutines.computations.resource { + map { a -> + f(a).bind() + } } /** @@ -672,3 +692,51 @@ public inline fun Iterable.traverseResource(crossinline f: (A) -> Reso @Suppress("NOTHING_TO_INLINE") public inline fun Iterable>.sequence(): Resource> = traverseResource(::identity) + +private class ResEffectImpl : ResourceEffect { + val finalizers: AtomicRef Unit>> = AtomicRef(emptyList()) + override suspend fun Resource.bind(): A = + when (this) { + is Resource.Dsl -> dsl.invoke(this@ResEffectImpl) + is Resource.Allocate -> bracketCase({ + val a = acquire() + val finalizer: suspend (ExitCase) -> Unit = { ex: ExitCase -> release(a, ex) } + finalizers.update { it + finalizer } + a + }, ::identity, { a, ex -> + // Only if ExitCase.Failure, or ExitCase.Cancelled during acquire we cancel + // Otherwise we've saved the finalizer, and it will be called from somewhere else. + if (ex != ExitCase.Completed) { + withContext(NonCancellable) { + val e = finalizers.get().cancelAll(ex) + val e2 = runCatching { release(a, ex) }.exceptionOrNull() + Platform.composeErrors(e, e2)?.let { throw it } + } + } + }) + is Resource.Bind<*, *> -> Resource.Dsl { + val any = source.bind() + val ff = f as (Any?) -> Resource + ff(any).bind() + }.bind() + is Resource.Defer -> resource().bind() + } +} + +// Version that doesn't rethrow `CancellationException` because we need to run all finalizers regardless of CancellationException +private inline fun catchNel(f: () -> A): ValidatedNel = + try { + f().valid() + } catch (e: Throwable) { + e.invalidNel() + } + +private suspend fun List Unit>.cancelAll( + exitCase: ExitCase, + first: Throwable? = null +): Throwable? = traverseValidated { f -> + catchNel { f(exitCase) } +}.fold({ + if (first != null) Platform.composeErrors(NonEmptyList(first, it)) + else Platform.composeErrors(it) +}, { first }) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt index 1bcdc77a51c..7c86011efc3 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt @@ -1,134 +1,10 @@ package arrow.fx.coroutines.computations -import arrow.continuations.generic.AtomicRef -import arrow.continuations.generic.update -import arrow.core.NonEmptyList -import arrow.core.ValidatedNel -import arrow.core.invalidNel -import arrow.core.nonFatalOrThrow -import arrow.core.traverseValidated -import arrow.core.valid -import arrow.fx.coroutines.ExitCase -import arrow.fx.coroutines.Platform import arrow.fx.coroutines.Resource -import arrow.fx.coroutines.runReleaseAndRethrow -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.withContext public interface ResourceEffect { public suspend fun Resource.bind(): A } public fun resource(f: suspend ResourceEffect.() -> A): Resource = - Resource({ - val effect = ResourceEffectImpl() - val res = try { - f(effect) - } catch (e: Throwable) { - val ex = if (e is CancellationException) ExitCase.Cancelled(e) - else ExitCase.Failure(e) - val ee = effect.finalizers.get().traverseValidated { f -> - catchNel { f(ex) } - }.fold({ - Platform.composeErrors(NonEmptyList(e, it)) - }, { e }) - throw ee - } - Pair(res, effect) - }) { (_, effect), ex -> - effect.finalizers.get().cancelAll(ex) - }.map { it.first } - -private class ResourceEffectImpl : ResourceEffect { - val finalizers: AtomicRef Unit>> = AtomicRef(emptyList()) - override suspend fun Resource.bind(): A = - allocate { finalizer -> - finalizers.update { it + finalizer } - } -} - -private suspend fun List Unit>.cancelAll( - exitCase: ExitCase, - last: (suspend () -> Unit)? = null -): Unit { - val e = traverseValidated { finalizer -> - catchNel { finalizer(exitCase) } - }.fold({ - Platform.composeErrors(it) - }, { null }) - val e2 = runCatching { last?.invoke() }.exceptionOrNull() - Platform.composeErrors(e, e2)?.let { throw it } -} - -// Interpreter that knows how to evaluate a Resource data structure -// Maintains its own stack for dealing with Bind chains -@Suppress("UNCHECKED_CAST") -private tailrec suspend fun useLoop( - current: Resource, - stack: List<(Any?) -> Resource>, - finalizers: List Unit>, - handle: (Pair Unit>) -> Unit -): Pair Unit> = - when (current) { - is Resource.Defer -> useLoop(current.resource.invoke(), stack, finalizers, handle) - is Resource.Bind<*, *> -> - useLoop(current.source, listOf(current.f as (Any?) -> Resource) + stack, finalizers, handle) - is Resource.Allocate -> loadResourceAndReleaseHandler( - acquire = { - current.acquire().let { a -> - if (stack.isEmpty()) { - Pair Unit>(a) { ex -> - finalizers.cancelAll(ex) { - current.release(a, ex) - } - }.also(handle) - } else { - a - } - } - }, - use = { a -> - if (stack.isEmpty()) a as Pair Unit> - else useLoop(stack.first()(a), stack.drop(1), finalizers + { ex -> current.release(a, ex) }, handle) - }, - release = { a, ex -> - if (ex != ExitCase.Completed) { - finalizers.cancelAll(ex) { - current.release(a, ex) - } - } - } - ) - } - -private suspend fun Resource.allocate(handle: (suspend (ExitCase) -> Unit) -> Unit): A = - useLoop(this, emptyList(), emptyList()) { (_, finalizer) -> - handle(finalizer) - }.first as A - -private suspend inline fun loadResourceAndReleaseHandler( - crossinline acquire: suspend () -> Any?, - crossinline use: suspend (Any?) -> Pair Unit>, - crossinline release: suspend (Any?, ExitCase) -> Unit -): Pair Unit> { - val acquired = withContext(NonCancellable) { - acquire() - } - - return try { // Successfully loaded resource, pass it and its release f down - val (b, _release) = use(acquired) - Pair(b) { ex -> _release(ex); release(acquired, ex) } - } catch (e: CancellationException) { // Release when cancelled - runReleaseAndRethrow(e) { release(acquired, ExitCase.Cancelled(e)) } - } catch (t: Throwable) { // Release when failed to load resource - runReleaseAndRethrow(t.nonFatalOrThrow()) { release(acquired, ExitCase.Failure(t.nonFatalOrThrow())) } - } -} - -private inline fun catchNel(f: () -> A): ValidatedNel = - try { - f().valid() - } catch (e: Throwable) { - e.invalidNel() - } + Resource.Dsl(f) From d7672fb91994861b59c9ebf1a835a79aa6ad09b4 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Sat, 6 Nov 2021 00:18:08 +0100 Subject: [PATCH 11/13] Make traverseResource test lazy --- .../commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt index 20d3736ca19..8c16dfe6813 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt @@ -1,6 +1,7 @@ package arrow.fx.coroutines import arrow.core.Either +import arrow.core.identity import arrow.core.left import io.kotest.assertions.fail import io.kotest.matchers.should @@ -104,9 +105,8 @@ class ResourceTest : ArrowFxSpec( "traverseResource: leftToRight" { checkAll(Arb.list(Arb.int())) { list -> - val mutable = mutableListOf() - list.traverseResource { mutable.add(it); Resource.just(Unit) } - mutable.toList() shouldBe list + list.traverseResource { Resource.just(it) } + .use(::identity) shouldBe list } } From bf8da5ad40fe67930661d786ed88abb4d2227540 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Sat, 6 Nov 2021 10:56:31 +0100 Subject: [PATCH 12/13] Verify functions are never called twice, add docs --- .../kotlin/arrow/fx/coroutines/Resource.kt | 13 +++-- .../fx/coroutines/computations/resource.kt | 46 ++++++++++++++++ .../arrow/fx/coroutines/ResourceTest.kt | 52 ++++++++++--------- 3 files changed, 81 insertions(+), 30 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt index c18bfffe4d1..50bd4f59366 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt @@ -714,11 +714,14 @@ private class ResEffectImpl : ResourceEffect { } } }) - is Resource.Bind<*, *> -> Resource.Dsl { - val any = source.bind() - val ff = f as (Any?) -> Resource - ff(any).bind() - }.bind() + is Resource.Bind<*, *> -> { + val dsl: suspend ResourceEffect.() -> A = { + val any = source.bind() + val ff = f as (Any?) -> Resource + ff(any).bind() + } + dsl(this@ResEffectImpl) + } is Resource.Defer -> resource().bind() } } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt index 7c86011efc3..0c252bd751d 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt @@ -2,6 +2,52 @@ package arrow.fx.coroutines.computations import arrow.fx.coroutines.Resource +/** + * Computation block for the [Resource] type. + * The [Resource] allows us to describe resources as immutable values, + * and compose them together in simple ways. + * This way you can split the logic of what a `Resource` is and how it should be closed from how you use them. + * + * * # Using and composing Resource + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.computations.resource + * import arrow.fx.coroutines.release + * + * class UserProcessor { + * fun start(): Unit = println("Creating UserProcessor") + * fun shutdown(): Unit = println("Shutting down UserProcessor") + * fun process(ds: DataSource): List = + * ds.users().map { "Processed $it" } + * } + * + * class DataSource { + * fun connect(): Unit = println("Connecting dataSource") + * fun users(): List = listOf("User-1", "User-2", "User-3") + * fun close(): Unit = println("Closed dataSource") + * } + * + * class Service(val db: DataSource, val userProcessor: UserProcessor) { + * suspend fun processData(): List = userProcessor.process(db) + * } + * + * //sampleStart + * val userProcessor = resource { + * UserProcessor().also(UserProcessor::start) + * } release UserProcessor::shutdown + * + * val dataSource = resource { + * DataSource().also { it.connect() } + * } release DataSource::close + * + * suspend fun main(): Unit { + * resource { + * Service(dataSource.bind(), userProcessor.bind()) + * }.use { service -> service.processData() } + * } + * //sampleEnd + * ``` + */ public interface ResourceEffect { public suspend fun Resource.bind(): A } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt index 8c16dfe6813..b5d1751bb1c 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt @@ -31,7 +31,7 @@ class ResourceTest : ArrowFxSpec( "value resource is released with Complete" { checkAll(Arb.int()) { n -> val p = CompletableDeferred() - Resource({ n }, { _, ex -> p.complete(ex) }) + Resource({ n }, { _, ex -> require(p.complete(ex)) }) .use { Unit } p.await() shouldBe ExitCase.Completed @@ -41,7 +41,7 @@ class ResourceTest : ArrowFxSpec( "error resource finishes with error" { checkAll(Arb.throwable()) { e -> val p = CompletableDeferred() - val r = Resource({ throw e }, { _, ex -> p.complete(ex) }) + val r = Resource({ throw e }, { _, ex -> require(p.complete(ex)) }) Either.catch { r.use { it + 1 } @@ -53,11 +53,11 @@ class ResourceTest : ArrowFxSpec( checkAll(Arb.int()) { n -> val p = CompletableDeferred() val start = CompletableDeferred() - val r = Resource({ n }, { _, ex -> p.complete(ex) }) + val r = Resource({ n }, { _, ex -> require(p.complete(ex)) }) val f = async { r.use { - start.complete(Unit) + require(start.complete(Unit)) never() } } @@ -114,7 +114,7 @@ class ResourceTest : ArrowFxSpec( val exit = CompletableDeferred() arrow.core.computations.either { arrow.fx.coroutines.computations.resource { - Resource({ 1 }) { _, ex -> exit.complete(ex) }.bind() + Resource({ 1 }) { _, ex -> require(exit.complete(ex)) }.bind() "error".left().bind() 1 }.use { it } @@ -150,7 +150,7 @@ class ResourceTest : ArrowFxSpec( val res = promises.fold(Resource({ 0 }, { _, _ -> })) { acc, (i, promise) -> acc.flatMap { ii: Int -> Resource({ ii + i }) { _, _ -> - promise.complete(i) + require(promise.complete(i)) } } } @@ -168,7 +168,6 @@ class ResourceTest : ArrowFxSpec( }.shouldBeTypeOf() (1..depth).zip(promises) { i, promise -> - println(promise.isCompleted) promise.await() shouldBe i } } @@ -186,7 +185,6 @@ class ResourceTest : ArrowFxSpec( }.shouldBeTypeOf() (1..depth).zip(promises) { i, promise -> - println(promise.isCompleted) promise.await() shouldBe i } } @@ -203,10 +201,10 @@ class ResourceTest : ArrowFxSpec( latch.await() shouldBe (1..depth).sum() throw cancel }) { _, _ -> }.parZip(resource.flatMap { - Resource({ latch.complete(it) }) { _, _ -> } + Resource({ require(latch.complete(it)) }) { _, _ -> } }) { _, _ -> } else resource.flatMap { - Resource({ latch.complete(it) }) { _, _ -> } + Resource({ require(latch.complete(it)) }) { _, _ -> } }.parZip(Resource({ latch.await() shouldBe (1..depth).sum() throw cancel @@ -227,8 +225,8 @@ class ResourceTest : ArrowFxSpec( val released = CompletableDeferred>() val started = CompletableDeferred() assertThrowable { - Resource({ started.complete(Unit); i }, { ii, ex -> - released.complete(ii to ex) + Resource({ require(started.complete(Unit)); i }, { ii, ex -> + require(released.complete(ii to ex)) }).parZip(Resource({ started.await(); throw cancel }) { _, _ -> }) { _, _ -> } .use { fail("It should never reach here") } }.shouldBeTypeOf() @@ -250,8 +248,8 @@ class ResourceTest : ArrowFxSpec( started.await() throw cancel }) { _, _ -> } - .parZip(Resource({ started.complete(Unit); i }, { ii, ex -> - released.complete(ii to ex) + .parZip(Resource({ require(started.complete(Unit)); i }, { ii, ex -> + require(released.complete(ii to ex)) })) { _, _ -> } .use { fail("It should never reach here") } }.shouldBeTypeOf() @@ -267,8 +265,10 @@ class ResourceTest : ArrowFxSpec( val released = CompletableDeferred>() val started = CompletableDeferred() assertThrowable { - Resource({ started.complete(Unit); i }, { ii, ex -> released.complete(ii to ex) }) - .parZip( + Resource( + { require(started.complete(Unit)); i }, + { ii, ex -> require(released.complete(ii to ex)) } + ).parZip( Resource({ started.await(); throw throwable }) { _, _ -> } ) { _, _ -> } .use { fail("It should never reach here") } @@ -290,8 +290,10 @@ class ResourceTest : ArrowFxSpec( throw throwable }) { _, _ -> } .parZip( - Resource({ started.complete(Unit); i }, { ii, ex -> released.complete(ii to ex) }) - ) { _, _ -> } + Resource( + { require(started.complete(Unit)); i }, + { ii, ex -> require(released.complete(ii to ex)) } + )) { _, _ -> } .use { fail("It should never reach here") } } shouldBe throwable @@ -307,7 +309,7 @@ class ResourceTest : ArrowFxSpec( val released = CompletableDeferred>() assertThrowable { - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + Resource({ i }, { ii, ex -> require(released.complete(ii to ex)) }) .parZip( Resource({ }) { _, _ -> throw cancel } ) { _, _ -> } @@ -328,7 +330,7 @@ class ResourceTest : ArrowFxSpec( assertThrowable { Resource({ }) { _, _ -> throw cancel } .parZip( - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + Resource({ i }, { ii, ex -> require(released.complete(ii to ex)) }) ) { _, _ -> } .use { /*fail("It should never reach here")*/ } }.shouldBeTypeOf() @@ -344,7 +346,7 @@ class ResourceTest : ArrowFxSpec( val released = CompletableDeferred>() assertThrowable { - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + Resource({ i }, { ii, ex -> require(released.complete(ii to ex)) }) .parZip( Resource({ }) { _, _ -> throw throwable } ) { _, _ -> } @@ -364,7 +366,7 @@ class ResourceTest : ArrowFxSpec( assertThrowable { Resource({ }) { _, _ -> throw throwable } .parZip( - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + Resource({ i }, { ii, ex -> require(released.complete(ii to ex)) }) ) { _, _ -> } .use { } } shouldBe throwable @@ -381,9 +383,9 @@ class ResourceTest : ArrowFxSpec( val releasedB = CompletableDeferred>() assertThrowable { - Resource({ a }) { aa, ex -> releasedA.complete(aa to ex) } + Resource({ a }) { aa, ex -> require(releasedA.complete(aa to ex)) } .parZip( - Resource({ b }) { bb, ex -> releasedB.complete(bb to ex) } + Resource({ b }) { bb, ex -> require(releasedB.complete(bb to ex)) } ) { _, _ -> } .use { throw throwable } } shouldBe throwable @@ -409,7 +411,7 @@ class ResourceTest : ArrowFxSpec( }) { _, _ -> } .parZip(Resource({ r.set("$b") - modifyGate.complete(0) + require(modifyGate.complete(0)) }) { _, _ -> }) { _a, _b -> _a to _b } .use { r.get() shouldBe "$b$a" From 65798a8a7df4c80b7b1cab686c11ff0e1dfb4c64 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Wed, 10 Nov 2021 20:43:50 +0100 Subject: [PATCH 13/13] Remove redundant withNonCancellable --- .../src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt index 50bd4f59366..7923f6d94bf 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt @@ -707,11 +707,9 @@ private class ResEffectImpl : ResourceEffect { // Only if ExitCase.Failure, or ExitCase.Cancelled during acquire we cancel // Otherwise we've saved the finalizer, and it will be called from somewhere else. if (ex != ExitCase.Completed) { - withContext(NonCancellable) { - val e = finalizers.get().cancelAll(ex) - val e2 = runCatching { release(a, ex) }.exceptionOrNull() - Platform.composeErrors(e, e2)?.let { throw it } - } + val e = finalizers.get().cancelAll(ex) + val e2 = runCatching { release(a, ex) }.exceptionOrNull() + Platform.composeErrors(e, e2)?.let { throw it } } }) is Resource.Bind<*, *> -> {