Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve debugging experience of leaked shift calls #2884

Merged
merged 17 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions arrow-libs/core/arrow-core/api/arrow-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -2749,6 +2749,7 @@ public final class arrow/core/continuations/FoldContinuation : arrow/core/contin
public fun ensure (ZLkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getContext ()Lkotlin/coroutines/CoroutineContext;
public final fun getRecover ()Lkotlin/jvm/functions/Function2;
public final fun isActive ()Ljava/util/concurrent/atomic/AtomicReference;
public fun resumeWith (Ljava/lang/Object;)V
public final fun setRecover (Lkotlin/jvm/functions/Function2;)V
public fun shift (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -3023,6 +3024,10 @@ public final class arrow/core/continuations/ResultKt {
public abstract class arrow/core/continuations/ShiftCancellationException : arrow/core/continuations/CancellationExceptionNoTrace {
}

public final class arrow/core/continuations/ShiftLeakedException : java/lang/IllegalStateException {
public fun <init> ()V
}

public final class arrow/core/continuations/Suspend : arrow/core/continuations/ShiftCancellationException {
public fun <init> (Larrow/core/continuations/Token;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
public final fun getRecover ()Lkotlin/jvm/functions/Function2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public fun <R, A> eagerEffect(f: suspend EagerEffectScope<R>.() -> A): EagerEffe
private class DefaultEagerEffect<R, A>(private val f: suspend EagerEffectScope<R>.() -> A) : EagerEffect<R, A> {
override fun <B> fold(recover: (R) -> B, transform: (A) -> B): B {
val token = Token()
val isActive = AtomicRef(true)
val eagerEffectScope =
object : EagerEffectScope<R> {
// Shift away from this Continuation by intercepting it, and completing it with
Expand All @@ -180,21 +181,26 @@ private class DefaultEagerEffect<R, A>(private val f: suspend EagerEffectScope<R
// CancellationException and thus effectively recovering from the cancellation/shift.
// This means try/catch is also capable of recovering from monadic errors.
// See: EagerEffectSpec - try/catch tests
throw Eager(token, r, recover as (Any?) -> Any?)
if (isActive.get()) throw Eager(token, r, recover as (Any?) -> Any?)
else throw ShiftLeakedException()
}

return try {
suspend { transform(f(eagerEffectScope)) }
.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { result ->
suspend {
val res = f(eagerEffectScope).also { isActive.set(false) }
transform(res)
}.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { result ->
result.getOrElse { throwable ->
if (throwable is Eager && token == throwable.token) {
throwable.recover(throwable.shifted) as B
} else throw throwable
}
}) as B
} catch (e: Eager) {
if (token == e.token) e.recover(e.shifted) as B
else throw e
if (token == e.token) {
isActive.set(false)
e.recover(e.shifted) as B
} else throw e
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import kotlin.coroutines.resumeWithException
* [withContext](#withcontext)
* [async](#async)
* [launch](#launch)
* [Strange edge cases](#strange-edge-cases)
* [Leaking `shift`](#leaking-shift)

* <!--- END -->
*
Expand Down Expand Up @@ -418,7 +418,7 @@ import kotlin.coroutines.resumeWithException
*
* ### KotlinX
* #### withContext
* It's always safe to call `shift` from `withContext` since it runs in place, so it has no way of leaking `shift`.
* It's always safe to call `shift` from `withContext` since it runs _in place_, so it has no way of leaking `shift`.
* When `shift` is called from within `withContext` it will cancel all `Job`s running inside the `CoroutineScope` of `withContext`.
*
* <!--- INCLUDE
Expand Down Expand Up @@ -483,6 +483,8 @@ import kotlin.coroutines.resumeWithException
* #### async
*
* When calling `shift` from `async` you should **always** call `await`, otherwise `shift` can leak out of its scope.
* So it's safe to call `shift` from `async` as long as you **always** call `await` on the `Deferred` returned by `async`,
* but we advise using Arrow Fx `parZip`, `raceN`, `parTraverse`, etc instead.
*
* <!--- INCLUDE
* import arrow.core.continuations.effect
Expand All @@ -500,14 +502,30 @@ import kotlin.coroutines.resumeWithException
* val fa = async<Int> { shift(errorA) }
* val fb = async<Int> { shift(errorB) }
* fa.await() + fb.await()
* }.fold({ error -> error shouldBeIn listOf(errorA, errorB) }, { fail("Int can never be the result") })
* }.fold(
* { error ->
* println(error)
* error shouldBeIn listOf(errorA, errorB)
* },
* { fail("Int can never be the result") }
* )
* }
* }
* ```
* <!--- KNIT example-effect-guide-11.kt -->
* ```text
* ErrorA
* ```
*
* The example here will always print `ErrorA`, but never `ErrorB`. This is because `fa` is awaited first, and when it's `shifts` it will cancel `fb`.
nomisRev marked this conversation as resolved.
Show resolved Hide resolved
* If instead we used `awaitAll`, then it would print `ErrorA` or `ErrorB` due to both `fa` and `fb` being awaited in parallel.
*
* #### launch
*
* It's **not allowed** to call `shift` from within `launch`, this is because `launch` creates a separate unrelated child Job/Continuation.
* Any calls to `shift` inside of `launch` will be ignored by `effect`, and result in an exception being thrown inside `launch`.
* Because KotlinX Coroutines ignores `CancellationException`, and thus swallows the `shift` call.
nomisRev marked this conversation as resolved.
Show resolved Hide resolved
*
* <!--- INCLUDE
* import arrow.core.continuations.effect
* import io.kotest.assertions.fail
Expand All @@ -519,29 +537,30 @@ import kotlin.coroutines.resumeWithException
* suspend fun main() {
* val errorA = "ErrorA"
* val errorB = "ErrorB"
* val int = 45
* effect<String, Int> {
* coroutineScope<Int> {
* launch { shift(errorA) }
* launch { shift(errorB) }
* int
* 45
* }
* }.fold({ fail("Shift can never finish") }, { it shouldBe int })
* }.fold({ fail("Shift can never finish") }, ::println)
* }
* ```
* <!--- KNIT example-effect-guide-12.kt -->
* ```text
* 45
* ```
*
* As you can see from the output, the `effect` block is still executed, but the `shift` calls inside `launch` are ignored.
*
* #### Strange edge cases
* #### Leaking `shift`
*
* **NOTE**
* Capturing `shift` into a lambda, and leaking it outside of `Effect` to be invoked outside will yield unexpected results.
* Below we capture `shift` from inside the DSL, and then invoke it outside its context `EffectScope<String>`.
* **IMPORTANT:** Capturing `shift` and leaking it outside of `effect { }` and invoking it outside its scope will yield unexpected results.
*
* Below an example of the capturing of `shift` inside a `suspend lambda`, and then invoking it outside its `effect { }` scope.
*
* <!--- INCLUDE
* import arrow.core.continuations.effect
* import kotlinx.coroutines.Deferred
* import kotlinx.coroutines.async
* import kotlinx.coroutines.coroutineScope
*
* suspend fun main() {
* -->
Expand All @@ -553,21 +572,63 @@ import kotlin.coroutines.resumeWithException
* suspend { shift("error") }
* }.fold({ }, { leakedShift -> leakedShift.invoke() })
* ```
* <!--- KNIT example-effect-guide-13.kt -->
*
* The same violation is possible in all DSLs in Kotlin, including Structured Concurrency.
* When we invoke `leakedShift` outside of `effect { }` a special `ShiftLeakedException` is thrown to improve the debugging experience.
* The message clearly states that `shift` was leaked outside its scope, and the stacktrace will point to the exact location where `shift` was captured.
* In this case in line `9` of `example-effect-guide-13.kt`, which is stated in the second line of the stacktrace: `invokeSuspend(example-effect-guide-13.kt:9)`.
*
* ```kotlin
* val leakedAsync = coroutineScope<suspend () -> Deferred<Unit>> {
* suspend {
* async {
* println("I am never going to run, until I get called invoked from outside")
* }
* }
* }
* ```text
* Exception in thread "main" arrow.core.continuations.ShiftLeakedException:
* shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked
* This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders.
*
* leakedAsync.invoke().await()
* See: Effect KDoc for additional information.
* at arrow.core.continuations.FoldContinuation.shift(Effect.kt:770)
* at arrow.core.examples.exampleEffectGuide13.Example_effect_guide_13Kt$main$2$1.invokeSuspend(example-effect-guide-13.kt:9)
* at arrow.core.examples.exampleEffectGuide13.Example_effect_guide_13Kt$main$2$1.invoke(example-effect-guide-13.kt)
* ```
* <!--- KNIT example-effect-guide-13.kt -->
*
* An example with KotlinX Coroutines launch. Which can _concurrently_ leak `shift` outside of its scope.
* In this case by _delaying_ the invocation of `shift` by `3.seconds`,
* we can see that the `ShiftLeakedException` is again thrown when `shift` is invoked.
*
* <!--- INCLUDE
* import kotlinx.coroutines.launch
* import kotlinx.coroutines.delay
* import kotlinx.coroutines.coroutineScope
* import kotlinx.coroutines.runBlocking
* import arrow.core.continuations.effect
* import kotlin.time.Duration.Companion.seconds
*
* fun main(): Unit = runBlocking {
* -->
* <!--- SUFFIX
* }
* -->
* ```kotlin
* effect<String, Int> {
* launch {
* delay(3.seconds)
* shift("error")
* }
* 1
* }.fold(::println, ::println)
* ```
* ```text
* 1
* Exception in thread "main" arrow.core.continuations.ShiftLeakedException:
* shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked
* This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders.
*
* See: Effect KDoc for additional information.
* at arrow.core.continuations.FoldContinuation.shift(Effect.kt:780)
* at arrow.core.examples.exampleEffectGuide14.Example_effect_guide_14Kt$main$1$1$1$1.invokeSuspend(example-effect-guide-14.kt:17)
* at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) <13 internal lines>
* at arrow.core.examples.exampleEffectGuide14.Example_effect_guide_14Kt.main(example-effect-guide-14.kt:11)
* at arrow.core.examples.exampleEffectGuide14.Example_effect_guide_14Kt.main(example-effect-guide-14.kt)
* ```
* <!--- KNIT example-effect-guide-14.kt -->
*/
public interface Effect<out R, out A> {
/**
Expand Down Expand Up @@ -716,17 +777,20 @@ internal class FoldContinuation<R, B>(
private val error: suspend (Throwable) -> B,
private val parent: Continuation<B>,
) : Continuation<B>, Token(), EffectScope<R> {

constructor(ignored: Token, context: CoroutineContext, parent: Continuation<B>) : this(context, { throw it }, parent)
constructor(
ignored: Token,
context: CoroutineContext,
error: suspend (Throwable) -> B,
parent: Continuation<B>,
) : this(context, error, parent)

lateinit var recover: suspend (R) -> Any?


// Add AtomicBoolean to arrow-atomic
val isActive: AtomicRef<Boolean> = AtomicRef(true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit worried about exposing the AtomicRef directly. Is it possible to make this internal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is inside an internal class so it's internal by inheriting the visibility modifier.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it shows in the arrow-core.api file... 🤔

Copy link
Member Author

@nomisRev nomisRev Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's because internal is public in the (Java) binary 😭 Such visibility modifier doesn't exist on the JVM, and we cannot make it private.


// Shift away from this Continuation by intercepting it, and completing it with
// ShiftCancellationException
// This is needed because this function will never yield a result,
Expand All @@ -739,8 +803,9 @@ internal class FoldContinuation<R, B>(
// CancellationException and thus effectively recovering from the cancellation/shift.
// This means try/catch is also capable of recovering from monadic errors.
// See: EffectSpec - try/catch tests
throw Suspend(this, r, recover as suspend (Any?) -> Any?)

if (isActive.get()) throw Suspend(this, r, recover as suspend (Any?) -> Any?)
else throw ShiftLeakedException()

// In contrast to `createCoroutineUnintercepted this doesn't create a new ContinuationImpl
private fun (suspend () -> B).startCoroutineUnintercepted() {
try {
Expand All @@ -753,15 +818,20 @@ internal class FoldContinuation<R, B>(
parent.resumeWithException(e)
}
}

override fun resumeWith(result: Result<B>) {
result.fold(parent::resume) { throwable ->
when {
throwable is Suspend && this === throwable.token ->
throwable is Suspend && this === throwable.token -> {
isActive.set(false)
suspend { throwable.recover(throwable.shifted) as B }.startCoroutineUnintercepted()

}

throwable is Suspend -> parent.resumeWith(result)
else -> suspend { error(throwable.nonFatalOrThrow()) }.startCoroutineUnintercepted()
else -> {
isActive.set(false)
suspend { error(throwable.nonFatalOrThrow()) }.startCoroutineUnintercepted()
}
}
}
}
Expand Down Expand Up @@ -800,12 +870,12 @@ internal class FoldContinuation<R, B>(
public fun <R, A> effect(f: suspend EffectScope<R>.() -> A): Effect<R, A> = DefaultEffect(f)

private class DefaultEffect<R, A>(val f: suspend EffectScope<R>.() -> A) : Effect<R, A> {

override suspend fun <B> fold(
recover: suspend (shifted: R) -> B,
transform: suspend (value: A) -> B,
): B = fold({ throw it }, recover, transform)

// We create a `Token` for fold Continuation, so we can properly differentiate between nested folds
override suspend fun <B> fold(
error: suspend (error: Throwable) -> B,
Expand All @@ -816,18 +886,33 @@ private class DefaultEffect<R, A>(val f: suspend EffectScope<R>.() -> A) : Effec
val shift = FoldContinuation<R, B>(cont.context, error, cont)
shift.recover = recover
try {
val fold: suspend EffectScope<R>.() -> B = { transform(f(this)) }
val fold: suspend EffectScope<R>.() -> B = {
val res = f(this).also { shift.isActive.set(false) }
transform(res)
}
fold.startCoroutineUninterceptedOrReturn(shift, shift)
} catch (e: Suspend) {
if (shift === e.token) {
shift.isActive.set(false)
val f: suspend () -> B = { e.recover(e.shifted) as B }
f.startCoroutineUninterceptedOrReturn(cont)
} else throw e
} catch (e: Throwable) {
shift.isActive.set(false)
val f: suspend () -> B = { error(e.nonFatalOrThrow()) }
f.startCoroutineUninterceptedOrReturn(cont)
}
}
}

public suspend fun <A> Effect<A, A>.merge(): A = fold(::identity, ::identity)

public class ShiftLeakedException : IllegalStateException(
"""

shift or bind was called outside of its DSL scope, and the DSL Scoped operator was leaked
This is kind of usage is incorrect, make sure all calls to shift or bind occur within the lifecycle of effect { }, either { } or similar builders.

See: Effect KDoc for additional information.
""".trimIndent()
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import arrow.core.identity
import arrow.core.left
import arrow.core.right
import io.kotest.assertions.fail
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
Expand Down Expand Up @@ -133,4 +134,12 @@ class EagerEffectSpec : StringSpec({
}.runCont()
} shouldBe Either.Left(e)
}

"shift leaked results in ShiftLeakException" {
shouldThrow<ShiftLeakedException> {
effect {
suspend { shift<Unit>("failure") }
}.fold(::println) { it.invoke() }
}
}
})