Skip to content

Commit

Permalink
WIP: provide resume and tryResume that pass the value to the call…
Browse files Browse the repository at this point in the history
…back

Fixes some concerns in #4088
  • Loading branch information
dkhalanskyjb committed Apr 5, 2024
1 parent 7bdc901 commit e29c85b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 22 deletions.
5 changes: 4 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -54,7 +54,7 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
public class kotlinx/coroutines/CancellableContinuationImpl : kotlinx/coroutines/DispatchedTask, kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/Waiter {
public fun <init> (Lkotlin/coroutines/Continuation;I)V
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
public final fun callOnCancellation (Lkotlin/jvm/functions/Function2;Ljava/lang/Throwable;Ljava/lang/Object;)V
public fun cancel (Ljava/lang/Throwable;)Z
public fun completeResume (Ljava/lang/Object;)V
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
Expand All @@ -76,12 +76,15 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlinx/coroutines
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public final fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/CancellableContinuationKt {
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
public static final fun resume (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun tryResume (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/ChildContinuation {
Expand Down
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Expand Up @@ -200,6 +200,16 @@ public interface CancellableContinuation<in T> : Continuation<T> {
public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
}

public fun <T> CancellableContinuation<T>.resume(value: T, onCancellation: (cause: Throwable, value: T) -> Unit): Unit = when (this) {
is CancellableContinuationImpl -> resume(value, onCancellation)
else -> throw UnsupportedOperationException("third-party implementation of CancellableContinuation is not supported")
}

public fun <T> CancellableContinuation<T>.tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable, value: T) -> Unit)?): Any? = when (this) {
is CancellableContinuationImpl -> tryResume(value, idempotent, onCancellation)
else -> throw UnsupportedOperationException("third-party implementation of CancellableContinuation is not supported")
}

/**
* A version of `invokeOnCancellation` that accepts a class as a handler instead of a lambda, but identical otherwise.
* This allows providing a custom [toString] instance that will look better during debugging.
Expand Down
48 changes: 27 additions & 21 deletions kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
Expand Up @@ -146,7 +146,7 @@ internal open class CancellableContinuationImpl<in T>(
assert { parentHandle !== NonDisposableHandle }
val state = _state.value
assert { state !is NotCompleted }
if (state is CompletedContinuation && state.idempotentResume != null) {
if (state is CompletedContinuation<*> && state.idempotentResume != null) {
// Cannot reuse continuation that was resumed with idempotent marker
detachChild()
return false
Expand All @@ -169,7 +169,7 @@ internal open class CancellableContinuationImpl<in T>(
when (state) {
is NotCompleted -> error("Not completed")
is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
check(!state.cancelled) { "Must be called at most once" }
val update = state.copy(cancelCause = cause)
if (_state.compareAndSet(state, update)) {
Expand Down Expand Up @@ -250,9 +250,9 @@ internal open class CancellableContinuationImpl<in T>(
callCancelHandlerSafely { segment.onCancellation(index, cause, context) }
}

fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
fun <R> callOnCancellation(onCancellation: (cause: Throwable, value: R) -> Unit, cause: Throwable, value: R) {
try {
onCancellation.invoke(cause)
onCancellation.invoke(cause, value)
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
Expand Down Expand Up @@ -364,6 +364,9 @@ internal open class CancellableContinuationImpl<in T>(
resumeImpl(result.toState(this), resumeMode)

override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
resumeImpl(value, resumeMode, onCancellation?.let { { cause, _ -> onCancellation(cause) } })

internal fun <R> resume(value: R, onCancellation: ((cause: Throwable, value: R) -> Unit)) =
resumeImpl(value, resumeMode, onCancellation)

/**
Expand Down Expand Up @@ -425,7 +428,7 @@ internal open class CancellableContinuationImpl<in T>(
}
return
}
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
/*
* Continuation was already completed, and might already have cancel handler.
*/
Expand Down Expand Up @@ -466,11 +469,11 @@ internal open class CancellableContinuationImpl<in T>(
dispatch(mode)
}

private fun resumedState(
private fun <R> resumedState(
state: NotCompleted,
proposedUpdate: Any?,
proposedUpdate: R,
resumeMode: Int,
onCancellation: ((cause: Throwable) -> Unit)?,
onCancellation: ((cause: Throwable, value: R) -> Unit)?,
idempotent: Any?
): Any? = when {
proposedUpdate is CompletedExceptionally -> {
Expand All @@ -486,10 +489,10 @@ internal open class CancellableContinuationImpl<in T>(
else -> proposedUpdate // simple case -- use the value directly
}

private fun resumeImpl(
proposedUpdate: Any?,
internal fun <R> resumeImpl(
proposedUpdate: R,
resumeMode: Int,
onCancellation: ((cause: Throwable) -> Unit)? = null
onCancellation: ((cause: Throwable, value: R) -> Unit)? = null
) {
_state.loop { state ->
when (state) {
Expand All @@ -508,7 +511,7 @@ internal open class CancellableContinuationImpl<in T>(
*/
if (state.makeResumed()) { // check if trying to resume one (otherwise error)
// call onCancellation
onCancellation?.let { callOnCancellation(it, state.cause) }
onCancellation?.let { callOnCancellation(it, state.cause, proposedUpdate) }
return // done
}
}
Expand All @@ -521,10 +524,10 @@ internal open class CancellableContinuationImpl<in T>(
* Similar to [tryResume], but does not actually completes resume (needs [completeResume] call).
* Returns [RESUME_TOKEN] when resumed, `null` when it was already resumed or cancelled.
*/
private fun tryResumeImpl(
proposedUpdate: Any?,
private fun <R> tryResumeImpl(
proposedUpdate: R,
idempotent: Any?,
onCancellation: ((cause: Throwable) -> Unit)?
onCancellation: ((cause: Throwable, value: R) -> Unit)?
): Symbol? {
_state.loop { state ->
when (state) {
Expand All @@ -534,7 +537,7 @@ internal open class CancellableContinuationImpl<in T>(
detachChildIfNonResuable()
return RESUME_TOKEN
}
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
return if (idempotent != null && state.idempotentResume === idempotent) {
assert { state.result == proposedUpdate } // "Non-idempotent resume"
RESUME_TOKEN // resumed with the same token -- ok
Expand Down Expand Up @@ -571,6 +574,9 @@ internal open class CancellableContinuationImpl<in T>(
tryResumeImpl(value, idempotent, onCancellation = null)

override fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? =
tryResumeImpl(value, idempotent, onCancellation?.let { { cause, _ -> onCancellation(cause) } })

fun <R> tryResume(value: R, idempotent: Any?, onCancellation: ((cause: Throwable, value: R) -> Unit)?): Any? =
tryResumeImpl(value, idempotent, onCancellation)

override fun tryResumeWithException(exception: Throwable): Any? =
Expand All @@ -595,7 +601,7 @@ internal open class CancellableContinuationImpl<in T>(
@Suppress("UNCHECKED_CAST")
override fun <T> getSuccessfulResult(state: Any?): T =
when (state) {
is CompletedContinuation -> state.result as T
is CompletedContinuation<*> -> state.result as T
else -> state as T
}

Expand Down Expand Up @@ -664,17 +670,17 @@ internal interface CancelHandler : NotCompleted {
}

// Completed with additional metadata
private data class CompletedContinuation(
@JvmField val result: Any?,
private data class CompletedContinuation<R>(
@JvmField val result: R,
@JvmField val cancelHandler: CancelHandler? = null, // installed via invokeOnCancellation
@JvmField val onCancellation: ((cause: Throwable) -> Unit)? = null, // installed via resume block
@JvmField val onCancellation: ((cause: Throwable, value: R) -> Unit)? = null, // installed via resume block
@JvmField val idempotentResume: Any? = null,
@JvmField val cancelCause: Throwable? = null
) {
val cancelled: Boolean get() = cancelCause != null

fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
cancelHandler?.let { cont.callCancelHandler(it, cause) }
onCancellation?.let { cont.callOnCancellation(it, cause) }
onCancellation?.let { cont.callOnCancellation(it, cause, result) }
}
}

0 comments on commit e29c85b

Please sign in to comment.