Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce new CancellableContinuation.resume with onCancellation lambda that takes the resumption value #3093

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 10 additions & 3 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -39,22 +39,24 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
public abstract fun isCancelled ()Z
public abstract fun isCompleted ()Z
public abstract fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
public abstract fun resume (Ljava/lang/Object;Lkotlinx/coroutines/OnCancellationHandler;)V
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlinx/coroutines/OnCancellationHandler;)Ljava/lang/Object;
public abstract fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
public static synthetic fun cancel$default (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Throwable;ILjava/lang/Object;)Z
public static fun resume (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
public static synthetic fun tryResume$default (Lkotlinx/coroutines/CancellableContinuation;Ljava/lang/Object;Ljava/lang/Object;ILjava/lang/Object;)Ljava/lang/Object;
}

public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
public fun <init> (Lkotlin/coroutines/Continuation;I)V
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
public final fun callOnCancellation (Lkotlinx/coroutines/OnCancellationHandler;Ljava/lang/Object;Ljava/lang/Throwable;)V
public fun cancel (Ljava/lang/Throwable;)Z
public fun completeResume (Ljava/lang/Object;)V
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
Expand All @@ -69,12 +71,13 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun isCompleted ()Z
protected fun nameString ()Ljava/lang/String;
public fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
public fun resume (Ljava/lang/Object;Lkotlinx/coroutines/OnCancellationHandler;)V
public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlinx/coroutines/OnCancellationHandler;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -510,6 +513,10 @@ public final class kotlinx/coroutines/NonDisposableHandle : kotlinx/coroutines/C
public abstract interface annotation class kotlinx/coroutines/ObsoleteCoroutinesApi : java/lang/annotation/Annotation {
}

public abstract interface class kotlinx/coroutines/OnCancellationHandler {
public abstract fun invoke (Ljava/lang/Object;Ljava/lang/Throwable;Lkotlin/coroutines/CoroutineContext;)V
}

public abstract interface class kotlinx/coroutines/ParentJob : kotlinx/coroutines/Job {
public abstract fun getChildJobCancellationCause ()Ljava/util/concurrent/CancellationException;
}
Expand Down
51 changes: 49 additions & 2 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Expand Up @@ -87,7 +87,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* @suppress **This is unstable API and it is subject to change.**
*/
@InternalCoroutinesApi
public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?
public fun tryResume(value: T, idempotent: Any?, onCancellation: OnCancellationHandler<@UnsafeVariance T>?): Any?

/**
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
Expand Down Expand Up @@ -201,8 +201,55 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* It can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context of its invocation.
*/
@Deprecated(level = DeprecationLevel.WARNING,
message = "Replaced with resume(value: T, onCancellation: OnCancellation<T>?)")
@ExperimentalCoroutinesApi // since 1.2.0
public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) {
if (onCancellation == null) {
resume(value)
return
}
val onCancellationNew = OnCancellationHandler<T> { _, cause, _ ->
onCancellation(cause ?: CancellationException("Cancelled"))
}
resume(value, onCancellationNew)
}

/**
* Resumes this continuation with the specified `value` and calls the specified `onCancellation`
* handler when either resumed too late (when continuation was already cancelled) or, although resumed
* successfully (before cancellation), the coroutine's job was cancelled before it had a
* chance to run in its dispatcher, so that the suspended function threw an exception
* instead of returning this value.
*
* The installed [onCancellation] handler should not throw any exceptions.
* If it does, they will get caught, wrapped into a [CompletionHandlerException] and
* processed as an uncaught exception in the context of the current coroutine
* (see [CoroutineExceptionHandler]).
*
* This function shall be used when resuming with a resource that must be closed by
* code that called the corresponding suspending function, for example:
*
* ```
* continuation.resume(resource) { r, cause ->
* r.close(cause)
* }
* ```
*
* A more complete example and further details are given in
* the documentation for the [suspendCancellableCoroutine] function.
*
* **Note**: The [onCancellation] handler must be fast, non-blocking, and thread-safe.
* It can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context of its invocation.
*/
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
public fun resume(value: T, onCancellation: OnCancellationHandler<@UnsafeVariance T>?)
}

