Skip to content

Commit

Permalink
Add a scope for launching background work in tests (#3348)
Browse files Browse the repository at this point in the history
Fixes #3287
  • Loading branch information
dkhalanskyjb committed Jul 12, 2022
1 parent 562902b commit 143bdfa
Show file tree
Hide file tree
Showing 19 changed files with 738 additions and 60 deletions.
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -381,6 +381,12 @@ public final class kotlinx/coroutines/Job$DefaultImpls {
public final class kotlinx/coroutines/Job$Key : kotlin/coroutines/CoroutineContext$Key {
}

public class kotlinx/coroutines/JobImpl : kotlinx/coroutines/JobSupport, kotlinx/coroutines/CompletableJob {
public fun <init> (Lkotlinx/coroutines/Job;)V
public fun complete ()Z
public fun completeExceptionally (Ljava/lang/Throwable;)Z
}

public final class kotlinx/coroutines/JobKt {
public static final fun Job (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/CompletableJob;
public static final synthetic fun Job (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job;
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Expand Up @@ -1312,6 +1312,7 @@ private class Empty(override val isActive: Boolean) : Incomplete {
override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
}

@PublishedApi // for a custom job in the test module
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
init { initParentJob(parent) }
override val onCancelComplete get() = true
Expand Down
Expand Up @@ -40,7 +40,7 @@ internal expect suspend inline fun recoverAndThrow(exception: Throwable): Nothin
* The opposite of [recoverStackTrace].
* It is guaranteed that `unwrap(recoverStackTrace(e)) === e`
*/
@PublishedApi // only published for the multiplatform tests in our own code
@PublishedApi // published for the multiplatform implementation of kotlinx-coroutines-test
internal expect fun <E: Throwable> unwrap(exception: E): E

internal expect class StackTraceElement
Expand Down
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt
Expand Up @@ -37,6 +37,16 @@ public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHe
_size.value = 0
}

public fun find(
predicate: (value: T) -> Boolean
): T? = synchronized(this) block@{
for (i in 0 until size) {
val value = a?.get(i)!!
if (predicate(value)) return@block value
}
null
}

public fun peek(): T? = synchronized(this) { firstImpl() }

public fun removeFirstOrNull(): T? = synchronized(this) {
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-test/api/kotlinx-coroutines-test.api
Expand Up @@ -105,6 +105,7 @@ public final class kotlinx/coroutines/test/TestDispatchers {
}

public abstract interface class kotlinx/coroutines/test/TestScope : kotlinx/coroutines/CoroutineScope {
public abstract fun getBackgroundScope ()Lkotlinx/coroutines/CoroutineScope;
public abstract fun getTestScheduler ()Lkotlinx/coroutines/test/TestCoroutineScheduler;
}

Expand Down
60 changes: 49 additions & 11 deletions kotlinx-coroutines-test/common/src/TestBuilders.kt
Expand Up @@ -164,15 +164,19 @@ public fun TestScope.runTest(
): TestResult = asSpecificImplementation().let {
it.enter()
createTestResult {
runTestCoroutine(it, dispatchTimeoutMs, TestScopeImpl::tryGetCompletionCause, testBody) { it.leave() }
runTestCoroutine(it, dispatchTimeoutMs, TestScopeImpl::tryGetCompletionCause, testBody) {
backgroundScope.cancel()
testScheduler.advanceUntilIdleOr { false }
it.leave()
}
}
}

/**
* Runs [testProcedure], creating a [TestResult].
*/
@Suppress("NO_ACTUAL_FOR_EXPECT") // actually suppresses `TestResult`
internal expect fun createTestResult(testProcedure: suspend () -> Unit): TestResult
internal expect fun createTestResult(testProcedure: suspend CoroutineScope.() -> Unit): TestResult

/** A coroutine context element indicating that the coroutine is running inside `runTest`. */
internal object RunningInRunTest : CoroutineContext.Key<RunningInRunTest>, CoroutineContext.Element {
Expand All @@ -195,7 +199,7 @@ internal const val DEFAULT_DISPATCH_TIMEOUT_MS = 60_000L
* The [cleanup] procedure may either throw [UncompletedCoroutinesError] to denote that child coroutines were leaked, or
* return a list of uncaught exceptions that should be reported at the end of the test.
*/
internal suspend fun <T: AbstractCoroutine<Unit>> runTestCoroutine(
internal suspend fun <T: AbstractCoroutine<Unit>> CoroutineScope.runTestCoroutine(
coroutine: T,
dispatchTimeoutMs: Long,
tryGetCompletionCause: T.() -> Throwable?,
Expand All @@ -207,6 +211,27 @@ internal suspend fun <T: AbstractCoroutine<Unit>> runTestCoroutine(
coroutine.start(CoroutineStart.UNDISPATCHED, coroutine) {
testBody()
}
/**
* The general procedure here is as follows:
* 1. Try running the work that the scheduler knows about, both background and foreground.
*
* 2. Wait until we run out of foreground work to do. This could mean one of the following:
* * The main coroutine is already completed. This is checked separately; then we leave the procedure.
* * It's switched to another dispatcher that doesn't know about the [TestCoroutineScheduler].
* * Generally, it's waiting for something external (like a network request, or just an arbitrary callback).
* * The test simply hanged.
* * The main coroutine is waiting for some background work.
*
* 3. We await progress from things that are not the code under test:
* the background work that the scheduler knows about, the external callbacks,
* the work on dispatchers not linked to the scheduler, etc.
*
* When we observe that the code under test can proceed, we go to step 1 again.
* If there is no activity for [dispatchTimeoutMs] milliseconds, we consider the test to have hanged.
*
* The background work is not running on a dedicated thread.
* Instead, the test thread itself is used, by spawning a separate coroutine.
*/
var completed = false
while (!completed) {
scheduler.advanceUntilIdle()
Expand All @@ -216,16 +241,29 @@ internal suspend fun <T: AbstractCoroutine<Unit>> runTestCoroutine(
completed = true
continue
}
select<Unit> {
coroutine.onJoin {
completed = true
}
scheduler.onDispatchEvent {
// we received knowledge that `scheduler` observed a dispatch event, so we reset the timeout
// in case progress depends on some background work, we need to keep spinning it.
val backgroundWorkRunner = launch(CoroutineName("background work runner")) {
while (true) {
scheduler.tryRunNextTaskUnless { !isActive }
// yield so that the `select` below has a chance to check if its conditions are fulfilled
yield()
}
onTimeout(dispatchTimeoutMs) {
handleTimeout(coroutine, dispatchTimeoutMs, tryGetCompletionCause, cleanup)
}
try {
select<Unit> {
coroutine.onJoin {
// observe that someone completed the test coroutine and leave without waiting for the timeout
completed = true
}
scheduler.onDispatchEvent {
// we received knowledge that `scheduler` observed a dispatch event, so we reset the timeout
}
onTimeout(dispatchTimeoutMs) {
handleTimeout(coroutine, dispatchTimeoutMs, tryGetCompletionCause, cleanup)
}
}
} finally {
backgroundWorkRunner.cancelAndJoin()
}
}
coroutine.getCompletionExceptionOrNull()?.let { exception ->
Expand Down
Expand Up @@ -96,7 +96,7 @@ private class UnconfinedTestDispatcherImpl(
@Suppress("INVISIBLE_MEMBER")
override fun dispatch(context: CoroutineContext, block: Runnable) {
checkSchedulerInContext(scheduler, context)
scheduler.sendDispatchEvent()
scheduler.sendDispatchEvent(context)

/** copy-pasted from [kotlinx.coroutines.Unconfined.dispatch] */
/** It can only be called by the [yield] function. See also code of [yield] function. */
Expand Down Expand Up @@ -151,8 +151,7 @@ private class StandardTestDispatcherImpl(
) : TestDispatcher() {

override fun dispatch(context: CoroutineContext, block: Runnable) {
checkSchedulerInContext(scheduler, context)
scheduler.registerEvent(this, 0, block) { false }
scheduler.registerEvent(this, 0, block, context) { false }
}

override fun toString(): String = "${name ?: "StandardTestDispatcher"}[scheduler=$scheduler]"
Expand Down
62 changes: 42 additions & 20 deletions kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt
Expand Up @@ -62,17 +62,20 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout
dispatcher: TestDispatcher,
timeDeltaMillis: Long,
marker: T,
context: CoroutineContext,
isCancelled: (T) -> Boolean
): DisposableHandle {
require(timeDeltaMillis >= 0) { "Attempted scheduling an event earlier in time (with the time delta $timeDeltaMillis)" }
checkSchedulerInContext(this, context)
val count = count.getAndIncrement()
val isForeground = context[BackgroundWork] === null
return synchronized(lock) {
val time = addClamping(currentTime, timeDeltaMillis)
val event = TestDispatchEvent(dispatcher, count, time, marker as Any) { isCancelled(marker) }
val event = TestDispatchEvent(dispatcher, count, time, marker as Any, isForeground) { isCancelled(marker) }
events.addLast(event)
/** can't be moved above: otherwise, [onDispatchEvent] could consume the token sent here before there's
* actually anything in the event queue. */
sendDispatchEvent()
sendDispatchEvent(context)
DisposableHandle {
synchronized(lock) {
events.remove(event)
Expand All @@ -82,10 +85,12 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout
}

/**
* Runs the next enqueued task, advancing the virtual time to the time of its scheduled awakening.
* Runs the next enqueued task, advancing the virtual time to the time of its scheduled awakening,
* unless [condition] holds.
*/
private fun tryRunNextTask(): Boolean {
internal fun tryRunNextTaskUnless(condition: () -> Boolean): Boolean {
val event = synchronized(lock) {
if (condition()) return false
val event = events.removeFirstOrNull() ?: return false
if (currentTime > event.time)
currentTimeAheadOfEvents()
Expand All @@ -105,9 +110,15 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout
* functionality, query [currentTime] before and after the execution to achieve the same result.
*/
@ExperimentalCoroutinesApi
public fun advanceUntilIdle() {
while (!synchronized(lock) { events.isEmpty }) {
tryRunNextTask()
public fun advanceUntilIdle(): Unit = advanceUntilIdleOr { events.none(TestDispatchEvent<*>::isForeground) }

/**
* [condition]: guaranteed to be invoked under the lock.
*/
internal fun advanceUntilIdleOr(condition: () -> Boolean) {
while (true) {
if (!tryRunNextTaskUnless(condition))
return
}
}

Expand Down Expand Up @@ -169,24 +180,19 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout
/**
* Checks that the only tasks remaining in the scheduler are cancelled.
*/
internal fun isIdle(strict: Boolean = true): Boolean {
internal fun isIdle(strict: Boolean = true): Boolean =
synchronized(lock) {
if (strict)
return events.isEmpty
// TODO: also completely empties the queue, as there's no nondestructive way to iterate over [ThreadSafeHeap]
val presentEvents = mutableListOf<TestDispatchEvent<*>>()
while (true) {
presentEvents += events.removeFirstOrNull() ?: break
}
return presentEvents.all { it.isCancelled() }
if (strict) events.isEmpty else events.none { !it.isCancelled() }
}
}

/**
* Notifies this scheduler about a dispatch event.
*
* [context] is the context in which the task will be dispatched.
*/
internal fun sendDispatchEvent() {
dispatchEvents.trySend(Unit)
internal fun sendDispatchEvent(context: CoroutineContext) {
if (context[BackgroundWork] !== BackgroundWork)
dispatchEvents.trySend(Unit)
}

/**
Expand Down Expand Up @@ -216,6 +222,8 @@ private class TestDispatchEvent<T>(
private val count: Long,
@JvmField val time: Long,
@JvmField val marker: T,
@JvmField val isForeground: Boolean,
// TODO: remove once the deprecated API is gone
@JvmField val isCancelled: () -> Boolean
) : Comparable<TestDispatchEvent<*>>, ThreadSafeHeapNode {
override var heap: ThreadSafeHeap<*>? = null
Expand All @@ -224,7 +232,7 @@ private class TestDispatchEvent<T>(
override fun compareTo(other: TestDispatchEvent<*>) =
compareValuesBy(this, other, TestDispatchEvent<*>::time, TestDispatchEvent<*>::count)

override fun toString() = "TestDispatchEvent(time=$time, dispatcher=$dispatcher)"
override fun toString() = "TestDispatchEvent(time=$time, dispatcher=$dispatcher${if (isForeground) "" else ", background"})"
}

// works with positive `a`, `b`
Expand All @@ -238,3 +246,17 @@ internal fun checkSchedulerInContext(scheduler: TestCoroutineScheduler, context:
}
}
}

/**
* A coroutine context key denoting that the work is to be executed in the background.
* @see [TestScope.backgroundScope]
*/
internal object BackgroundWork : CoroutineContext.Key<BackgroundWork>, CoroutineContext.Element {
override val key: CoroutineContext.Key<*>
get() = this

override fun toString(): String = "BackgroundWork"
}

private fun<T> ThreadSafeHeap<T>.none(predicate: (T) -> Boolean) where T: ThreadSafeHeapNode, T: Comparable<T> =
find(predicate) == null
13 changes: 5 additions & 8 deletions kotlinx-coroutines-test/common/src/TestDispatcher.kt
Expand Up @@ -10,14 +10,14 @@ import kotlin.jvm.*

/**
* A test dispatcher that can interface with a [TestCoroutineScheduler].
*
*
* The available implementations are:
* * [StandardTestDispatcher] is a dispatcher that places new tasks into a queue.
* * [UnconfinedTestDispatcher] is a dispatcher that behaves like [Dispatchers.Unconfined] while allowing to control
* the virtual time.
*/
@ExperimentalCoroutinesApi
public abstract class TestDispatcher internal constructor(): CoroutineDispatcher(), Delay {
public abstract class TestDispatcher internal constructor() : CoroutineDispatcher(), Delay {
/** The scheduler that this dispatcher is linked to. */
@ExperimentalCoroutinesApi
public abstract val scheduler: TestCoroutineScheduler
Expand All @@ -30,16 +30,13 @@ public abstract class TestDispatcher internal constructor(): CoroutineDispatcher

/** @suppress */
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
checkSchedulerInContext(scheduler, continuation.context)
val timedRunnable = CancellableContinuationRunnable(continuation, this)
scheduler.registerEvent(this, timeMillis, timedRunnable, ::cancellableRunnableIsCancelled)
scheduler.registerEvent(this, timeMillis, timedRunnable, continuation.context, ::cancellableRunnableIsCancelled)
}

/** @suppress */
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
checkSchedulerInContext(scheduler, context)
return scheduler.registerEvent(this, timeMillis, block) { false }
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
scheduler.registerEvent(this, timeMillis, block, context) { false }
}

/**
Expand Down

0 comments on commit 143bdfa

Please sign in to comment.