Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: provide resume and tryResume that pass the value to the callback #4090

Draft
wants to merge 3 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 5 additions & 3 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -39,10 +39,11 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
public abstract fun isCancelled ()Z
public abstract fun isCompleted ()Z
public abstract fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
public abstract fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
public abstract fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}

Expand All @@ -54,7 +55,7 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/Waiter {
public fun <init> (Lkotlin/coroutines/Continuation;I)V
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
public final fun callOnCancellation (Lkotlin/jvm/functions/Function2;Ljava/lang/Throwable;Ljava/lang/Object;)V
public fun cancel (Ljava/lang/Throwable;)Z
public fun completeResume (Ljava/lang/Object;)V
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
Expand All @@ -70,12 +71,13 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun isCompleted ()Z
protected fun nameString ()Ljava/lang/String;
public fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
public fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}

Expand Down
8 changes: 5 additions & 3 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api
Expand Up @@ -104,13 +104,14 @@ abstract interface <#A: in kotlin/Any?> kotlinx.coroutines.channels/SendChannel
abstract interface <#A: in kotlin/Any?> kotlinx.coroutines/CancellableContinuation : kotlin.coroutines/Continuation<#A> { // kotlinx.coroutines/CancellableContinuation|null[0]
abstract fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatched(#A) // kotlinx.coroutines/CancellableContinuation.resumeUndispatched|resumeUndispatched@kotlinx.coroutines.CoroutineDispatcher(1:0){}[0]
abstract fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatchedWithException(kotlin/Throwable) // kotlinx.coroutines/CancellableContinuation.resumeUndispatchedWithException|resumeUndispatchedWithException@kotlinx.coroutines.CoroutineDispatcher(kotlin.Throwable){}[0]
abstract fun <#A1: #A> resume(#A1, kotlin/Function2<kotlin/Throwable, #A1, kotlin/Unit>?) // kotlinx.coroutines/CancellableContinuation.resume|resume(0:0;kotlin.Function2<kotlin.Throwable,0:0,kotlin.Unit>?){0§<1:0>}[0]
abstract fun <#A1: #A> tryResume(#A1, kotlin/Any?, kotlin/Function2<kotlin/Throwable, #A1, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResume|tryResume(0:0;kotlin.Any?;kotlin.Function2<kotlin.Throwable,0:0,kotlin.Unit>?){0§<1:0>}[0]
abstract fun cancel(kotlin/Throwable? = ...): kotlin/Boolean // kotlinx.coroutines/CancellableContinuation.cancel|cancel(kotlin.Throwable?){}[0]
abstract fun completeResume(kotlin/Any) // kotlinx.coroutines/CancellableContinuation.completeResume|completeResume(kotlin.Any){}[0]
abstract fun initCancellability() // kotlinx.coroutines/CancellableContinuation.initCancellability|initCancellability(){}[0]
abstract fun invokeOnCancellation(kotlin/Function1<kotlin/Throwable?, kotlin/Unit>) // kotlinx.coroutines/CancellableContinuation.invokeOnCancellation|invokeOnCancellation(kotlin.Function1<kotlin.Throwable?,kotlin.Unit>){}[0]
abstract fun resume(#A, kotlin/Function1<kotlin/Throwable, kotlin/Unit>?) // kotlinx.coroutines/CancellableContinuation.resume|resume(1:0;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){}[0]
abstract fun tryResume(#A, kotlin/Any? = ...): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResume|tryResume(1:0;kotlin.Any?){}[0]
abstract fun tryResume(#A, kotlin/Any?, kotlin/Function1<kotlin/Throwable, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResume|tryResume(1:0;kotlin.Any?;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){}[0]
abstract fun tryResumeWithException(kotlin/Throwable): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResumeWithException|tryResumeWithException(kotlin.Throwable){}[0]
abstract val isActive // kotlinx.coroutines/CancellableContinuation.isActive|{}isActive[0]
abstract fun <get-isActive>(): kotlin/Boolean // kotlinx.coroutines/CancellableContinuation.isActive.<get-isActive>|<get-isActive>(){}[0]
Expand Down Expand Up @@ -773,11 +774,13 @@ open annotation class kotlinx.coroutines/ObsoleteCoroutinesApi : kotlin/Annotati
}
open class <#A: in kotlin/Any?> kotlinx.coroutines/CancellableContinuationImpl : kotlinx.coroutines.internal/CoroutineStackFrame, kotlinx.coroutines/CancellableContinuation<#A>, kotlinx.coroutines/DispatchedTask<#A>, kotlinx.coroutines/Waiter { // kotlinx.coroutines/CancellableContinuationImpl|null[0]
constructor <init>(kotlin.coroutines/Continuation<#A>, kotlin/Int) // kotlinx.coroutines/CancellableContinuationImpl.<init>|<init>(kotlin.coroutines.Continuation<1:0>;kotlin.Int){}[0]
final fun <#A1: kotlin/Any?> callOnCancellation(kotlin/Function2<kotlin/Throwable, #A1, kotlin/Unit>, kotlin/Throwable, #A1) // kotlinx.coroutines/CancellableContinuationImpl.callOnCancellation|callOnCancellation(kotlin.Function2<kotlin.Throwable,0:0,kotlin.Unit>;kotlin.Throwable;0:0){0§<kotlin.Any?>}[0]
final fun callCancelHandler(kotlinx.coroutines/CancelHandler, kotlin/Throwable?) // kotlinx.coroutines/CancellableContinuationImpl.callCancelHandler|callCancelHandler(kotlinx.coroutines.CancelHandler;kotlin.Throwable?){}[0]
final fun callOnCancellation(kotlin/Function1<kotlin/Throwable, kotlin/Unit>, kotlin/Throwable) // kotlinx.coroutines/CancellableContinuationImpl.callOnCancellation|callOnCancellation(kotlin.Function1<kotlin.Throwable,kotlin.Unit>;kotlin.Throwable){}[0]
final fun getResult(): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.getResult|getResult(){}[0]
open fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatched(#A) // kotlinx.coroutines/CancellableContinuationImpl.resumeUndispatched|resumeUndispatched@kotlinx.coroutines.CoroutineDispatcher(1:0){}[0]
open fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatchedWithException(kotlin/Throwable) // kotlinx.coroutines/CancellableContinuationImpl.resumeUndispatchedWithException|resumeUndispatchedWithException@kotlinx.coroutines.CoroutineDispatcher(kotlin.Throwable){}[0]
open fun <#A1: #A> resume(#A1, kotlin/Function2<kotlin/Throwable, #A1, kotlin/Unit>?) // kotlinx.coroutines/CancellableContinuationImpl.resume|resume(0:0;kotlin.Function2<kotlin.Throwable,0:0,kotlin.Unit>?){0§<1:0>}[0]
open fun <#A1: #A> tryResume(#A1, kotlin/Any?, kotlin/Function2<kotlin/Throwable, #A1, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResume|tryResume(0:0;kotlin.Any?;kotlin.Function2<kotlin.Throwable,0:0,kotlin.Unit>?){0§<1:0>}[0]
open fun cancel(kotlin/Throwable?): kotlin/Boolean // kotlinx.coroutines/CancellableContinuationImpl.cancel|cancel(kotlin.Throwable?){}[0]
open fun completeResume(kotlin/Any) // kotlinx.coroutines/CancellableContinuationImpl.completeResume|completeResume(kotlin.Any){}[0]
open fun getContinuationCancellationCause(kotlinx.coroutines/Job): kotlin/Throwable // kotlinx.coroutines/CancellableContinuationImpl.getContinuationCancellationCause|getContinuationCancellationCause(kotlinx.coroutines.Job){}[0]
Expand All @@ -790,7 +793,6 @@ open class <#A: in kotlin/Any?> kotlinx.coroutines/CancellableContinuationImpl :
open fun resumeWith(kotlin/Result<#A>) // kotlinx.coroutines/CancellableContinuationImpl.resumeWith|resumeWith(kotlin.Result<1:0>){}[0]
open fun toString(): kotlin/String // kotlinx.coroutines/CancellableContinuationImpl.toString|toString(){}[0]
open fun tryResume(#A, kotlin/Any?): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResume|tryResume(1:0;kotlin.Any?){}[0]
open fun tryResume(#A, kotlin/Any?, kotlin/Function1<kotlin/Throwable, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResume|tryResume(1:0;kotlin.Any?;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){}[0]
open fun tryResumeWithException(kotlin/Throwable): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResumeWithException|tryResumeWithException(kotlin.Throwable){}[0]
open val callerFrame // kotlinx.coroutines/CancellableContinuationImpl.callerFrame|{}callerFrame[0]
open fun <get-callerFrame>(): kotlinx.coroutines.internal/CoroutineStackFrame? // kotlinx.coroutines/CancellableContinuationImpl.callerFrame.<get-callerFrame>|<get-callerFrame>(){}[0]
Expand Down
Expand Up @@ -83,7 +83,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* @suppress **This is unstable API and it is subject to change.**
*/
@InternalCoroutinesApi
public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?
public fun <R: T> tryResume(value: R, idempotent: Any?, onCancellation: ((cause: Throwable, value: R) -> Unit)?): Any?

/**
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
Expand Down Expand Up @@ -198,6 +198,8 @@ public interface CancellableContinuation<in T> : Continuation<T> {
*/
@ExperimentalCoroutinesApi // since 1.2.0
public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)

public fun <R: T> resume(value: R, onCancellation: ((cause: Throwable, value: R) -> Unit)?)
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say we need to decide two things:

  • Whether we want to keep the previous overload (probably not, worth pointing out)
  • Whether we want to provide a third parameter to be CoroutineContext. It's not the selects performance I worry about, but the general applicability: consider I have a resume(resource, closeResourceFunIfCancelled). The close typically can fail and the exception can either be ignored, re-thrown (then we'll handle it as part of our last-ditch mechanism) or report it to the application-specific log. Whether it needs a CoroutineContext for that is an open question, I'm not really sure (and this decision does not block anything really)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more: whether we want to break source compatibility in most cases or almost none of them. The latter can be achieved with onCancellation: ((R).(cause: Throwable) -> Unit)? and nicely supports the pattern cont.resume(response) { close() }, but it can also silently change the semantics of existing code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit of historical context: most likely, we haven't provided any value because the pattern we expected this to be used is kind of the following:

suspend fun Integration.await() = suspendCancellableCoroutine { cont ->
     val callback = this.createCallback()
     callback.then(cont) // invokes resume(). Can do resume(value, this) to save an allocation
     cont.invokeOnCancellation(callback) 
}

Though, under closer inspection, you might notice that the same signature is used twice in different contexts and it doesn't really work

}

/**
Expand Down
47 changes: 25 additions & 22 deletions kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
Expand Up @@ -146,7 +146,7 @@ internal open class CancellableContinuationImpl<in T>(
assert { parentHandle !== NonDisposableHandle }
val state = _state.value
assert { state !is NotCompleted }
if (state is CompletedContinuation && state.idempotentResume != null) {
if (state is CompletedContinuation<*> && state.idempotentResume != null) {
// Cannot reuse continuation that was resumed with idempotent marker
detachChild()
return false
Expand All @@ -169,7 +169,7 @@ internal open class CancellableContinuationImpl<in T>(
when (state) {
is NotCompleted -> error("Not completed")
is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
check(!state.cancelled) { "Must be called at most once" }
val update = state.copy(cancelCause = cause)
if (_state.compareAndSet(state, update)) {
Expand Down Expand Up @@ -243,9 +243,9 @@ internal open class CancellableContinuationImpl<in T>(
callCancelHandlerSafely { segment.onCancellation(index, cause, context) }
}

fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
fun <R> callOnCancellation(onCancellation: (cause: Throwable, value: R) -> Unit, cause: Throwable, value: R) {
try {
onCancellation.invoke(cause)
onCancellation.invoke(cause, value)
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
Expand Down Expand Up @@ -354,6 +354,9 @@ internal open class CancellableContinuationImpl<in T>(
resumeImpl(result.toState(this), resumeMode)

override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
resumeImpl(value, resumeMode, onCancellation?.let { { cause, _ -> onCancellation(cause) } })

override fun <R: T> resume(value: R, onCancellation: ((cause: Throwable, value: R) -> Unit)?) =
resumeImpl(value, resumeMode, onCancellation)

/**
Expand Down Expand Up @@ -415,7 +418,7 @@ internal open class CancellableContinuationImpl<in T>(
}
return
}
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
/*
* Continuation was already completed, and might already have cancel handler.
*/
Expand Down Expand Up @@ -456,11 +459,11 @@ internal open class CancellableContinuationImpl<in T>(
dispatch(mode)
}

private fun resumedState(
private fun <R> resumedState(
state: NotCompleted,
proposedUpdate: Any?,
proposedUpdate: R,
resumeMode: Int,
onCancellation: ((cause: Throwable) -> Unit)?,
onCancellation: ((cause: Throwable, value: R) -> Unit)?,
idempotent: Any?
): Any? = when {
proposedUpdate is CompletedExceptionally -> {
Expand All @@ -476,10 +479,10 @@ internal open class CancellableContinuationImpl<in T>(
else -> proposedUpdate // simple case -- use the value directly
}

private fun resumeImpl(
proposedUpdate: Any?,
internal fun <R> resumeImpl(
proposedUpdate: R,
resumeMode: Int,
onCancellation: ((cause: Throwable) -> Unit)? = null
onCancellation: ((cause: Throwable, value: R) -> Unit)? = null
) {
_state.loop { state ->
when (state) {
Expand All @@ -498,7 +501,7 @@ internal open class CancellableContinuationImpl<in T>(
*/
if (state.makeResumed()) { // check if trying to resume one (otherwise error)
// call onCancellation
onCancellation?.let { callOnCancellation(it, state.cause) }
onCancellation?.let { callOnCancellation(it, state.cause, proposedUpdate) }
return // done
}
}
Expand All @@ -511,10 +514,10 @@ internal open class CancellableContinuationImpl<in T>(
* Similar to [tryResume], but does not actually completes resume (needs [completeResume] call).
* Returns [RESUME_TOKEN] when resumed, `null` when it was already resumed or cancelled.
*/
private fun tryResumeImpl(
proposedUpdate: Any?,
private fun <R> tryResumeImpl(
proposedUpdate: R,
idempotent: Any?,
onCancellation: ((cause: Throwable) -> Unit)?
onCancellation: ((cause: Throwable, value: R) -> Unit)?
): Symbol? {
_state.loop { state ->
when (state) {
Expand All @@ -524,7 +527,7 @@ internal open class CancellableContinuationImpl<in T>(
detachChildIfNonResuable()
return RESUME_TOKEN
}
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
return if (idempotent != null && state.idempotentResume === idempotent) {
assert { state.result == proposedUpdate } // "Non-idempotent resume"
RESUME_TOKEN // resumed with the same token -- ok
Expand Down Expand Up @@ -560,7 +563,7 @@ internal open class CancellableContinuationImpl<in T>(
override fun tryResume(value: T, idempotent: Any?): Any? =
tryResumeImpl(value, idempotent, onCancellation = null)

override fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? =
override fun <R: T> tryResume(value: R, idempotent: Any?, onCancellation: ((cause: Throwable, value: R) -> Unit)?): Any? =
tryResumeImpl(value, idempotent, onCancellation)

override fun tryResumeWithException(exception: Throwable): Any? =
Expand All @@ -585,7 +588,7 @@ internal open class CancellableContinuationImpl<in T>(
@Suppress("UNCHECKED_CAST")
override fun <T> getSuccessfulResult(state: Any?): T =
when (state) {
is CompletedContinuation -> state.result as T
is CompletedContinuation<*> -> state.result as T
else -> state as T
}

Expand Down Expand Up @@ -652,18 +655,18 @@ internal interface CancelHandler : NotCompleted {
}

// Completed with additional metadata
private data class CompletedContinuation(
@JvmField val result: Any?,
private data class CompletedContinuation<R>(
@JvmField val result: R,
@JvmField val cancelHandler: CancelHandler? = null, // installed via invokeOnCancellation
@JvmField val onCancellation: ((cause: Throwable) -> Unit)? = null, // installed via resume block
@JvmField val onCancellation: ((cause: Throwable, value: R) -> Unit)? = null, // installed via resume block
@JvmField val idempotentResume: Any? = null,
@JvmField val cancelCause: Throwable? = null
) {
val cancelled: Boolean get() = cancelCause != null

fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
cancelHandler?.let { cont.callCancelHandler(it, cause) }
onCancellation?.let { cont.callOnCancellation(it, cause) }
onCancellation?.let { cont.callOnCancellation(it, cause, result) }
}
}

Expand Down