Skip to content

Commit

Permalink
Add and enhance stress tests for attaching completion handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Apr 9, 2024
1 parent 7926aa2 commit 4858fc5
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 8 deletions.
37 changes: 30 additions & 7 deletions kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt
@@ -1,27 +1,36 @@
package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.test.*

/**
* Testing the procedure of attaching a child to the parent job.
*/
class JobChildStressTest : TestBase() {
private val N_ITERATIONS = 10_000 * stressTestMultiplier
private val pool = newFixedThreadPoolContext(3, "JobChildStressTest")

@After
@AfterTest
fun tearDown() {
pool.close()
}

/**
* Perform concurrent launch of a child job & cancellation of the explicit parent job
* Tests attaching a child while the parent is trying to finalize its state.
*
* Checks the following interleavings:
* - A child attaches before the parent is cancelled.
* - A child attaches after the parent is cancelled, but before the parent notifies anyone about it.
* - A child attaches after the parent notifies the children about being cancelled,
* but before it starts waiting for its children.
* - A child attempts to attach after the parent stops waiting for its children,
* which immediately cancels the child.
*/
@Test
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
fun testChild() = runTest {
fun testChildAttachmentRacingWithCancellation() = runTest {
val barrier = CyclicBarrier(3)
repeat(N_ITERATIONS) {
var wasLaunched = false
Expand All @@ -30,7 +39,7 @@ class JobChildStressTest : TestBase() {
unhandledException = ex
}
val scope = CoroutineScope(pool + handler)
val parent = CompletableDeferred<Unit>()
val parent = createCompletableDeferredForTesting(it)
// concurrent child launcher
val launcher = scope.launch {
barrier.await()
Expand All @@ -56,13 +65,27 @@ class JobChildStressTest : TestBase() {
}
}

/**
* Tests attaching a child while the parent is waiting for the last child job to complete.
*
* Checks the following interleavings:
* - A child attaches while the parent is already completing, but is waiting for its children.
* - A child attempts to attach after the parent stops waiting for its children,
* which immediately cancels the child.
*/
@Test
fun testFailingChildIsAddedWhenJobFinalizesItsState() {
fun testChildAttachmentRacingWithLastChildCompletion() {
// All exceptions should get aggregated here
repeat(N_ITERATIONS) {
runBlocking {
val rogueJob = AtomicReference<Job?>()
/** not using [createCompletableDeferredForTesting] because we don't need extra children. */
val deferred = CompletableDeferred<Unit>()
// optionally, add a completion handler to the parent job, so that the child tries to enter a list with
// multiple elements, not just one.
if (it.mod(2) == 0) {
deferred.invokeOnCompletion { }
}
launch(pool + deferred) {
deferred.complete(Unit) // Transition deferred into "completing" state waiting for current child
// **Asynchronously** submit task that launches a child so it races with completion
Expand Down
Expand Up @@ -30,6 +30,9 @@ class JobHandlersUpgradeStressTest : TestBase() {
val state = atomic(0)
}

/**
* Tests handlers not being invoked more than once.
*/
@Test
fun testStress() {
println("--- JobHandlersUpgradeStressTest")
Expand Down Expand Up @@ -91,4 +94,4 @@ class JobHandlersUpgradeStressTest : TestBase() {
println(" Fired handler ${fired.value} times")

}
}
}
192 changes: 192 additions & 0 deletions kotlinx-coroutines-core/jvm/test/JobOnCompletionStressTest.kt
@@ -0,0 +1,192 @@
package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import kotlinx.coroutines.testing.*
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.*
import kotlin.test.*
import kotlin.time.Duration.Companion.seconds

class JobOnCompletionStressTest: TestBase() {
private val N_ITERATIONS = 10_000 * stressTestMultiplier
private val pool = newFixedThreadPoolContext(2, "JobOnCompletionStressTest")

private val completionHandlerSeesCompletedParent = AtomicBoolean(false)
private val completionHandlerSeesCancelledParent = AtomicBoolean(false)
private val encounteredException = AtomicReference<Throwable?>(null)

@AfterTest
fun tearDown() {
pool.close()
}

@Test
fun testOnCompletionRacingWithCompletion() = runTest {
testHandlerRacingWithCancellation(
onCancelling = false,
invokeImmediately = true,
parentCompletion = { complete(Unit) }
) {
assertNull(encounteredException.get())
assertTrue(completionHandlerSeesCompletedParent.get())
assertFalse(completionHandlerSeesCancelledParent.get())
}
}

@Test
fun testOnCompletionRacingWithCancellation() = runTest {
testHandlerRacingWithCancellation(
onCancelling = false,
invokeImmediately = true,
parentCompletion = { completeExceptionally(TestException()) }
) {
assertIs<TestException>(encounteredException.get())
assertTrue(completionHandlerSeesCompletedParent.get())
assertTrue(completionHandlerSeesCancelledParent.get())
}
}

@Test
fun testOnCancellingRacingWithCompletion() = runTest {
testHandlerRacingWithCancellation(
onCancelling = true,
invokeImmediately = true,
parentCompletion = { complete(Unit) }
) {
assertNull(encounteredException.get())
assertTrue(completionHandlerSeesCompletedParent.get())
assertFalse(completionHandlerSeesCancelledParent.get())
}
}

@Test
fun testOnCancellingRacingWithCancellation() = runTest {
testHandlerRacingWithCancellation(
onCancelling = true,
invokeImmediately = true,
parentCompletion = { completeExceptionally(TestException()) }
) {
assertIs<TestException>(encounteredException.get())
assertTrue(completionHandlerSeesCancelledParent.get())
}
}

@Test
fun testNonImmediateOnCompletionRacingWithCompletion() = runTest {
testHandlerRacingWithCancellation(
onCancelling = false,
invokeImmediately = false,
parentCompletion = { complete(Unit) }
) {
assertNull(encounteredException.get())
assertTrue(completionHandlerSeesCompletedParent.get())
assertFalse(completionHandlerSeesCancelledParent.get())
}
}

@Test
fun testNonImmediateOnCompletionRacingWithCancellation() = runTest {
testHandlerRacingWithCancellation(
onCancelling = false,
invokeImmediately = false,
parentCompletion = { completeExceptionally(TestException()) }
) {
assertIs<TestException>(encounteredException.get())
assertTrue(completionHandlerSeesCompletedParent.get())
assertTrue(completionHandlerSeesCancelledParent.get())
}
}

@Test
fun testNonImmediateOnCancellingRacingWithCompletion() = runTest {
testHandlerRacingWithCancellation(
onCancelling = true,
invokeImmediately = false,
parentCompletion = { complete(Unit) }
) {
assertNull(encounteredException.get())
assertTrue(completionHandlerSeesCompletedParent.get())
assertFalse(completionHandlerSeesCancelledParent.get())
}
}

@Test
fun testNonImmediateOnCancellingRacingWithCancellation() = runTest {
testHandlerRacingWithCancellation(
onCancelling = true,
invokeImmediately = false,
parentCompletion = { completeExceptionally(TestException()) }
) {
assertIs<TestException>(encounteredException.get())
assertTrue(completionHandlerSeesCancelledParent.get())
}
}

private suspend fun testHandlerRacingWithCancellation(
onCancelling: Boolean,
invokeImmediately: Boolean,
parentCompletion: CompletableDeferred<Unit>.() -> Unit,
validate: () -> Unit,
) {
repeat(N_ITERATIONS) {
val entered = Channel<Unit>(1)
completionHandlerSeesCompletedParent.set(false)
completionHandlerSeesCancelledParent.set(false)
encounteredException.set(null)
val parent = createCompletableDeferredForTesting(it)
val barrier = CyclicBarrier(2)
val handlerInstallJob = coroutineScope {
launch(pool) {
barrier.await()
parent.parentCompletion()
}
async(pool) {
barrier.await()
parent.invokeOnCompletion(
onCancelling = onCancelling,
invokeImmediately = invokeImmediately,
) { exception ->
encounteredException.set(exception)
completionHandlerSeesCompletedParent.set(parent.isCompleted)
completionHandlerSeesCancelledParent.set(parent.isCancelled)
entered.trySend(Unit)
}
}
}
if (invokeImmediately || handlerInstallJob.getCompleted() !== NonDisposableHandle) {
withTimeout(1.seconds) {
entered.receive()
}
try {
validate()
} catch (e: Throwable) {
println("Iteration $it failed")
println("invokeOnCompletion returned ${handlerInstallJob.getCompleted()}")
throw e
}
} else {
assertTrue(entered.isEmpty)
}
}
}
}

/**
* Creates a [CompletableDeferred], optionally adding completion handlers and/or other children to the job depending
* on [iteration].
* The purpose is to test not just attaching completion handlers to empty or one-element lists (see the [JobSupport]
* implementation for details on what this means), but also to lists with multiple elements.
*/
fun createCompletableDeferredForTesting(iteration: Int): CompletableDeferred<Unit> {
val parent = CompletableDeferred<Unit>()
/* We optionally add completion handlers and/or other children to the parent job
to test the scenarios where a child is placed into an empty list, a single-element list,
or a list with multiple elements. */
if (iteration.mod(2) == 0) {
parent.invokeOnCompletion { }
}
if (iteration.mod(3) == 0) {
GlobalScope.launch(parent) { }
}
return parent
}

0 comments on commit 4858fc5

Please sign in to comment.