public fun interface OnCancellationHandler<in T> {
public fun invoke(value: T, cause: Throwable?, context: CoroutineContext)
}

/**
Expand Down
49 changes: 26 additions & 23 deletions kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
Expand Up @@ -117,7 +117,7 @@ internal open class CancellableContinuationImpl<in T>(
assert { parentHandle !== NonDisposableHandle }
val state = _state.value
assert { state !is NotCompleted }
if (state is CompletedContinuation && state.idempotentResume != null) {
if (state is CompletedContinuation<*> && state.idempotentResume != null) {
// Cannot reuse continuation that was resumed with idempotent marker
detachChild()
return false
Expand All @@ -140,18 +140,19 @@ internal open class CancellableContinuationImpl<in T>(
when (state) {
is NotCompleted -> error("Not completed")
is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
check(!state.cancelled) { "Must be called at most once" }
val update = state.copy(cancelCause = cause)
if (_state.compareAndSet(state, update)) {
state.invokeHandlers(this, cause)
state as CompletedContinuation<T>
state.invokeHandlers(this, state.result, cause)
return // done
}
}
else -> {
// completed normally without marker class, promote to CompletedContinuation in case
// if invokeOnCancellation if called later
if (_state.compareAndSet(state, CompletedContinuation(state, cancelCause = cause))) {
if (_state.compareAndSet(state, CompletedContinuation<T>(state as T, cancelCause = cause))) {
return // done
}
}
Expand Down Expand Up @@ -212,9 +213,9 @@ internal open class CancellableContinuationImpl<in T>(
fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
callCancelHandlerSafely { handler.invoke(cause) }

fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
fun callOnCancellation(onCancellation: OnCancellationHandler<@UnsafeVariance T>, value: T, cause: Throwable) {
try {
onCancellation.invoke(cause)
onCancellation.invoke(value, cause, context)
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
Expand Down Expand Up @@ -325,8 +326,9 @@ internal open class CancellableContinuationImpl<in T>(
override fun resumeWith(result: Result<T>) =
resumeImpl(result.toState(this), resumeMode)

override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
override fun resume(value: T, onCancellation: OnCancellationHandler<@UnsafeVariance T>?) {
resumeImpl(value, resumeMode, onCancellation)
}

public override fun invokeOnCancellation(handler: CompletionHandler) {
val cancelHandler = makeCancelHandler(handler)
Expand All @@ -353,7 +355,7 @@ internal open class CancellableContinuationImpl<in T>(
}
return
}
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
/*
* Continuation was already completed, and might already have cancel handler.
*/
Expand All @@ -375,7 +377,7 @@ internal open class CancellableContinuationImpl<in T>(
* does not need to be called in this case.
*/
if (cancelHandler is BeforeResumeCancelHandler) return
val update = CompletedContinuation(state, cancelHandler = cancelHandler)
val update = CompletedContinuation<T>(state as T, cancelHandler = cancelHandler)
if (_state.compareAndSet(state, update)) return // quit on cas success
}
}
Expand All @@ -399,7 +401,7 @@ internal open class CancellableContinuationImpl<in T>(
state: NotCompleted,
proposedUpdate: Any?,
resumeMode: Int,
onCancellation: ((cause: Throwable) -> Unit)?,
onCancellation: OnCancellationHandler<T>?,
idempotent: Any?
): Any? = when {
proposedUpdate is CompletedExceptionally -> {
Expand All @@ -411,14 +413,14 @@ internal open class CancellableContinuationImpl<in T>(
onCancellation != null || (state is CancelHandler && state !is BeforeResumeCancelHandler) || idempotent != null ->
// mark as CompletedContinuation if special cases are present:
// Cancellation handlers that shall be called after resume or idempotent resume
CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent)
CompletedContinuation(proposedUpdate as T, state as? CancelHandler, onCancellation, idempotent)
else -> proposedUpdate // simple case -- use the value directly
}

private fun resumeImpl(
proposedUpdate: Any?,
resumeMode: Int,
onCancellation: ((cause: Throwable) -> Unit)? = null
onCancellation: OnCancellationHandler<T>? = null
) {
_state.loop { state ->
when (state) {
Expand All @@ -437,7 +439,8 @@ internal open class CancellableContinuationImpl<in T>(
*/
if (state.makeResumed()) { // check if trying to resume one (otherwise error)
// call onCancellation
onCancellation?.let { callOnCancellation(it, state.cause) }
proposedUpdate as T
onCancellation?.let { callOnCancellation(it, proposedUpdate, state.cause) }
return // done
}
}
Expand All @@ -453,7 +456,7 @@ internal open class CancellableContinuationImpl<in T>(
private fun tryResumeImpl(
proposedUpdate: Any?,
idempotent: Any?,
onCancellation: ((cause: Throwable) -> Unit)?
onCancellation: OnCancellationHandler<T>?
): Symbol? {
_state.loop { state ->
when (state) {
Expand All @@ -463,7 +466,7 @@ internal open class CancellableContinuationImpl<in T>(
detachChildIfNonResuable()
return RESUME_TOKEN
}
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
return if (idempotent != null && state.idempotentResume === idempotent) {
assert { state.result == proposedUpdate } // "Non-idempotent resume"
RESUME_TOKEN // resumed with the same token -- ok
Expand Down Expand Up @@ -499,7 +502,7 @@ internal open class CancellableContinuationImpl<in T>(
override fun tryResume(value: T, idempotent: Any?): Any? =
tryResumeImpl(value, idempotent, onCancellation = null)

override fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? =
override fun tryResume(value: T, idempotent: Any?, onCancellation: OnCancellationHandler<@UnsafeVariance T>?): Any? =
tryResumeImpl(value, idempotent, onCancellation)

override fun tryResumeWithException(exception: Throwable): Any? =
Expand All @@ -524,7 +527,7 @@ internal open class CancellableContinuationImpl<in T>(
@Suppress("UNCHECKED_CAST")
override fun <T> getSuccessfulResult(state: Any?): T =
when (state) {
is CompletedContinuation -> state.result as T
is CompletedContinuation<*> -> state.result as T
else -> state as T
}

Expand All @@ -534,7 +537,7 @@ internal open class CancellableContinuationImpl<in T>(
super.getExceptionalResult(state)?.let { recoverStackTrace(it, delegate) }

// For nicer debugging
public override fun toString(): String =
override fun toString(): String =
"${nameString()}(${delegate.toDebugString()}){$stateDebugRepresentation}@$hexAddress"

protected open fun nameString(): String =
Expand Down Expand Up @@ -574,17 +577,17 @@ private class InvokeOnCancel( // Clashes with InvokeOnCancellation
}

// Completed with additional metadata
private data class CompletedContinuation(
@JvmField val result: Any?,
private data class CompletedContinuation<T>(
@JvmField val result: T,
@JvmField val cancelHandler: CancelHandler? = null, // installed via invokeOnCancellation
@JvmField val onCancellation: ((cause: Throwable) -> Unit)? = null, // installed via resume block
@JvmField val onCancellation: OnCancellationHandler<T>? = null, // installed via resume block
@JvmField val idempotentResume: Any? = null,
@JvmField val cancelCause: Throwable? = null
) {
val cancelled: Boolean get() = cancelCause != null

fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
fun invokeHandlers(cont: CancellableContinuationImpl<T>, resumeValue: T, cause: Throwable) {
cancelHandler?.let { cont.callCancelHandler(it, cause) }
onCancellation?.let { cont.callOnCancellation(it, cause) }
onCancellation?.let { cont.callOnCancellation(it, resumeValue, cause) }
}
}