From f6f5af83216a9da84b3528150b95fb10a65e34d5 Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Sun, 27 Feb 2022 02:28:06 +0100 Subject: [PATCH 01/19] add recursAndCollect --- .../api/arrow-fx-coroutines.api | 24 +++++++++++++++++++ .../kotlin/arrow/fx/coroutines/Schedule.kt | 11 +++++++-- .../arrow/fx/coroutines/ScheduleTest.kt | 12 +++++++++- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index a56209fb408..2244462b45d 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -432,6 +432,7 @@ public final class arrow/fx/coroutines/Schedule$Companion { public final fun never ()Larrow/fx/coroutines/Schedule; public final fun once ()Larrow/fx/coroutines/Schedule; public final fun recurs (I)Larrow/fx/coroutines/Schedule; + public final fun recursAndCollect (I)Larrow/fx/coroutines/Schedule; public final fun spaced (D)Larrow/fx/coroutines/Schedule; public final fun spaced-LRDsOJo (J)Larrow/fx/coroutines/Schedule; public final fun unfold (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Larrow/fx/coroutines/Schedule; @@ -471,6 +472,29 @@ public final class arrow/fx/coroutines/Schedule$Decision$Companion { public final fun done-KLykuaI (JLjava/lang/Object;Larrow/core/Eval;)Larrow/fx/coroutines/Schedule$Decision; } +public final class arrow/fx/coroutines/Schedule$ScheduleImpl : arrow/fx/coroutines/Schedule { + public fun (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;)V + public fun andThen (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; + public fun check (Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Schedule; + public fun choose (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; + public fun combineNanos (Larrow/fx/coroutines/Schedule;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; + public fun contramap (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; + public fun foldLazy (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Schedule; + public fun forever ()Larrow/fx/coroutines/Schedule; + public final fun getUpdate ()Lkotlin/jvm/functions/Function3; + public fun logInput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; + public fun logOutput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; + public fun map (Lkotlin/jvm/functions/Function1;)Larrow/fx/coroutines/Schedule; + public fun modifyNanos (Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Schedule; + public fun not ()Larrow/fx/coroutines/Schedule; + public final fun onDecision (Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Schedule; + public fun pipe (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; + public final fun reconsider (Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Schedule; + public fun repeatOrElseEither (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun updated (Lkotlin/jvm/functions/Function1;)Larrow/fx/coroutines/Schedule; + public fun zip (Larrow/fx/coroutines/Schedule;Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; +} + public final class arrow/fx/coroutines/ScheduleKt { public static final fun retry (Larrow/fx/coroutines/Schedule;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun retryOrElse (Larrow/fx/coroutines/Schedule;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index ca7bca1aa13..0400a8c2a11 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -426,9 +426,10 @@ public sealed class Schedule { (other pipe this) // Dependent type emulation + @PublishedApi @Suppress("UNCHECKED_CAST") internal class ScheduleImpl( - val initialState: suspend () -> State, + internal val initialState: suspend () -> State, val update: suspend (a: Input, s: State) -> Decision ) : Schedule() { @@ -736,7 +737,7 @@ public sealed class Schedule { unfold(0) { it + 1 } /** - * Creates a Schedule that continues n times and returns the number of iterations. + * Creates a Schedule that continues [n] times and returns the number of iterations. */ public fun recurs(n: Int): Schedule = Schedule(suspend { 0 }) { _: A, acc -> @@ -744,6 +745,12 @@ public sealed class Schedule { else Decision.done(0.0, acc, Eval.now(acc)) } + /** + * Creates a Schedule that continues [n] times and collects the output along the way. + */ + public fun recursAndCollect(n: Int): Schedule> = + recurs(n).zipRight(identity().collect()) + /** * Creates a Schedule that only retries once. */ diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt index 87c865eac9c..256d9efa955 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt @@ -7,9 +7,9 @@ import io.kotest.matchers.should import io.kotest.matchers.shouldBe import kotlinx.coroutines.withTimeoutOrNull import kotlin.math.pow +import kotlin.time.Duration.Companion.nanoseconds import kotlin.time.ExperimentalTime import kotlin.time.milliseconds -import kotlin.time.nanoseconds import kotlin.time.seconds @ExperimentalTime @@ -62,6 +62,16 @@ class ScheduleTest : ArrowFxSpec( res.last() eqv Schedule.Decision(false, 0.0, n + 1, Eval.now(n + 1)) } + "Schedule.recursAndCollect(n: Int)" { + val n = 500 + val res = Schedule.recursAndCollect(n).calculateSchedule(0, n + 1) + + res.dropLast(1).map { it.delayInNanos.nanoseconds } shouldBe res.dropLast(1).map { 0.nanoseconds } + res.dropLast(1).map { it.cont } shouldBe res.dropLast(1).map { true } + + res.last() eqv Schedule.Decision(false, 0.0, n + 1, Eval.now(n + 1)) + } + "Schedule.once() repeats 1 additional time" { var count = 0 Schedule.once().repeat { From b52345e3dfca29899c891f68e730de4211eefbf2 Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Sun, 27 Feb 2022 02:31:41 +0100 Subject: [PATCH 02/19] cleanUp --- .../api/arrow-fx-coroutines.api | 23 ------------------- .../kotlin/arrow/fx/coroutines/Schedule.kt | 4 ++-- 2 files changed, 2 insertions(+), 25 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index 2244462b45d..b71bf87898a 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -472,29 +472,6 @@ public final class arrow/fx/coroutines/Schedule$Decision$Companion { public final fun done-KLykuaI (JLjava/lang/Object;Larrow/core/Eval;)Larrow/fx/coroutines/Schedule$Decision; } -public final class arrow/fx/coroutines/Schedule$ScheduleImpl : arrow/fx/coroutines/Schedule { - public fun (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;)V - public fun andThen (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; - public fun check (Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Schedule; - public fun choose (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; - public fun combineNanos (Larrow/fx/coroutines/Schedule;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; - public fun contramap (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; - public fun foldLazy (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Schedule; - public fun forever ()Larrow/fx/coroutines/Schedule; - public final fun getUpdate ()Lkotlin/jvm/functions/Function3; - public fun logInput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; - public fun logOutput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; - public fun map (Lkotlin/jvm/functions/Function1;)Larrow/fx/coroutines/Schedule; - public fun modifyNanos (Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Schedule; - public fun not ()Larrow/fx/coroutines/Schedule; - public final fun onDecision (Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Schedule; - public fun pipe (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; - public final fun reconsider (Lkotlin/jvm/functions/Function3;)Larrow/fx/coroutines/Schedule; - public fun repeatOrElseEither (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public final fun updated (Lkotlin/jvm/functions/Function1;)Larrow/fx/coroutines/Schedule; - public fun zip (Larrow/fx/coroutines/Schedule;Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; -} - public final class arrow/fx/coroutines/ScheduleKt { public static final fun retry (Larrow/fx/coroutines/Schedule;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun retryOrElse (Larrow/fx/coroutines/Schedule;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index 0400a8c2a11..92cfe3dabbb 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -426,10 +426,9 @@ public sealed class Schedule { (other pipe this) // Dependent type emulation - @PublishedApi @Suppress("UNCHECKED_CAST") internal class ScheduleImpl( - internal val initialState: suspend () -> State, + val initialState: suspend () -> State, val update: suspend (a: Input, s: State) -> Decision ) : Schedule() { @@ -639,6 +638,7 @@ public sealed class Schedule { public fun map(g: (B) -> D): Decision = bimap(::identity, g) + public fun combineNanos( other: Decision, f: (Boolean, Boolean) -> Boolean, From 172df3aa35e24b7fa00120f2dc040dca1f3242ef Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Sun, 27 Feb 2022 10:28:20 +0100 Subject: [PATCH 03/19] simplify impl --- .../kotlin/arrow/fx/coroutines/Schedule.kt | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index 92cfe3dabbb..3499fc8bf02 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -621,7 +621,12 @@ public sealed class Schedule { /** * A single decision. Contains the decision to continue, the delay, the new state and the (lazy) result of a Schedule. */ - public data class Decision(val cont: Boolean, val delayInNanos: Double, val state: A, val finish: Eval) { + public data class Decision( + val cont: Boolean, + val delayInNanos: Double, + val state: A, + val finish: Eval + ) { @ExperimentalTime val duration: Duration @@ -749,7 +754,10 @@ public sealed class Schedule { * Creates a Schedule that continues [n] times and collects the output along the way. */ public fun recursAndCollect(n: Int): Schedule> = - recurs(n).zipRight(identity().collect()) + Schedule({ emptyList() }) { a: A, acc -> + if (acc.size < n) Decision.cont(0.0, acc + a, Eval.now(acc + a)) + else Decision.done(0.0, acc, Eval.now(acc)) + } /** * Creates a Schedule that only retries once. From 83f80c4718cd81a5cdc5735089799418fbfb272c Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Mon, 28 Feb 2022 10:44:46 +0100 Subject: [PATCH 04/19] Schedule -> Flow --- .../kotlin/arrow/fx/coroutines/Schedule.kt | 68 ++++++++++++++++++- .../arrow/fx/coroutines/ScheduleTest.kt | 66 ++++++++++++++++++ 2 files changed, 132 insertions(+), 2 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index 3499fc8bf02..61c55540333 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -19,6 +19,10 @@ import kotlin.random.Random import kotlin.time.Duration import kotlin.time.ExperimentalTime import kotlin.time.nanoseconds +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.retry /** * # Retrying and repeating effects @@ -217,6 +221,30 @@ public sealed class Schedule { public suspend fun repeatOrElse(fa: suspend () -> Input, orElse: suspend (Throwable, Output?) -> Output): Output = repeatOrElseEither(fa, orElse).fold(::identity, ::identity) + /** + * transforms this effect + * Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay. + * Also offers a function to handle errors if they are encountered during repetition. + */ + public abstract suspend fun repeatAsFlow( + fa: suspend () -> Input, + orElse: suspend (Throwable) -> C + ): Flow> + + /** + * Runs this effect once and, if it succeeded, decide using the provided policy if the effect should be repeated and if so, with how much delay. + * Returns the last output from the policy or raises an error if a repeat failed. + */ + public suspend fun repeatAsFlow(fa: suspend () -> Input): Flow = + repeatAsFlowOrElse(fa) { e -> throw e } + + /** + * Runs this effect once and, if it succeeded, decide using the provided policy if the effect should be repeated and if so, with how much delay. + * Also offers a function to handle errors if they are encountered during repetition. + */ + public suspend fun repeatAsFlowOrElse(fa: suspend () -> Input, orElse: suspend (Throwable) -> Output): Flow = + repeatAsFlow(fa, orElse).map { it.fold(::identity, ::identity) } + /** * Changes the output of a schedule. Does not alter the decision of the schedule. */ @@ -292,7 +320,10 @@ public sealed class Schedule { /** * Accumulates the results of a schedule by folding over them effectfully. */ - public abstract fun foldLazy(initial: suspend () -> C, f: suspend (acc: C, output: Output) -> C): Schedule + public abstract fun foldLazy( + initial: suspend () -> C, + f: suspend (acc: C, output: Output) -> C + ): Schedule /** * Composes this schedule with the other schedule by piping the output of this schedule @@ -458,6 +489,36 @@ public sealed class Schedule { } } + override suspend fun repeatAsFlow( + fa: suspend () -> Input, + orElse: suspend (Throwable) -> C + ): Flow> = + flow { + var loop = true + var state: State = initialState.invoke() + + while (loop) { + coroutineContext.ensureActive() + try { + val a = fa.invoke() + val step = update(a, state) + if (!step.cont) { + emit(Either.Right(step.finish.value())) + loop = false + } else { + delay((step.delayInNanos / 1_000_000).toLong()) + + // Set state before looping again and emit Output + emit(Either.Right(step.finish.value())) + state = step.state + } + } catch (e: Throwable) { + emit(Either.Left(orElse(e.nonFatalOrThrow()))) + loop = false + } + } + } + override fun map(f: (output: Output) -> B): Schedule = ScheduleImpl(initialState) { i, s -> update(i, s).map(f) } @@ -955,7 +1016,10 @@ public suspend fun Schedule.retry(fa: suspend () -> A): A = * Runs an effect and, if it fails, decide using the provided policy if the effect should be retried and if so, with how much delay. * Also offers a function to handle errors if they are encountered during retrial. */ -public suspend fun Schedule.retryOrElse(fa: suspend () -> A, orElse: suspend (Throwable, B) -> A): A = +public suspend fun Schedule.retryOrElse( + fa: suspend () -> A, + orElse: suspend (Throwable, B) -> A +): A = retryOrElseEither(fa, orElse).fold(::identity, ::identity) /** diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt index 256d9efa955..187958e6197 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt @@ -2,6 +2,7 @@ package arrow.fx.coroutines import arrow.core.Either import arrow.core.Eval +import arrow.core.identity import io.kotest.assertions.fail import io.kotest.matchers.should import io.kotest.matchers.shouldBe @@ -11,6 +12,12 @@ import kotlin.time.Duration.Companion.nanoseconds import kotlin.time.ExperimentalTime import kotlin.time.milliseconds import kotlin.time.seconds +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.flow.zip @ExperimentalTime class ScheduleTest : ArrowFxSpec( @@ -52,6 +59,23 @@ class ScheduleTest : ArrowFxSpec( checkRepeat(Schedule.recurs(1), expected = 1) } + "Schedule.recursAndCollect(negative number)" { + checkRepeat(Schedule.recursAndCollect(-500), expected = emptyList()) + } + + "Schedule.recursAndCollect(0)" { + checkRepeat(Schedule.recursAndCollect(0), expected = emptyList()) + } + + "Schedule.recursAndCollect(n : Int)" { + val n = 500 + val res = Schedule.recursAndCollect(n).calculateSchedule(0, n + 1) + res.dropLast(1).map { it.delayInNanos.nanoseconds } shouldBe res.dropLast(1).map { 0.nanoseconds } + res.dropLast(1).map { it.cont } shouldBe res.dropLast(1).map { true } + + res.last() eqv Schedule.Decision(false, 0.0, n + 1, Eval.now((1..n).toList())) + } + "Schedule.recurs(n: Int)" { val n = 500 val res = Schedule.recurs(n).calculateSchedule(0, n + 1) @@ -183,6 +207,14 @@ class ScheduleTest : ArrowFxSpec( checkRepeat(Schedule.recurs(20_000), expected = 20_000) } + "repeat is stack-safe with recurseAndCollect" { + checkRepeat(Schedule.recursAndCollect(20_000), expected = (1..20_000).toList()) + } + + "repeatAsFlow is stack-safe" { + checkRepeatAsFlow(Schedule.recurs(500_000), expected = (1..500_000).asFlow()) + } + "repeat" { val stop = RuntimeException("WOOO") val dec = Schedule.Decision(true, 10.0, 0, Eval.now("state")) @@ -202,12 +234,38 @@ class ScheduleTest : ArrowFxSpec( l shouldBe Either.Left(stop) } + /* "repeatAsFlow is stack-safe" { + val count = Atomic(0) + val n = 2 + val s = Schedule.once() + .repeatAsFlow { println("KI") }.collect { i -> + println("end$i") + } + + val l = Either.catch { + Schedule.recurs(20_000).retry { + count.updateAndGet { it + 1 } + throw exception + } + } + + l should leftException(exception) + count.get() shouldBe 20_001 + }*/ + + "repeat fails fast on errors" { val ex = Throwable("Hello") Schedule.recurs(0).repeatOrElseEither({ throw ex }) { exc, _ -> exc } .fold({ it shouldBe ex }, { fail("The impossible happened") }) } + "repeatAsFlow fails fast on errors" { + val ex = Throwable("Hello") + Schedule.recurs(0).repeatAsFlow({ throw ex }, ::identity) + .collect { either -> either.fold({ it shouldBe ex }, { fail("The impossible happened") }) } + } + "repeat should run the schedule with the correct input" { var i = 0 (Schedule.recurs(10).zipRight(Schedule.collect())).repeat { i++ } shouldBe (0..10).toList() @@ -301,6 +359,14 @@ private suspend fun checkRepeat(schedule: Schedule, expected: B): Un } shouldBe expected } +private suspend fun checkRepeatAsFlow(schedule: Schedule, expected: Flow): Unit { + val count = Atomic(0) + schedule.repeatAsFlow { + count.updateAndGet { it + 1 } + }.zip(expected, ::Pair) + .collect { (a, b) -> a shouldBe b } +} + @ExperimentalTime private infix fun Schedule.Decision.eqv(other: Schedule.Decision): Unit { require(cont == other.cont) { "Decision#cont: ${this.cont} shouldBe ${other.cont}" } From 779c2ce48ace6973b30e7d39c10eef54d770460b Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Mon, 28 Feb 2022 10:47:18 +0100 Subject: [PATCH 05/19] rm recurseAndCollect --- .../arrow/fx/coroutines/ScheduleTest.kt | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt index 187958e6197..ab36384454f 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt @@ -59,23 +59,6 @@ class ScheduleTest : ArrowFxSpec( checkRepeat(Schedule.recurs(1), expected = 1) } - "Schedule.recursAndCollect(negative number)" { - checkRepeat(Schedule.recursAndCollect(-500), expected = emptyList()) - } - - "Schedule.recursAndCollect(0)" { - checkRepeat(Schedule.recursAndCollect(0), expected = emptyList()) - } - - "Schedule.recursAndCollect(n : Int)" { - val n = 500 - val res = Schedule.recursAndCollect(n).calculateSchedule(0, n + 1) - res.dropLast(1).map { it.delayInNanos.nanoseconds } shouldBe res.dropLast(1).map { 0.nanoseconds } - res.dropLast(1).map { it.cont } shouldBe res.dropLast(1).map { true } - - res.last() eqv Schedule.Decision(false, 0.0, n + 1, Eval.now((1..n).toList())) - } - "Schedule.recurs(n: Int)" { val n = 500 val res = Schedule.recurs(n).calculateSchedule(0, n + 1) @@ -86,16 +69,6 @@ class ScheduleTest : ArrowFxSpec( res.last() eqv Schedule.Decision(false, 0.0, n + 1, Eval.now(n + 1)) } - "Schedule.recursAndCollect(n: Int)" { - val n = 500 - val res = Schedule.recursAndCollect(n).calculateSchedule(0, n + 1) - - res.dropLast(1).map { it.delayInNanos.nanoseconds } shouldBe res.dropLast(1).map { 0.nanoseconds } - res.dropLast(1).map { it.cont } shouldBe res.dropLast(1).map { true } - - res.last() eqv Schedule.Decision(false, 0.0, n + 1, Eval.now(n + 1)) - } - "Schedule.once() repeats 1 additional time" { var count = 0 Schedule.once().repeat { @@ -207,10 +180,6 @@ class ScheduleTest : ArrowFxSpec( checkRepeat(Schedule.recurs(20_000), expected = 20_000) } - "repeat is stack-safe with recurseAndCollect" { - checkRepeat(Schedule.recursAndCollect(20_000), expected = (1..20_000).toList()) - } - "repeatAsFlow is stack-safe" { checkRepeatAsFlow(Schedule.recurs(500_000), expected = (1..500_000).asFlow()) } From ad6053d41399ed0e8429df2c238fc67d01382d15 Mon Sep 17 00:00:00 2001 From: i-walker Date: Mon, 28 Feb 2022 09:48:08 +0000 Subject: [PATCH 06/19] Update API files --- arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api | 3 +++ 1 file changed, 3 insertions(+) diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index b71bf87898a..cce1c74542b 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -394,6 +394,9 @@ public abstract class arrow/fx/coroutines/Schedule { public final fun or (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; public abstract fun pipe (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; public final fun repeat (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun repeatAsFlowOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun repeatOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun repeatOrElseEither (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun untilInput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; From a2a7558a9c662336efe2050ae066702bb43fd983 Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Tue, 1 Mar 2022 13:13:46 +0100 Subject: [PATCH 07/19] rm recurseAndCollect and add Validated version --- .../api/arrow-fx-coroutines.api | 7 +- .../kotlin/arrow/fx/coroutines/Schedule.kt | 65 +++++++++++++++---- 2 files changed, 58 insertions(+), 14 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index b71bf87898a..8be8ae64980 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -394,6 +394,12 @@ public abstract class arrow/fx/coroutines/Schedule { public final fun or (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; public abstract fun pipe (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; public final fun repeat (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun repeatAsFlowOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun repeatAsFlowValidated (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun repeatAsFlowValidated (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun repeatAsFlowValidatedOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun repeatOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun repeatOrElseEither (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun untilInput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; @@ -432,7 +438,6 @@ public final class arrow/fx/coroutines/Schedule$Companion { public final fun never ()Larrow/fx/coroutines/Schedule; public final fun once ()Larrow/fx/coroutines/Schedule; public final fun recurs (I)Larrow/fx/coroutines/Schedule; - public final fun recursAndCollect (I)Larrow/fx/coroutines/Schedule; public final fun spaced (D)Larrow/fx/coroutines/Schedule; public final fun spaced-LRDsOJo (J)Larrow/fx/coroutines/Schedule; public final fun unfold (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Larrow/fx/coroutines/Schedule; diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index 61c55540333..46f21b1489e 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -2,6 +2,7 @@ package arrow.fx.coroutines import arrow.core.Either import arrow.core.Eval +import arrow.core.Validated import arrow.core.identity import arrow.core.left import arrow.core.nonFatalOrThrow @@ -245,6 +246,24 @@ public sealed class Schedule { public suspend fun repeatAsFlowOrElse(fa: suspend () -> Input, orElse: suspend (Throwable) -> Output): Flow = repeatAsFlow(fa, orElse).map { it.fold(::identity, ::identity) } + public abstract suspend fun repeatAsFlowValidated( + fa: suspend () -> Input, + orElse: suspend (Throwable) -> C + ): Flow> + + /** + * similar to [repeatAsFlow], but accumulates errors along the way and continues the provided policy until the effect should not be repeated + */ + public suspend fun repeatAsFlowValidated(fa: suspend () -> Input): Flow = + repeatAsFlowValidatedOrElse(fa) { e -> throw e } + + public suspend fun repeatAsFlowValidatedOrElse( + fa: suspend () -> Input, + orElse: suspend (Throwable) -> Output + ): Flow = + repeatAsFlowValidated(fa, orElse) + .map { it.fold(::identity, ::identity) } + /** * Changes the output of a schedule. Does not alter the decision of the schedule. */ @@ -494,17 +513,17 @@ public sealed class Schedule { orElse: suspend (Throwable) -> C ): Flow> = flow { - var loop = true + val loop = Atomic(true) var state: State = initialState.invoke() - while (loop) { + while (loop.get()) { coroutineContext.ensureActive() try { val a = fa.invoke() val step = update(a, state) if (!step.cont) { emit(Either.Right(step.finish.value())) - loop = false + loop.set(false) } else { delay((step.delayInNanos / 1_000_000).toLong()) @@ -514,7 +533,36 @@ public sealed class Schedule { } } catch (e: Throwable) { emit(Either.Left(orElse(e.nonFatalOrThrow()))) - loop = false + loop.set(false) + } + } + } + + override suspend fun repeatAsFlowValidated( + fa: suspend () -> Input, + orElse: suspend (Throwable) -> C + ): Flow> = + flow { + val loop = Atomic(true) + var state: State = initialState.invoke() + + while (loop.get()) { + coroutineContext.ensureActive() + try { + val a = fa.invoke() + val step = update(a, state) + if (!step.cont) { + emit(Validated.Valid(step.finish.value())) + loop.set(false) + } else { + delay((step.delayInNanos / 1_000_000).toLong()) + + // Set state before looping again and emit Output + emit(Validated.Valid(step.finish.value())) + state = step.state + } + } catch (e: Throwable) { + emit(Validated.Invalid(orElse(e.nonFatalOrThrow()))) } } } @@ -811,15 +859,6 @@ public sealed class Schedule { else Decision.done(0.0, acc, Eval.now(acc)) } - /** - * Creates a Schedule that continues [n] times and collects the output along the way. - */ - public fun recursAndCollect(n: Int): Schedule> = - Schedule({ emptyList() }) { a: A, acc -> - if (acc.size < n) Decision.cont(0.0, acc + a, Eval.now(acc + a)) - else Decision.done(0.0, acc, Eval.now(acc)) - } - /** * Creates a Schedule that only retries once. */ From 89b98600361d3c90d40a024c259cf8ae044657d9 Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Tue, 1 Mar 2022 16:20:19 +0100 Subject: [PATCH 08/19] rm repeatAsFlowValidated --- .../kotlin/arrow/fx/coroutines/Schedule.kt | 47 ------------------- 1 file changed, 47 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index 46f21b1489e..5cbab19f08f 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -246,24 +246,6 @@ public sealed class Schedule { public suspend fun repeatAsFlowOrElse(fa: suspend () -> Input, orElse: suspend (Throwable) -> Output): Flow = repeatAsFlow(fa, orElse).map { it.fold(::identity, ::identity) } - public abstract suspend fun repeatAsFlowValidated( - fa: suspend () -> Input, - orElse: suspend (Throwable) -> C - ): Flow> - - /** - * similar to [repeatAsFlow], but accumulates errors along the way and continues the provided policy until the effect should not be repeated - */ - public suspend fun repeatAsFlowValidated(fa: suspend () -> Input): Flow = - repeatAsFlowValidatedOrElse(fa) { e -> throw e } - - public suspend fun repeatAsFlowValidatedOrElse( - fa: suspend () -> Input, - orElse: suspend (Throwable) -> Output - ): Flow = - repeatAsFlowValidated(fa, orElse) - .map { it.fold(::identity, ::identity) } - /** * Changes the output of a schedule. Does not alter the decision of the schedule. */ @@ -538,35 +520,6 @@ public sealed class Schedule { } } - override suspend fun repeatAsFlowValidated( - fa: suspend () -> Input, - orElse: suspend (Throwable) -> C - ): Flow> = - flow { - val loop = Atomic(true) - var state: State = initialState.invoke() - - while (loop.get()) { - coroutineContext.ensureActive() - try { - val a = fa.invoke() - val step = update(a, state) - if (!step.cont) { - emit(Validated.Valid(step.finish.value())) - loop.set(false) - } else { - delay((step.delayInNanos / 1_000_000).toLong()) - - // Set state before looping again and emit Output - emit(Validated.Valid(step.finish.value())) - state = step.state - } - } catch (e: Throwable) { - emit(Validated.Invalid(orElse(e.nonFatalOrThrow()))) - } - } - } - override fun map(f: (output: Output) -> B): Schedule = ScheduleImpl(initialState) { i, s -> update(i, s).map(f) } From 43b4a58ec5fc64fc8c9d1f822dbc45211bee3718 Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Tue, 1 Mar 2022 16:22:18 +0100 Subject: [PATCH 09/19] change api files --- arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api | 3 --- 1 file changed, 3 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index da4d58953f0..bcd286f5af6 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -397,9 +397,6 @@ public abstract class arrow/fx/coroutines/Schedule { public final fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun repeatAsFlowOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public final fun repeatAsFlowValidated (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun repeatAsFlowValidated (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public final fun repeatAsFlowValidatedOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun repeatOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun repeatOrElseEither (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun untilInput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; From dd3b1bbc5636b227ed1787d7b21acdd3abca1ff0 Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Tue, 1 Mar 2022 16:35:48 +0100 Subject: [PATCH 10/19] rm docs --- .../commonMain/kotlin/arrow/fx/coroutines/Schedule.kt | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index 5cbab19f08f..105762e4189 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -232,17 +232,11 @@ public sealed class Schedule { orElse: suspend (Throwable) -> C ): Flow> - /** - * Runs this effect once and, if it succeeded, decide using the provided policy if the effect should be repeated and if so, with how much delay. - * Returns the last output from the policy or raises an error if a repeat failed. - */ + public suspend fun repeatAsFlow(fa: suspend () -> Input): Flow = repeatAsFlowOrElse(fa) { e -> throw e } - /** - * Runs this effect once and, if it succeeded, decide using the provided policy if the effect should be repeated and if so, with how much delay. - * Also offers a function to handle errors if they are encountered during repetition. - */ + public suspend fun repeatAsFlowOrElse(fa: suspend () -> Input, orElse: suspend (Throwable) -> Output): Flow = repeatAsFlow(fa, orElse).map { it.fold(::identity, ::identity) } From d209eed8a144a89fd7d5cbc0132f06328d60f598 Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Tue, 1 Mar 2022 17:31:34 +0100 Subject: [PATCH 11/19] add repeatAsFlow tests and Docs --- .../kotlin/arrow/fx/coroutines/Schedule.kt | 16 ++++---- .../arrow/fx/coroutines/ScheduleTest.kt | 40 ++++++++++--------- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index 105762e4189..d3ff3d6d457 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -2,7 +2,6 @@ package arrow.fx.coroutines import arrow.core.Either import arrow.core.Eval -import arrow.core.Validated import arrow.core.identity import arrow.core.left import arrow.core.nonFatalOrThrow @@ -222,21 +221,22 @@ public sealed class Schedule { public suspend fun repeatOrElse(fa: suspend () -> Input, orElse: suspend (Throwable, Output?) -> Output): Output = repeatOrElseEither(fa, orElse).fold(::identity, ::identity) - /** - * transforms this effect - * Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay. - * Also offers a function to handle errors if they are encountered during repetition. - */ public abstract suspend fun repeatAsFlow( fa: suspend () -> Input, orElse: suspend (Throwable) -> C ): Flow> - + /** + * Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay. + * This will raise an error if a repeat failed. + */ public suspend fun repeatAsFlow(fa: suspend () -> Input): Flow = repeatAsFlowOrElse(fa) { e -> throw e } - + /** + * Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay. + * Also offers a function to handle errors if they are encountered during repetition. + */ public suspend fun repeatAsFlowOrElse(fa: suspend () -> Input, orElse: suspend (Throwable) -> Output): Flow = repeatAsFlow(fa, orElse).map { it.fold(::identity, ::identity) } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt index ab36384454f..672e95195dc 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt @@ -203,25 +203,24 @@ class ScheduleTest : ArrowFxSpec( l shouldBe Either.Left(stop) } - /* "repeatAsFlow is stack-safe" { - val count = Atomic(0) - val n = 2 - val s = Schedule.once() - .repeatAsFlow { println("KI") }.collect { i -> - println("end$i") - } - - val l = Either.catch { - Schedule.recurs(20_000).retry { - count.updateAndGet { it + 1 } - throw exception - } - } - - l should leftException(exception) - count.get() shouldBe 20_001 - }*/ + "repeatAsFlow" { + val stop = RuntimeException("WOOO") + val dec = Schedule.Decision(true, 10.0, 0, Eval.now("state")) + val n = 100 + val schedule = Schedule({ 0 }) { _: Unit, _ -> dec } + + val eff = SideEffect() + + val l = Either.catch { + schedule.repeatAsFlow { + if (eff.counter >= n) throw stop + else eff.increment() + } + } + eff.counter shouldBe 100 + l shouldBe Either.Left(stop) + } "repeat fails fast on errors" { val ex = Throwable("Hello") @@ -240,6 +239,11 @@ class ScheduleTest : ArrowFxSpec( (Schedule.recurs(10).zipRight(Schedule.collect())).repeat { i++ } shouldBe (0..10).toList() } + "repeatAsFlow should run the schedule with the correct input" { + var i = 0 + Schedule.recurs(10).repeatAsFlow { i++ } shouldBe (0..10).asFlow() + } + "retry is stack-safe" { val count = Atomic(0) val l = Either.catch { From 4ce8055922e2f4f7b91e67f021b133df2972fae0 Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Tue, 1 Mar 2022 17:33:59 +0100 Subject: [PATCH 12/19] clean up --- .../src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt | 3 --- 1 file changed, 3 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt index 672e95195dc..2f42317ad45 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt @@ -14,9 +14,6 @@ import kotlin.time.milliseconds import kotlin.time.seconds import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.flow.zip @ExperimentalTime From 70d61565382693a335bac84b8a3f78a85bfc8e7a Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Wed, 2 Mar 2022 09:46:51 +0100 Subject: [PATCH 13/19] fix tests --- .../kotlin/arrow/fx/coroutines/ScheduleTest.kt | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt index 2f42317ad45..1bd76015d1f 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt @@ -14,6 +14,8 @@ import kotlin.time.milliseconds import kotlin.time.seconds import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.flow.zip @ExperimentalTime @@ -212,7 +214,7 @@ class ScheduleTest : ArrowFxSpec( schedule.repeatAsFlow { if (eff.counter >= n) throw stop else eff.increment() - } + }.collect() } eff.counter shouldBe 100 @@ -233,12 +235,15 @@ class ScheduleTest : ArrowFxSpec( "repeat should run the schedule with the correct input" { var i = 0 - (Schedule.recurs(10).zipRight(Schedule.collect())).repeat { i++ } shouldBe (0..10).toList() + val n = 10 + (Schedule.recurs(n).zipRight(Schedule.collect())).repeat { i++ } shouldBe (0..n).toList() } "repeatAsFlow should run the schedule with the correct input" { var i = 0 - Schedule.recurs(10).repeatAsFlow { i++ } shouldBe (0..10).asFlow() + val n = 10 + (Schedule.recurs(n).zipRight(Schedule.collect())).repeatAsFlow { i++ }.toList() shouldBe + (0..n).map { (0..it).toList() } } "retry is stack-safe" { From e2e90a6e71c8cb19b971e390e6f604289352e4a4 Mon Sep 17 00:00:00 2001 From: Imran Malic Settuba <46971368+i-walker@users.noreply.github.com> Date: Wed, 2 Mar 2022 12:30:38 +0100 Subject: [PATCH 14/19] Use Boolean instead of Atomic --- .../src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index d3ff3d6d457..07e9ed199a6 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -489,17 +489,17 @@ public sealed class Schedule { orElse: suspend (Throwable) -> C ): Flow> = flow { - val loop = Atomic(true) + var loop = true var state: State = initialState.invoke() - while (loop.get()) { + while (loop) { coroutineContext.ensureActive() try { val a = fa.invoke() val step = update(a, state) if (!step.cont) { emit(Either.Right(step.finish.value())) - loop.set(false) + loop = false } else { delay((step.delayInNanos / 1_000_000).toLong()) @@ -509,7 +509,7 @@ public sealed class Schedule { } } catch (e: Throwable) { emit(Either.Left(orElse(e.nonFatalOrThrow()))) - loop.set(false) + loop = false } } } From bf31b5d66ea959486d311d856b72d1d676f59504 Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Wed, 2 Mar 2022 13:56:20 +0100 Subject: [PATCH 15/19] add last output in `orElse` --- .../kotlin/arrow/fx/coroutines/PredefTest.kt | 1 + .../kotlin/arrow/fx/coroutines/Schedule.kt | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines-test/src/jvmTest/kotlin/arrow/fx/coroutines/PredefTest.kt b/arrow-libs/fx/arrow-fx-coroutines-test/src/jvmTest/kotlin/arrow/fx/coroutines/PredefTest.kt index b6eda368a9f..6ede23a21cc 100644 --- a/arrow-libs/fx/arrow-fx-coroutines-test/src/jvmTest/kotlin/arrow/fx/coroutines/PredefTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines-test/src/jvmTest/kotlin/arrow/fx/coroutines/PredefTest.kt @@ -1,5 +1,6 @@ package arrow.fx.coroutines +import arrow.core.Either import io.kotest.matchers.shouldBe import io.kotest.property.Arb import io.kotest.property.arbitrary.int diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index 07e9ed199a6..4aca963a8d1 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -223,7 +223,7 @@ public sealed class Schedule { public abstract suspend fun repeatAsFlow( fa: suspend () -> Input, - orElse: suspend (Throwable) -> C + orElse: suspend (Throwable, Output?) -> C ): Flow> /** @@ -231,13 +231,16 @@ public sealed class Schedule { * This will raise an error if a repeat failed. */ public suspend fun repeatAsFlow(fa: suspend () -> Input): Flow = - repeatAsFlowOrElse(fa) { e -> throw e } + repeatAsFlowOrElse(fa) { e, _ -> throw e } /** * Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay. * Also offers a function to handle errors if they are encountered during repetition. */ - public suspend fun repeatAsFlowOrElse(fa: suspend () -> Input, orElse: suspend (Throwable) -> Output): Flow = + public suspend fun repeatAsFlowOrElse( + fa: suspend () -> Input, + orElse: suspend (Throwable, Output?) -> Output + ): Flow = repeatAsFlow(fa, orElse).map { it.fold(::identity, ::identity) } /** @@ -486,10 +489,11 @@ public sealed class Schedule { override suspend fun repeatAsFlow( fa: suspend () -> Input, - orElse: suspend (Throwable) -> C + orElse: suspend (Throwable, Output?) -> C ): Flow> = flow { var loop = true + var last: (() -> Output)? = null // We haven't seen any input yet var state: State = initialState.invoke() while (loop) { @@ -502,13 +506,14 @@ public sealed class Schedule { loop = false } else { delay((step.delayInNanos / 1_000_000).toLong()) - + val output = step.finish.value() // Set state before looping again and emit Output - emit(Either.Right(step.finish.value())) + emit(Either.Right(output)) + last = { output } state = step.state } } catch (e: Throwable) { - emit(Either.Left(orElse(e.nonFatalOrThrow()))) + emit(Either.Left(orElse(e.nonFatalOrThrow(), last?.invoke()))) loop = false } } From b998bc9a5f18a9786f22b6263f0f4a951a084982 Mon Sep 17 00:00:00 2001 From: i-walker Date: Wed, 2 Mar 2022 12:59:42 +0000 Subject: [PATCH 16/19] Update API files --- arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index a262f3ad0b4..57360067fe1 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -396,8 +396,8 @@ public abstract class arrow/fx/coroutines/Schedule { public abstract fun pipe (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; public final fun repeat (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public final fun repeatAsFlowOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun repeatAsFlowOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun repeatOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun repeatOrElseEither (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun untilInput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; From 653762a93102b44bf571842007b867ce8c79c3fb Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Wed, 2 Mar 2022 14:15:58 +0100 Subject: [PATCH 17/19] fix --- .../src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt index 1bd76015d1f..5e2c8a52fc5 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt @@ -229,7 +229,7 @@ class ScheduleTest : ArrowFxSpec( "repeatAsFlow fails fast on errors" { val ex = Throwable("Hello") - Schedule.recurs(0).repeatAsFlow({ throw ex }, ::identity) + Schedule.recurs(0).repeatAsFlow({ throw ex }, { t, _ -> t }) .collect { either -> either.fold({ it shouldBe ex }, { fail("The impossible happened") }) } } From f2419d203f4aa761247200065b29d212faa761ed Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Wed, 2 Mar 2022 14:25:10 +0100 Subject: [PATCH 18/19] fix imports/ depreactions --- .../commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt index 5e2c8a52fc5..b5479345cfd 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt @@ -2,16 +2,15 @@ package arrow.fx.coroutines import arrow.core.Either import arrow.core.Eval -import arrow.core.identity import io.kotest.assertions.fail import io.kotest.matchers.should import io.kotest.matchers.shouldBe import kotlinx.coroutines.withTimeoutOrNull import kotlin.math.pow +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.nanoseconds +import kotlin.time.Duration.Companion.seconds import kotlin.time.ExperimentalTime -import kotlin.time.milliseconds -import kotlin.time.seconds import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.collect From 0fc7a1d07f8a4f472d9111c8d1b1756c728d1187 Mon Sep 17 00:00:00 2001 From: i-walker <46971368+i-walker@users.noreply.github.com> Date: Wed, 9 Mar 2022 00:27:50 +0100 Subject: [PATCH 19/19] adjust names --- .../fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api | 4 ++-- .../commonMain/kotlin/arrow/fx/coroutines/Schedule.kt | 10 +++++----- .../kotlin/arrow/fx/coroutines/ScheduleTest.kt | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index 57360067fe1..2d5969e3fca 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -396,10 +396,10 @@ public abstract class arrow/fx/coroutines/Schedule { public abstract fun pipe (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; public final fun repeat (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public final fun repeatAsFlowOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun repeatOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun repeatOrElseAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun repeatOrElseEither (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun repeatOrElseEitherAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun untilInput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; public final fun untilOutput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; public final fun void ()Larrow/fx/coroutines/Schedule; diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index 4aca963a8d1..6e76aeafebc 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -221,7 +221,7 @@ public sealed class Schedule { public suspend fun repeatOrElse(fa: suspend () -> Input, orElse: suspend (Throwable, Output?) -> Output): Output = repeatOrElseEither(fa, orElse).fold(::identity, ::identity) - public abstract suspend fun repeatAsFlow( + public abstract suspend fun repeatOrElseEitherAsFlow( fa: suspend () -> Input, orElse: suspend (Throwable, Output?) -> C ): Flow> @@ -231,17 +231,17 @@ public sealed class Schedule { * This will raise an error if a repeat failed. */ public suspend fun repeatAsFlow(fa: suspend () -> Input): Flow = - repeatAsFlowOrElse(fa) { e, _ -> throw e } + repeatOrElseAsFlow(fa) { e, _ -> throw e } /** * Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay. * Also offers a function to handle errors if they are encountered during repetition. */ - public suspend fun repeatAsFlowOrElse( + public suspend fun repeatOrElseAsFlow( fa: suspend () -> Input, orElse: suspend (Throwable, Output?) -> Output ): Flow = - repeatAsFlow(fa, orElse).map { it.fold(::identity, ::identity) } + repeatOrElseEitherAsFlow(fa, orElse).map { it.fold(::identity, ::identity) } /** * Changes the output of a schedule. Does not alter the decision of the schedule. @@ -487,7 +487,7 @@ public sealed class Schedule { } } - override suspend fun repeatAsFlow( + override suspend fun repeatOrElseEitherAsFlow( fa: suspend () -> Input, orElse: suspend (Throwable, Output?) -> C ): Flow> = diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt index b5479345cfd..4ae2907d34c 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt @@ -228,7 +228,7 @@ class ScheduleTest : ArrowFxSpec( "repeatAsFlow fails fast on errors" { val ex = Throwable("Hello") - Schedule.recurs(0).repeatAsFlow({ throw ex }, { t, _ -> t }) + Schedule.recurs(0).repeatOrElseEitherAsFlow({ throw ex }, { t, _ -> t }) .collect { either -> either.fold({ it shouldBe ex }, { fail("The impossible happened") }) } }