diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index a661a8a4f68..08b0d907062 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -57,7 +57,7 @@ jobs: linux_jvm_and_js: runs-on: ubuntu-latest - timeout-minutes: 25 + timeout-minutes: 60 steps: - uses: actions/checkout@v2 diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index d601cba41a3..a56209fb408 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -354,7 +354,9 @@ public final class arrow/fx/coroutines/ResourceExtensionsKt { } public final class arrow/fx/coroutines/ResourceKt { + public static final fun release (Larrow/fx/coroutines/Resource;Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Resource; public static final fun release-zgiIeyo (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Resource; + public static final fun releaseCase (Larrow/fx/coroutines/Resource;Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Resource; public static final fun releaseCase-zgiIeyo (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Resource; public static final fun resource (Lkotlin/jvm/functions/Function1;)Lkotlin/jvm/functions/Function1; public static final fun sequence (Ljava/lang/Iterable;)Larrow/fx/coroutines/Resource; @@ -488,3 +490,11 @@ public final class arrow/fx/coroutines/Use { public final synthetic fun unbox-impl ()Lkotlin/jvm/functions/Function1; } +public abstract interface class arrow/fx/coroutines/computations/ResourceEffect { + public abstract fun bind (Larrow/fx/coroutines/Resource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public final class arrow/fx/coroutines/computations/ResourceKt { + public static final fun resource (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Resource; +} + diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt index 4b4f6901330..7923f6d94bf 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt @@ -1,15 +1,19 @@ package arrow.fx.coroutines +import arrow.continuations.generic.AtomicRef +import arrow.continuations.generic.update +import arrow.core.NonEmptyList +import arrow.core.ValidatedNel import arrow.core.identity -import arrow.core.nonFatalOrThrow +import arrow.core.invalidNel +import arrow.core.traverseValidated +import arrow.core.valid +import arrow.fx.coroutines.computations.ResourceEffect +import kotlin.coroutines.CoroutineContext import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.async -import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.withContext -import kotlin.coroutines.CoroutineContext /** * [Resource] models resource allocation and releasing. It is especially useful when multiple resources that depend on each other @@ -164,18 +168,46 @@ public sealed class Resource { * ``` */ @Suppress("UNCHECKED_CAST") - public suspend infix fun use(f: suspend (A) -> B): B = - useLoop(this as Resource, f as suspend (Any?) -> Any?, emptyList()) as B + public tailrec suspend infix fun use(f: suspend (A) -> B): B = + when (this) { + is Dsl -> { + val effect = ResEffectImpl() + val b = try { + val a = dsl(effect) + f(a) + } catch (e: Throwable) { + val ex = if (e is CancellationException) ExitCase.Cancelled(e) else ExitCase.Failure(e) + val ee = withContext(NonCancellable) { + effect.finalizers.get().cancelAll(ex, e) ?: e + } + throw ee + } + withContext(NonCancellable) { + effect.finalizers.get().cancelAll(ExitCase.Completed)?.let { throw it } + } + b + } + is Allocate -> bracketCase(acquire, f, release) + is Bind<*, *> -> Dsl { + val any = source.bind() + val ff = this@Resource.f as (Any?) -> Resource + ff(any).bind() + }.use(f) + is Defer -> resource().use(f) + } public fun map(f: suspend (A) -> B): Resource = - flatMap { a -> Resource({ f(a) }) { _, _ -> } } + arrow.fx.coroutines.computations.resource { f(bind()) } /** Useful for setting up/configuring an acquired resource */ public fun tap(f: suspend (A) -> Unit): Resource = - map { f(it); it } + arrow.fx.coroutines.computations.resource { bind().also { f(it) } } public fun ap(ff: Resource<(A) -> B>): Resource = - flatMap { res -> ff.map { it(res) } } + arrow.fx.coroutines.computations.resource { + val a = bind() + ff.bind()(a) + } /** * Create a resource value of [B] from a resource [A] by mapping [f]. @@ -218,12 +250,13 @@ public sealed class Resource { * @see zip to combine independent resources together * @see parZip for combining independent resources in parallel */ - public fun flatMap(f: (A) -> Resource): Resource = - Bind(this, f) + public fun flatMap(f: (A) -> Resource): Resource = arrow.fx.coroutines.computations.resource { + f(bind()).bind() + } public inline fun zip(other: Resource, crossinline combine: (A, B) -> C): Resource = - flatMap { r -> - other.map { r2 -> combine(r, r2) } + arrow.fx.coroutines.computations.resource { + combine(bind(), other.bind()) } public fun zip(other: Resource): Resource> = @@ -283,8 +316,8 @@ public sealed class Resource { c: Resource, crossinline map: (A, B, C) -> D ): Resource = - zip(b, c, unit, unit, unit, unit, unit, unit, unit) { a, b, c, _, _, _, _, _, _, _ -> - map(a, b, c) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind()) } public inline fun zip( @@ -293,8 +326,8 @@ public sealed class Resource { d: Resource, crossinline map: (A, B, C, D) -> E ): Resource = - zip(b, c, d, unit, unit, unit, unit, unit, unit) { a, b, c, d, e, _, _, _, _, _ -> - map(a, b, c, d) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind()) } public inline fun zip( @@ -304,8 +337,8 @@ public sealed class Resource { e: Resource, crossinline map: (A, B, C, D, E) -> G ): Resource = - zip(b, c, d, e, unit, unit, unit, unit, unit) { a, b, c, d, e, _, _, _, _, _ -> - map(a, b, c, d, e) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind()) } public inline fun zip( @@ -316,8 +349,8 @@ public sealed class Resource { f: Resource, crossinline map: (A, B, C, D, E, F) -> G ): Resource = - zip(b, c, d, e, f, unit, unit, unit, unit) { b, c, d, e, f, g, _, _, _, _ -> - map(b, c, d, e, f, g) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind()) } public inline fun zip( @@ -329,8 +362,8 @@ public sealed class Resource { g: Resource, crossinline map: (A, B, C, D, E, F, G) -> H ): Resource = - zip(b, c, d, e, f, g, unit, unit, unit) { a, b, c, d, e, f, g, _, _, _ -> - map(a, b, c, d, e, f, g) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind()) } public inline fun zip( @@ -343,8 +376,8 @@ public sealed class Resource { h: Resource, crossinline map: (A, B, C, D, E, F, G, H) -> I ): Resource = - zip(b, c, d, e, f, g, h, unit, unit) { a, b, c, d, e, f, g, h, _, _ -> - map(a, b, c, d, e, f, g, h) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind(), h.bind()) } public inline fun zip( @@ -358,8 +391,8 @@ public sealed class Resource { i: Resource, crossinline map: (A, B, C, D, E, F, G, H, I) -> J ): Resource = - zip(b, c, d, e, f, g, h, i, unit) { a, b, c, d, e, f, g, h, i, _ -> - map(a, b, c, d, e, f, g, h, i) + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind(), h.bind(), i.bind()) } public inline fun zip( @@ -374,26 +407,8 @@ public sealed class Resource { j: Resource, crossinline map: (A, B, C, D, E, F, G, H, I, J) -> K ): Resource = - flatMap { aa -> - b.flatMap { bb -> - c.flatMap { cc -> - d.flatMap { dd -> - e.flatMap { ee -> - f.flatMap { ff -> - g.flatMap { gg -> - h.flatMap { hh -> - i.flatMap { ii -> - j.map { jj -> - map(aa, bb, cc, dd, ee, ff, gg, hh, ii, jj) - } - } - } - } - } - } - } - } - } + arrow.fx.coroutines.computations.resource { + map(bind(), b.bind(), c.bind(), d.bind(), e.bind(), f.bind(), g.bind(), h.bind(), i.bind(), j.bind()) } public fun parZip(fb: Resource, f: suspend (A, B) -> C): Resource = @@ -442,44 +457,23 @@ public sealed class Resource { * } * //sampleEnd * ``` - * - * */ public fun parZip( ctx: CoroutineContext = Dispatchers.Default, fb: Resource, f: suspend (A, B) -> C ): Resource = - Resource({ - supervisorScope { - val faa = async(ctx) { allocate() } - val fbb = async(ctx) { fb.allocate() } - val a = awaitOrCancelOther(faa, fbb) - val b = awaitOrCancelOther(fbb, faa) - Pair(a, b) - } - }, { (ar, br), ex -> - val (_, releaseA) = ar - val (_, releaseB) = br - supervisorScope { - val faa = async(ctx) { releaseA(ex) } - val fbb = async(ctx) { releaseB(ex) } - try { - faa.await() - } catch (errorA: Throwable) { - try { - fbb.await() - } catch (errorB: Throwable) { - throw Platform.composeErrors(errorA, errorB) - } - throw errorA - } - fbb.await() - } - }).map { (ar, br) -> - f(ar.first, br.first) + arrow.fx.coroutines.computations.resource { + parZip(ctx, { this@Resource.bind() }, { fb.bind() }) { a, b -> f(a, b) } } + @Deprecated( + "Bind is being deprecated. Use resource DSL instead", + ReplaceWith( + "resource { f(source.bind()) }", + "arrow.fx.coroutines.computations.resource" + ) + ) public class Bind(public val source: Resource, public val f: (A) -> Resource) : Resource() public class Allocate( @@ -487,8 +481,17 @@ public sealed class Resource { public val release: suspend (A, ExitCase) -> Unit ) : Resource() + @Deprecated( + "Defer is being deprecated. Use resource DSL instead", + ReplaceWith( + "resource { resource.invoke().bind() }", + "arrow.fx.coroutines.computations.resource" + ) + ) public class Defer(public val resource: suspend () -> Resource) : Resource() + internal data class Dsl(public val dsl: suspend ResourceEffect.() -> A) : Resource() + public companion object { @PublishedApi @@ -527,36 +530,6 @@ public sealed class Resource { public fun defer(f: suspend () -> Resource): Resource = Resource.Defer(f) } - - private suspend fun continueLoop( - current: Resource, - use: suspend (Any?) -> Any?, - stack: List<(Any?) -> Resource> - ): Any? = useLoop(current, use, stack) - - // Interpreter that knows how to evaluate a Resource data structure - // Maintains its own stack for dealing with Bind chains - @Suppress("UNCHECKED_CAST") - private tailrec suspend fun useLoop( - current: Resource, - use: suspend (Any?) -> Any?, - stack: List<(Any?) -> Resource> - ): Any? = - when (current) { - is Defer -> useLoop(current.resource.invoke(), use, stack) - is Bind<*, *> -> - useLoop(current.source as Resource, use, listOf(current.f as (Any?) -> Resource) + stack) - is Allocate -> bracketCase( - acquire = current.acquire, - use = { a -> - when { - stack.isEmpty() -> use(a) - else -> continueLoop(stack.first()(a), use, stack.drop(1)) - } - }, - release = { a, exitCase -> current.release(a, exitCase) } - ) - } } /** @@ -589,25 +562,53 @@ public sealed class Resource { * } * ``` */ +@Deprecated( + "Use the resource computation DSL instead", + ReplaceWith( + "resource { acquire() }", + "arrow.fx.coroutines.computations.resource" + ) +) public inline class Use(internal val acquire: suspend () -> A) /** * Marks an [acquire] operation as the [Resource.use] step of a [Resource]. */ +@Deprecated( + "Use the resource computation DSL instead", + ReplaceWith( + "resource { acquire() }", + "arrow.fx.coroutines.computations.resource" + ) +) public fun resource(acquire: suspend () -> A): Use = Use(acquire) -/** - * Composes a [release] action to a [Resource.use] action creating a [Resource]. - */ +@Deprecated("Use the resource computation DSL instead") public infix fun Use.release(release: suspend (A) -> Unit): Resource = Resource(acquire) { a, _ -> release(a) } /** - * Composes a [releaseCase] action to a [Resource.use] action creating a [Resource]. + * Composes a [release] action to a [Resource.use] action creating a [Resource]. */ +public infix fun Resource.release(release: suspend (A) -> Unit): Resource = + arrow.fx.coroutines.computations.resource { + val a = bind() + Resource({ a }, { _, _ -> release(a) }).bind() + } + +@Deprecated("Use the resource computation DSL instead") public infix fun Use.releaseCase(release: suspend (A, ExitCase) -> Unit): Resource = Resource(acquire, release) +/** + * Composes a [releaseCase] action to a [Resource.use] action creating a [Resource]. + */ +public infix fun Resource.releaseCase(release: suspend (A, ExitCase) -> Unit): Resource = + arrow.fx.coroutines.computations.resource { + val a = bind() + Resource({ a }, { _, ex -> release(a, ex) }).bind() + } + /** * Traverse this [Iterable] and collects the resulting `Resource` of [f] into a `Resource>`. * @@ -645,8 +646,10 @@ public infix fun Use.releaseCase(release: suspend (A, ExitCase) -> Unit): * ``` */ public inline fun Iterable.traverseResource(crossinline f: (A) -> Resource): Resource> = - fold(Resource.just(emptyList())) { acc: Resource>, a: A -> - f(a).ap(acc.map { { b: B -> it + b } }) + arrow.fx.coroutines.computations.resource { + map { a -> + f(a).bind() + } } /** @@ -690,77 +693,51 @@ public inline fun Iterable.traverseResource(crossinline f: (A) -> Reso public inline fun Iterable>.sequence(): Resource> = traverseResource(::identity) -// Interpreter that knows how to evaluate a Resource data structure -// Maintains its own stack for dealing with Bind chains -@Suppress("UNCHECKED_CAST") -private tailrec suspend fun useLoop( - current: Resource, - stack: List<(Any?) -> Resource> -): Pair Unit> = - when (current) { - is Resource.Defer -> useLoop(current.resource.invoke(), stack) - is Resource.Bind<*, *> -> - useLoop(current.source, listOf(current.f as (Any?) -> Resource) + stack) - is Resource.Allocate -> loadResourceAndReleaseHandler( - acquire = current.acquire, - use = { a -> - when { - stack.isEmpty() -> Pair(a) { ex -> current.release(a, ex) } - else -> useLoop(stack.first()(a), stack.drop(1)) +private class ResEffectImpl : ResourceEffect { + val finalizers: AtomicRef Unit>> = AtomicRef(emptyList()) + override suspend fun Resource.bind(): A = + when (this) { + is Resource.Dsl -> dsl.invoke(this@ResEffectImpl) + is Resource.Allocate -> bracketCase({ + val a = acquire() + val finalizer: suspend (ExitCase) -> Unit = { ex: ExitCase -> release(a, ex) } + finalizers.update { it + finalizer } + a + }, ::identity, { a, ex -> + // Only if ExitCase.Failure, or ExitCase.Cancelled during acquire we cancel + // Otherwise we've saved the finalizer, and it will be called from somewhere else. + if (ex != ExitCase.Completed) { + val e = finalizers.get().cancelAll(ex) + val e2 = runCatching { release(a, ex) }.exceptionOrNull() + Platform.composeErrors(e, e2)?.let { throw it } } - }, - release = { _, _ -> /*a, exitCase -> current.release(a, exitCase)*/ } - ) - } - -private suspend fun Resource.allocate(): Pair Unit> = - useLoop(this, emptyList()) as Pair Unit> - -private suspend inline fun loadResourceAndReleaseHandler( - crossinline acquire: suspend () -> Any?, - crossinline use: suspend (Any?) -> Pair Unit>, - crossinline release: suspend (Any?, ExitCase) -> Unit -): Pair Unit> { - val acquired = withContext(NonCancellable) { - acquire() - } - - return try { // Successfully loaded resource, pass it and its release f down - val (b, _release) = use(acquired) - Pair(b) { ex -> _release(ex); release(acquired, ex) } - } catch (e: CancellationException) { // Release when cancelled - runReleaseAndRethrow(e) { release(acquired, ExitCase.Cancelled(e)) } - } catch (t: Throwable) { // Release when failed to load resource - runReleaseAndRethrow(t.nonFatalOrThrow()) { release(acquired, ExitCase.Failure(t.nonFatalOrThrow())) } - } + }) + is Resource.Bind<*, *> -> { + val dsl: suspend ResourceEffect.() -> A = { + val any = source.bind() + val ff = f as (Any?) -> Resource + ff(any).bind() + } + dsl(this@ResEffectImpl) + } + is Resource.Defer -> resource().bind() + } } -private suspend fun awaitOrCancelOther( - fa: Deferred Unit>>, - fb: Deferred Unit>> -): Pair Unit> = +// Version that doesn't rethrow `CancellationException` because we need to run all finalizers regardless of CancellationException +private inline fun catchNel(f: () -> A): ValidatedNel = try { - fa.await() + f().valid() } catch (e: Throwable) { - if (e is CancellationException) awaitAndAddSuppressed(fb, e, ExitCase.Cancelled(e)) - else awaitAndAddSuppressed(fb, e, ExitCase.Failure(e)) - } - -private suspend fun awaitAndAddSuppressed( - fb: Deferred Unit>>, - e: Throwable, - exitCase: ExitCase -): Nothing { - val cancellationException = try { - if (fb.isCancelled && fb.isCompleted) fb.getCompletionExceptionOrNull() - else fb.await().second.invoke(exitCase).let { null } - } catch (e2: Throwable) { - throw e.apply { addSuppressed(e2) } + e.invalidNel() } - val exception = cancellationException?.let { - e.apply { addSuppressed(it) } - } ?: e - - throw exception -} +private suspend fun List Unit>.cancelAll( + exitCase: ExitCase, + first: Throwable? = null +): Throwable? = traverseValidated { f -> + catchNel { f(exitCase) } +}.fold({ + if (first != null) Platform.composeErrors(NonEmptyList(first, it)) + else Platform.composeErrors(it) +}, { first }) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt new file mode 100644 index 00000000000..0c252bd751d --- /dev/null +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/computations/resource.kt @@ -0,0 +1,56 @@ +package arrow.fx.coroutines.computations + +import arrow.fx.coroutines.Resource + +/** + * Computation block for the [Resource] type. + * The [Resource] allows us to describe resources as immutable values, + * and compose them together in simple ways. + * This way you can split the logic of what a `Resource` is and how it should be closed from how you use them. + * + * * # Using and composing Resource + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.computations.resource + * import arrow.fx.coroutines.release + * + * class UserProcessor { + * fun start(): Unit = println("Creating UserProcessor") + * fun shutdown(): Unit = println("Shutting down UserProcessor") + * fun process(ds: DataSource): List = + * ds.users().map { "Processed $it" } + * } + * + * class DataSource { + * fun connect(): Unit = println("Connecting dataSource") + * fun users(): List = listOf("User-1", "User-2", "User-3") + * fun close(): Unit = println("Closed dataSource") + * } + * + * class Service(val db: DataSource, val userProcessor: UserProcessor) { + * suspend fun processData(): List = userProcessor.process(db) + * } + * + * //sampleStart + * val userProcessor = resource { + * UserProcessor().also(UserProcessor::start) + * } release UserProcessor::shutdown + * + * val dataSource = resource { + * DataSource().also { it.connect() } + * } release DataSource::close + * + * suspend fun main(): Unit { + * resource { + * Service(dataSource.bind(), userProcessor.bind()) + * }.use { service -> service.processData() } + * } + * //sampleEnd + * ``` + */ +public interface ResourceEffect { + public suspend fun Resource.bind(): A +} + +public fun resource(f: suspend ResourceEffect.() -> A): Resource = + Resource.Dsl(f) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt index 3d42ae4314d..b5d1751bb1c 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt @@ -1,14 +1,16 @@ package arrow.fx.coroutines import arrow.core.Either +import arrow.core.identity +import arrow.core.left import io.kotest.assertions.fail import io.kotest.matchers.should import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeInstanceOf import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb +import io.kotest.property.arbitrary.bool import io.kotest.property.arbitrary.int -import io.kotest.property.arbitrary.list import io.kotest.property.arbitrary.map import io.kotest.property.arbitrary.string import kotlinx.coroutines.CancellationException @@ -29,7 +31,7 @@ class ResourceTest : ArrowFxSpec( "value resource is released with Complete" { checkAll(Arb.int()) { n -> val p = CompletableDeferred() - Resource({ n }, { _, ex -> p.complete(ex) }) + Resource({ n }, { _, ex -> require(p.complete(ex)) }) .use { Unit } p.await() shouldBe ExitCase.Completed @@ -39,7 +41,7 @@ class ResourceTest : ArrowFxSpec( "error resource finishes with error" { checkAll(Arb.throwable()) { e -> val p = CompletableDeferred() - val r = Resource({ throw e }, { _, ex -> p.complete(ex) }) + val r = Resource({ throw e }, { _, ex -> require(p.complete(ex)) }) Either.catch { r.use { it + 1 } @@ -51,11 +53,11 @@ class ResourceTest : ArrowFxSpec( checkAll(Arb.int()) { n -> val p = CompletableDeferred() val start = CompletableDeferred() - val r = Resource({ n }, { _, ex -> p.complete(ex) }) + val r = Resource({ n }, { _, ex -> require(p.complete(ex)) }) val f = async { r.use { - start.complete(Unit) + require(start.complete(Unit)) never() } } @@ -103,9 +105,117 @@ class ResourceTest : ArrowFxSpec( "traverseResource: leftToRight" { checkAll(Arb.list(Arb.int())) { list -> - val mutable = mutableListOf() - list.traverseResource { mutable.add(it); Resource.just(Unit) } - mutable.toList() shouldBe list + list.traverseResource { Resource.just(it) } + .use(::identity) shouldBe list + } + } + + "Resource can close from either" { + val exit = CompletableDeferred() + arrow.core.computations.either { + arrow.fx.coroutines.computations.resource { + Resource({ 1 }) { _, ex -> require(exit.complete(ex)) }.bind() + "error".left().bind() + 1 + }.use { it } + } shouldBe "error".left() + // Should be ExitCase.Cancelled but still Failure due to ShortCircuit + // Cont will fix this issue by properly shifting and cancelling + exit.await().shouldBeTypeOf() + } + + val depth: Int = 100 + + class CheckableAutoClose { + var started = true + fun close() { + started = false + } + } + + fun closeable(): Resource = + Resource({ CheckableAutoClose() }) { a, _ -> a.close() } + + "parZip - success" { + val all = (1..depth).traverseResource { closeable() }.parZip( + (1..depth).traverseResource { closeable() } + ) { a, b -> a + b }.use { all -> + all.also { all.forEach { it.started shouldBe true } } + } + all.forEach { it.started shouldBe false } + } + + fun generate(): Pair>, Resource> { + val promises = (1..depth).map { Pair(it, CompletableDeferred()) } + val res = promises.fold(Resource({ 0 }, { _, _ -> })) { acc, (i, promise) -> + acc.flatMap { ii: Int -> + Resource({ ii + i }) { _, _ -> + require(promise.complete(i)) + } + } + } + return Pair(promises.map { it.second }, res) + } + + "parZip - deep finalizers are called when final one blows" { + io.kotest.property.checkAll(3, Arb.int(10..100)) { + val (promises, resource) = generate() + assertThrowable { + resource.flatMap { + Resource({ throw RuntimeException() }) { _, _ -> } + }.parZip(Resource({ }) { _, _ -> }) { _, _ -> } + .use { fail("It should never reach here") } + }.shouldBeTypeOf() + + (1..depth).zip(promises) { i, promise -> + promise.await() shouldBe i + } + } + } + + "parZip - deep finalizers are called when final one cancels" { + io.kotest.property.checkAll(3, Arb.int(10..100)) { + val cancel = CancellationException(null, null) + val (promises, resource) = generate() + assertThrowable { + resource.flatMap { + Resource({ throw cancel }) { _, _ -> } + }.parZip(Resource({ }) { _, _ -> }) { _, _ -> } + .use { fail("It should never reach here") } + }.shouldBeTypeOf() + + (1..depth).zip(promises) { i, promise -> + promise.await() shouldBe i + } + } + } + + // Test multiple release triggers on acquire fail. + "parZip - Deep finalizers get called on left or right cancellation" { + checkAll(Arb.bool()) { isLeft -> + val cancel = CancellationException(null, null) + val (promises, resource) = generate() + val latch = CompletableDeferred() + assertThrowable { + val res = if (isLeft) Resource({ + latch.await() shouldBe (1..depth).sum() + throw cancel + }) { _, _ -> }.parZip(resource.flatMap { + Resource({ require(latch.complete(it)) }) { _, _ -> } + }) { _, _ -> } + else resource.flatMap { + Resource({ require(latch.complete(it)) }) { _, _ -> } + }.parZip(Resource({ + latch.await() shouldBe (1..depth).sum() + throw cancel + }) { _, _ -> }) { _, _ -> } + + res.use { fail("It should never reach here") } + }.shouldBeTypeOf() + + (1..depth).zip(promises) { i, promise -> + promise.await() shouldBe i + } } } @@ -113,11 +223,11 @@ class ResourceTest : ArrowFxSpec( checkAll(Arb.int()) { i -> val cancel = CancellationException(null, null) val released = CompletableDeferred>() - + val started = CompletableDeferred() assertThrowable { - Resource({ i }, { ii, ex -> - released.complete(ii to ex) - }).parZip(Resource({ throw cancel }) { _, _ -> }) { _, _ -> } + Resource({ require(started.complete(Unit)); i }, { ii, ex -> + require(released.complete(ii to ex)) + }).parZip(Resource({ started.await(); throw cancel }) { _, _ -> }) { _, _ -> } .use { fail("It should never reach here") } }.shouldBeTypeOf() @@ -131,11 +241,15 @@ class ResourceTest : ArrowFxSpec( checkAll(Arb.int()) { i -> val cancel = CancellationException(null, null) val released = CompletableDeferred>() + val started = CompletableDeferred() assertThrowable { - Resource({ throw cancel }) { _, _ -> } - .parZip(Resource({ i }, { ii, ex -> - released.complete(ii to ex) + Resource({ + started.await() + throw cancel + }) { _, _ -> } + .parZip(Resource({ require(started.complete(Unit)); i }, { ii, ex -> + require(released.complete(ii to ex)) })) { _, _ -> } .use { fail("It should never reach here") } }.shouldBeTypeOf() @@ -149,11 +263,13 @@ class ResourceTest : ArrowFxSpec( "parZip - Right error on acquire" { checkAll(Arb.int(), Arb.throwable()) { i, throwable -> val released = CompletableDeferred>() - + val started = CompletableDeferred() assertThrowable { - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) - .parZip( - Resource({ throw throwable }) { _, _ -> } + Resource( + { require(started.complete(Unit)); i }, + { ii, ex -> require(released.complete(ii to ex)) } + ).parZip( + Resource({ started.await(); throw throwable }) { _, _ -> } ) { _, _ -> } .use { fail("It should never reach here") } } shouldBe throwable @@ -167,12 +283,17 @@ class ResourceTest : ArrowFxSpec( "parZip - Left error on acquire" { checkAll(Arb.int(), Arb.throwable()) { i, throwable -> val released = CompletableDeferred>() - + val started = CompletableDeferred() assertThrowable { - Resource({ throw throwable }) { _, _ -> } + Resource({ + started.await() + throw throwable + }) { _, _ -> } .parZip( - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) - ) { _, _ -> } + Resource( + { require(started.complete(Unit)); i }, + { ii, ex -> require(released.complete(ii to ex)) } + )) { _, _ -> } .use { fail("It should never reach here") } } shouldBe throwable @@ -188,7 +309,7 @@ class ResourceTest : ArrowFxSpec( val released = CompletableDeferred>() assertThrowable { - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + Resource({ i }, { ii, ex -> require(released.complete(ii to ex)) }) .parZip( Resource({ }) { _, _ -> throw cancel } ) { _, _ -> } @@ -209,7 +330,7 @@ class ResourceTest : ArrowFxSpec( assertThrowable { Resource({ }) { _, _ -> throw cancel } .parZip( - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + Resource({ i }, { ii, ex -> require(released.complete(ii to ex)) }) ) { _, _ -> } .use { /*fail("It should never reach here")*/ } }.shouldBeTypeOf() @@ -225,7 +346,7 @@ class ResourceTest : ArrowFxSpec( val released = CompletableDeferred>() assertThrowable { - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + Resource({ i }, { ii, ex -> require(released.complete(ii to ex)) }) .parZip( Resource({ }) { _, _ -> throw throwable } ) { _, _ -> } @@ -245,7 +366,7 @@ class ResourceTest : ArrowFxSpec( assertThrowable { Resource({ }) { _, _ -> throw throwable } .parZip( - Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + Resource({ i }, { ii, ex -> require(released.complete(ii to ex)) }) ) { _, _ -> } .use { } } shouldBe throwable @@ -262,9 +383,9 @@ class ResourceTest : ArrowFxSpec( val releasedB = CompletableDeferred>() assertThrowable { - Resource({ a }) { aa, ex -> releasedA.complete(aa to ex) } + Resource({ a }) { aa, ex -> require(releasedA.complete(aa to ex)) } .parZip( - Resource({ b }) { bb, ex -> releasedB.complete(bb to ex) } + Resource({ b }) { bb, ex -> require(releasedB.complete(bb to ex)) } ) { _, _ -> } .use { throw throwable } } shouldBe throwable @@ -290,7 +411,7 @@ class ResourceTest : ArrowFxSpec( }) { _, _ -> } .parZip(Resource({ r.set("$b") - modifyGate.complete(0) + require(modifyGate.complete(0)) }) { _, _ -> }) { _a, _b -> _a to _b } .use { r.get() shouldBe "$b$a"