diff --git a/arrow-libs/fx/arrow-fx-coroutines-test/src/jvmTest/kotlin/arrow/fx/coroutines/PredefTest.kt b/arrow-libs/fx/arrow-fx-coroutines-test/src/jvmTest/kotlin/arrow/fx/coroutines/PredefTest.kt index b6eda368a9f..6ede23a21cc 100644 --- a/arrow-libs/fx/arrow-fx-coroutines-test/src/jvmTest/kotlin/arrow/fx/coroutines/PredefTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines-test/src/jvmTest/kotlin/arrow/fx/coroutines/PredefTest.kt @@ -1,5 +1,6 @@ package arrow.fx.coroutines +import arrow.core.Either import io.kotest.matchers.shouldBe import io.kotest.property.Arb import io.kotest.property.arbitrary.int diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index 749ee0204ab..2d5969e3fca 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -395,8 +395,11 @@ public abstract class arrow/fx/coroutines/Schedule { public final fun or (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; public abstract fun pipe (Larrow/fx/coroutines/Schedule;)Larrow/fx/coroutines/Schedule; public final fun repeat (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun repeatAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun repeatOrElse (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun repeatOrElseAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun repeatOrElseEither (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun repeatOrElseEitherAsFlow (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun untilInput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; public final fun untilOutput (Lkotlin/jvm/functions/Function2;)Larrow/fx/coroutines/Schedule; public final fun void ()Larrow/fx/coroutines/Schedule; diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt index ca7bca1aa13..6e76aeafebc 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Schedule.kt @@ -19,6 +19,10 @@ import kotlin.random.Random import kotlin.time.Duration import kotlin.time.ExperimentalTime import kotlin.time.nanoseconds +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.retry /** * # Retrying and repeating effects @@ -217,6 +221,28 @@ public sealed class Schedule { public suspend fun repeatOrElse(fa: suspend () -> Input, orElse: suspend (Throwable, Output?) -> Output): Output = repeatOrElseEither(fa, orElse).fold(::identity, ::identity) + public abstract suspend fun repeatOrElseEitherAsFlow( + fa: suspend () -> Input, + orElse: suspend (Throwable, Output?) -> C + ): Flow> + + /** + * Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay. + * This will raise an error if a repeat failed. + */ + public suspend fun repeatAsFlow(fa: suspend () -> Input): Flow = + repeatOrElseAsFlow(fa) { e, _ -> throw e } + + /** + * Runs this effect and emits the output, if it succeeded, decide using the provided policy if the effect should be repeated and emitted, if so, with how much delay. + * Also offers a function to handle errors if they are encountered during repetition. + */ + public suspend fun repeatOrElseAsFlow( + fa: suspend () -> Input, + orElse: suspend (Throwable, Output?) -> Output + ): Flow = + repeatOrElseEitherAsFlow(fa, orElse).map { it.fold(::identity, ::identity) } + /** * Changes the output of a schedule. Does not alter the decision of the schedule. */ @@ -292,7 +318,10 @@ public sealed class Schedule { /** * Accumulates the results of a schedule by folding over them effectfully. */ - public abstract fun foldLazy(initial: suspend () -> C, f: suspend (acc: C, output: Output) -> C): Schedule + public abstract fun foldLazy( + initial: suspend () -> C, + f: suspend (acc: C, output: Output) -> C + ): Schedule /** * Composes this schedule with the other schedule by piping the output of this schedule @@ -458,6 +487,38 @@ public sealed class Schedule { } } + override suspend fun repeatOrElseEitherAsFlow( + fa: suspend () -> Input, + orElse: suspend (Throwable, Output?) -> C + ): Flow> = + flow { + var loop = true + var last: (() -> Output)? = null // We haven't seen any input yet + var state: State = initialState.invoke() + + while (loop) { + coroutineContext.ensureActive() + try { + val a = fa.invoke() + val step = update(a, state) + if (!step.cont) { + emit(Either.Right(step.finish.value())) + loop = false + } else { + delay((step.delayInNanos / 1_000_000).toLong()) + val output = step.finish.value() + // Set state before looping again and emit Output + emit(Either.Right(output)) + last = { output } + state = step.state + } + } catch (e: Throwable) { + emit(Either.Left(orElse(e.nonFatalOrThrow(), last?.invoke()))) + loop = false + } + } + } + override fun map(f: (output: Output) -> B): Schedule = ScheduleImpl(initialState) { i, s -> update(i, s).map(f) } @@ -621,7 +682,12 @@ public sealed class Schedule { /** * A single decision. Contains the decision to continue, the delay, the new state and the (lazy) result of a Schedule. */ - public data class Decision(val cont: Boolean, val delayInNanos: Double, val state: A, val finish: Eval) { + public data class Decision( + val cont: Boolean, + val delayInNanos: Double, + val state: A, + val finish: Eval + ) { @ExperimentalTime val duration: Duration @@ -638,6 +704,7 @@ public sealed class Schedule { public fun map(g: (B) -> D): Decision = bimap(::identity, g) + public fun combineNanos( other: Decision, f: (Boolean, Boolean) -> Boolean, @@ -736,7 +803,7 @@ public sealed class Schedule { unfold(0) { it + 1 } /** - * Creates a Schedule that continues n times and returns the number of iterations. + * Creates a Schedule that continues [n] times and returns the number of iterations. */ public fun recurs(n: Int): Schedule = Schedule(suspend { 0 }) { _: A, acc -> @@ -940,7 +1007,10 @@ public suspend fun Schedule.retry(fa: suspend () -> A): A = * Runs an effect and, if it fails, decide using the provided policy if the effect should be retried and if so, with how much delay. * Also offers a function to handle errors if they are encountered during retrial. */ -public suspend fun Schedule.retryOrElse(fa: suspend () -> A, orElse: suspend (Throwable, B) -> A): A = +public suspend fun Schedule.retryOrElse( + fa: suspend () -> A, + orElse: suspend (Throwable, B) -> A +): A = retryOrElseEither(fa, orElse).fold(::identity, ::identity) /** diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt index 87c865eac9c..4ae2907d34c 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScheduleTest.kt @@ -7,10 +7,15 @@ import io.kotest.matchers.should import io.kotest.matchers.shouldBe import kotlinx.coroutines.withTimeoutOrNull import kotlin.math.pow +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.nanoseconds +import kotlin.time.Duration.Companion.seconds import kotlin.time.ExperimentalTime -import kotlin.time.milliseconds -import kotlin.time.nanoseconds -import kotlin.time.seconds +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.flow.zip @ExperimentalTime class ScheduleTest : ArrowFxSpec( @@ -173,6 +178,10 @@ class ScheduleTest : ArrowFxSpec( checkRepeat(Schedule.recurs(20_000), expected = 20_000) } + "repeatAsFlow is stack-safe" { + checkRepeatAsFlow(Schedule.recurs(500_000), expected = (1..500_000).asFlow()) + } + "repeat" { val stop = RuntimeException("WOOO") val dec = Schedule.Decision(true, 10.0, 0, Eval.now("state")) @@ -192,15 +201,48 @@ class ScheduleTest : ArrowFxSpec( l shouldBe Either.Left(stop) } + "repeatAsFlow" { + val stop = RuntimeException("WOOO") + val dec = Schedule.Decision(true, 10.0, 0, Eval.now("state")) + val n = 100 + val schedule = Schedule({ 0 }) { _: Unit, _ -> dec } + + val eff = SideEffect() + + val l = Either.catch { + schedule.repeatAsFlow { + if (eff.counter >= n) throw stop + else eff.increment() + }.collect() + } + + eff.counter shouldBe 100 + l shouldBe Either.Left(stop) + } + "repeat fails fast on errors" { val ex = Throwable("Hello") Schedule.recurs(0).repeatOrElseEither({ throw ex }) { exc, _ -> exc } .fold({ it shouldBe ex }, { fail("The impossible happened") }) } + "repeatAsFlow fails fast on errors" { + val ex = Throwable("Hello") + Schedule.recurs(0).repeatOrElseEitherAsFlow({ throw ex }, { t, _ -> t }) + .collect { either -> either.fold({ it shouldBe ex }, { fail("The impossible happened") }) } + } + "repeat should run the schedule with the correct input" { var i = 0 - (Schedule.recurs(10).zipRight(Schedule.collect())).repeat { i++ } shouldBe (0..10).toList() + val n = 10 + (Schedule.recurs(n).zipRight(Schedule.collect())).repeat { i++ } shouldBe (0..n).toList() + } + + "repeatAsFlow should run the schedule with the correct input" { + var i = 0 + val n = 10 + (Schedule.recurs(n).zipRight(Schedule.collect())).repeatAsFlow { i++ }.toList() shouldBe + (0..n).map { (0..it).toList() } } "retry is stack-safe" { @@ -291,6 +333,14 @@ private suspend fun checkRepeat(schedule: Schedule, expected: B): Un } shouldBe expected } +private suspend fun checkRepeatAsFlow(schedule: Schedule, expected: Flow): Unit { + val count = Atomic(0) + schedule.repeatAsFlow { + count.updateAndGet { it + 1 } + }.zip(expected, ::Pair) + .collect { (a, b) -> a shouldBe b } +} + @ExperimentalTime private infix fun Schedule.Decision.eqv(other: Schedule.Decision): Unit { require(cont == other.cont) { "Decision#cont: ${this.cont} shouldBe ${other.cont}" }