Skip to content

Commit

Permalink
Mark the coroutine started with UNDISPATCHED as running (#4077)
Browse files Browse the repository at this point in the history
Fixes #4058

Additional small fix: the coroutine context injected by `probeCoroutineCreated` is used for `CoroutineStart.UNDISPATCHED` even before the first suspension.
  • Loading branch information
dkhalanskyjb committed Apr 3, 2024
1 parent 617f56b commit 7bdc901
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 28 deletions.
Expand Up @@ -3,3 +3,5 @@ package kotlinx.coroutines.internal
import kotlin.coroutines.*

internal expect inline fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T>

internal expect inline fun <T> probeCoroutineResumed(completion: Continuation<T>): Unit
31 changes: 6 additions & 25 deletions kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt
Expand Up @@ -5,39 +5,20 @@ import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

/**
* Use this function to restart a coroutine directly from inside of [suspendCoroutine],
* when the code is already in the context of this coroutine.
* It does not use [ContinuationInterceptor] and does not update the context of the current thread.
*/
internal fun <T> (suspend () -> T).startCoroutineUnintercepted(completion: Continuation<T>) {
startDirect(completion) { actualCompletion ->
startCoroutineUninterceptedOrReturn(actualCompletion)
}
}

/**
* Use this function to start a new coroutine in [CoroutineStart.UNDISPATCHED] mode &mdash;
* immediately execute the coroutine in the current thread until the next suspension.
* It does not use [ContinuationInterceptor], but updates the context of the current thread for the new coroutine.
*/
internal fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, completion: Continuation<T>) {
startDirect(completion) { actualCompletion ->
withCoroutineContext(completion.context, null) {
startCoroutineUninterceptedOrReturn(receiver, actualCompletion)
}
}
}

/**
* Starts the given [block] immediately in the current stack-frame until the first suspension point.
* This method supports debug probes and thus can intercept completion, thus completion is provided
* as the parameter of [block].
*/
private inline fun <T> startDirect(completion: Continuation<T>, block: (Continuation<T>) -> Any?) {
val actualCompletion = probeCoroutineCreated(completion)
val value = try {
block(actualCompletion)
/* The code below is started immediately in the current stack-frame
* and runs until the first suspension point. */
withCoroutineContext(actualCompletion.context, null) {
probeCoroutineResumed(actualCompletion)
startCoroutineUninterceptedOrReturn(receiver, actualCompletion)
}
} catch (e: Throwable) {
actualCompletion.resumeWithException(e)
return
Expand Down
24 changes: 23 additions & 1 deletion kotlinx-coroutines-core/common/test/EmptyContext.kt
@@ -1,8 +1,30 @@
package kotlinx.coroutines

import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.internal.probeCoroutineCreated
import kotlinx.coroutines.internal.probeCoroutineResumed
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

suspend fun <T> withEmptyContext(block: suspend () -> T): T = suspendCoroutine { cont ->
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { cont.resumeWith(it) })
}

/**
* Use this function to restart a coroutine directly from inside of [suspendCoroutine],
* when the code is already in the context of this coroutine.
* It does not use [ContinuationInterceptor] and does not update the context of the current thread.
*/
fun <T> (suspend () -> T).startCoroutineUnintercepted(completion: Continuation<T>) {
val actualCompletion = probeCoroutineCreated(completion)
val value = try {
probeCoroutineResumed(actualCompletion)
startCoroutineUninterceptedOrReturn(actualCompletion)
} catch (e: Throwable) {
actualCompletion.resumeWithException(e)
return
}
if (value !== COROUTINE_SUSPENDED) {
@Suppress("UNCHECKED_CAST")
actualCompletion.resume(value as T)
}
}
Expand Up @@ -4,3 +4,6 @@ import kotlin.coroutines.*

@Suppress("NOTHING_TO_INLINE")
internal actual inline fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> = completion

@Suppress("NOTHING_TO_INLINE")
internal actual inline fun <T> probeCoroutineResumed(completion: Continuation<T>) { }
8 changes: 6 additions & 2 deletions kotlinx-coroutines-core/jvm/src/internal/ProbesSupport.kt
Expand Up @@ -3,6 +3,10 @@
package kotlinx.coroutines.internal

import kotlin.coroutines.*
import kotlin.coroutines.jvm.internal.probeCoroutineCreated as probe

internal actual inline fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> = probe(completion)
internal actual inline fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> =
kotlin.coroutines.jvm.internal.probeCoroutineCreated(completion)

internal actual inline fun <T> probeCoroutineResumed(completion: Continuation<T>) {
kotlinx.coroutines.debug.internal.probeCoroutineResumed(completion)
}
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/native/src/internal/ProbesSupport.kt
Expand Up @@ -4,3 +4,6 @@ import kotlin.coroutines.*

@Suppress("NOTHING_TO_INLINE")
internal actual inline fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> = completion

@Suppress("NOTHING_TO_INLINE")
internal actual inline fun <T> probeCoroutineResumed(completion: Continuation<T>) { }
20 changes: 20 additions & 0 deletions kotlinx-coroutines-debug/test/CoroutinesDumpTest.kt
Expand Up @@ -106,6 +106,26 @@ class CoroutinesDumpTest : DebugTestBase() {
}
}

/**
* Tests that a coroutine started with [CoroutineStart.UNDISPATCHED] is considered running.
*/
@Test
fun testUndispatchedCoroutineIsRunning() = runBlocking {
val job = launch(Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) { // or launch(Dispatchers.Unconfined)
verifyDump(
"Coroutine \"coroutine#1\":StandaloneCoroutine{Active}@1e4a7dd4, state: RUNNING\n",
ignoredCoroutine = "BlockingCoroutine"
)
delay(Long.MAX_VALUE)
}
verifyDump(
"Coroutine \"coroutine#1\":StandaloneCoroutine{Active}@1e4a7dd4, state: SUSPENDED\n",
ignoredCoroutine = "BlockingCoroutine"
) {
job.cancel()
}
}

@Test
fun testCreationStackTrace() = runBlocking {
val deferred = async(Dispatchers.IO) {
Expand Down

0 comments on commit 7bdc901

Please sign in to comment.