Skip to content

Commit

Permalink
~run coroutines in two modes
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad committed Oct 27, 2021
1 parent 5442a1c commit ed876b0
Show file tree
Hide file tree
Showing 27 changed files with 135 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public abstract class ChannelFlow<T>(

override suspend fun collect(collector: FlowCollector<T>): Unit =
coroutineScope {
println("In scope")
collector.emitAll(produceImpl(this))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package kotlinx.coroutines.internal

import kotlin.native.concurrent.*

/**
* Special kind of list intended to be used as collection of subscribers in `ArrayBroadcastChannel`
* On JVM it's CopyOnWriteList and on JS it's MutableList.
Expand All @@ -22,3 +24,5 @@ internal expect class ReentrantLock() {
internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T

internal expect fun <E> identitySet(expectedSize: Int): MutableSet<E>

internal expect val supportsMultithreading: Boolean
11 changes: 11 additions & 0 deletions kotlinx-coroutines-core/common/test/TestBase.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.test.*

Expand Down Expand Up @@ -48,6 +49,16 @@ public expect open class TestBase constructor() {
): TestResult
}

public fun TestBase.runMtTest(
expected: ((Throwable) -> Boolean)? = null,
unhandled: List<(Throwable) -> Boolean> = emptyList(),
block: suspend CoroutineScope.() -> Unit
): TestResult {
@Suppress("CAST_NEVER_SUCCEEDS")
if (!supportsMultithreading) return Unit as TestResult
return runTest(expected, unhandled, block)
}

public suspend inline fun hang(onCancellation: () -> Unit) {
try {
suspendCancellableCoroutine<Unit> { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ abstract class AbstractDispatcherConcurrencyTest : TestBase() {
public abstract val dispatcher: CoroutineDispatcher

@Test
fun testLaunchAndJoin() {
fun testLaunchAndJoin() = runMtTest {
expect(1)
var capturedMutableState = 0
val job = GlobalScope.launch(dispatcher) {
Expand All @@ -25,7 +25,7 @@ abstract class AbstractDispatcherConcurrencyTest : TestBase() {
}

@Test
fun testDispatcherIsActuallyMultithreaded() {
fun testDispatcherIsActuallyMultithreaded() = runMtTest {
val channel = Channel<Int>()
GlobalScope.launch(dispatcher) {
channel.send(42)
Expand All @@ -41,7 +41,7 @@ abstract class AbstractDispatcherConcurrencyTest : TestBase() {
}

@Test
fun testDelayInDefaultDispatcher() {
fun testDelayInDefaultDispatcher() = runMtTest {
expect(1)
val job = GlobalScope.launch(dispatcher) {
expect(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import kotlin.test.*

class AtomicCancellationTest : TestBase() {
@Test
fun testSendCancellable() = runBlocking {
fun testSendCancellable() = runMtTest {
expect(1)
val channel = Channel<Int>()
val job = launch(start = CoroutineStart.UNDISPATCHED) {
Expand All @@ -26,7 +26,7 @@ class AtomicCancellationTest : TestBase() {
}

@Test
fun testSelectSendCancellable() = runBlocking {
fun testSelectSendCancellable() = runMtTest {
expect(1)
val channel = Channel<Int>()
val job = launch(start = CoroutineStart.UNDISPATCHED) {
Expand All @@ -47,7 +47,7 @@ class AtomicCancellationTest : TestBase() {
}

@Test
fun testReceiveCancellable() = runBlocking {
fun testReceiveCancellable() = runMtTest {
expect(1)
val channel = Channel<Int>()
val job = launch(start = CoroutineStart.UNDISPATCHED) {
Expand All @@ -63,7 +63,7 @@ class AtomicCancellationTest : TestBase() {
}

@Test
fun testSelectReceiveCancellable() = runBlocking {
fun testSelectReceiveCancellable() = runMtTest {
expect(1)
val channel = Channel<Int>()
val job = launch(start = CoroutineStart.UNDISPATCHED) {
Expand All @@ -85,7 +85,7 @@ class AtomicCancellationTest : TestBase() {
}

@Test
fun testSelectDeferredAwaitCancellable() = runBlocking {
fun testSelectDeferredAwaitCancellable() = runMtTest {
expect(1)
val deferred = async { // deferred, not yet complete
expect(4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,24 @@
package kotlinx.coroutines

import kotlinx.coroutines.exceptions.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.test.*

class ConcurrentExceptionsStressTest : TestBase() {
private val nWorkers = 4
private val nRepeat = 1000 * stressTestMultiplier

private val workers = Array(nWorkers) { index ->
newSingleThreadContext("JobExceptionsStressTest-$index")
if (!supportsMultithreading) object : CloseableCoroutineDispatcher() {
override fun close() {
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
TODO("Not yet implemented")
}
}
else newSingleThreadContext("JobExceptionsStressTest-$index")
}

@AfterTest
Expand All @@ -23,7 +33,7 @@ class ConcurrentExceptionsStressTest : TestBase() {
}

@Test
fun testStress() = runTest {
fun testStress() = runMtTest {
repeat(nRepeat) {
testOnce()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
*/

import kotlinx.coroutines.*
import kotlin.test.*
import kotlin.coroutines.*
import kotlin.test.*

/**
* Test a race between job failure and join.
Expand All @@ -15,12 +15,12 @@ class JobStructuredJoinStressTest : TestBase() {
private val nRepeats = 10_000 * stressTestMultiplier

@Test
fun testStressRegularJoin() {
fun testStressRegularJoin() = runMtTest {
stress(Job::join)
}

@Test
fun testStressSuspendCancellable() {
fun testStressSuspendCancellable() = runMtTest {
stress { job ->
suspendCancellableCoroutine { cont ->
job.invokeOnCompletion { cont.resume(Unit) }
Expand All @@ -29,7 +29,7 @@ class JobStructuredJoinStressTest : TestBase() {
}

@Test
fun testStressSuspendCancellableReusable() {
fun testStressSuspendCancellableReusable() = runMtTest {
stress { job ->
suspendCancellableCoroutineReusable { cont ->
job.invokeOnCompletion { cont.resume(Unit) }
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import kotlin.test.*
class RunBlockingTest : TestBase() {

@Test
fun testWithTimeoutBusyWait() = runBlocking {
fun testWithTimeoutBusyWait() = runMtTest {
val value = withTimeoutOrNull(10) {
while (isActive) {
// Busy wait
Expand Down Expand Up @@ -52,7 +52,7 @@ class RunBlockingTest : TestBase() {
}

@Test
fun testOtherDispatcher() {
fun testOtherDispatcher() = runMtTest {
expect(1)
val name = "RunBlockingTest.testOtherDispatcher"
val thread = newSingleThreadContext(name)
Expand All @@ -68,7 +68,7 @@ class RunBlockingTest : TestBase() {
}

@Test
fun testCancellation() {
fun testCancellation() = runMtTest {
val ctx = newSingleThreadContext("testCancellation")
val job = GlobalScope.launch {
runBlocking(coroutineContext + ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class StateFlowCommonStressTest : TestBase() {
private val state = MutableStateFlow<Long>(0)

@Test
fun testSingleEmitterAndCollector() = runTest {
fun testSingleEmitterAndCollector() = runMtTest {
var collected = 0L
val collector = launch(Dispatchers.Default) {
// collect, but abort and collect again after every 1000 values to stress allocation/deallocation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class StateFlowUpdateCommonTest : TestBase() {
@Test
fun testGetAndUpdate() = doTest { getAndUpdate { it + 1 } }

private fun doTest(increment: MutableStateFlow<Int>.() -> Unit) = runTest {
private fun doTest(increment: MutableStateFlow<Int>.() -> Unit) = runMtTest {
val flow = MutableStateFlow(0)
val j1 = launch(Dispatchers.Default) {
repeat(iterations / 2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class BroadcastChannelSubStressTest: TestBase() {
private val receivedTotal = atomic(0L)

@Test
fun testStress() = runTest {
fun testStress() = runMtTest {
TestBroadcastChannelKind.values().forEach { kind ->
println("--- BroadcastChannelSubStressTest $kind")
val broadcast = kind.create<Long>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
private val dUndeliveredCnt = atomic(0)

@Test
fun testStress() = runTest {
fun testStress() = runMtTest {
repeat(repeatTimes) {
val channel = Channel<Int>(1) { dUndeliveredCnt.incrementAndGet() }
val j1 = launch(Dispatchers.Default) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
private val receivedTotal = atomic(0)

@Test
fun testStressNotify()= runBlocking {
fun testStressNotify()= runMtTest {
println("--- ConflatedBroadcastChannelNotifyStressTest")
val senders = List(nSenders) { senderId ->
launch(Dispatchers.Default + CoroutineName("Sender$senderId")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import kotlin.test.*
class CombineStressTest : TestBase() {

@Test
fun testCancellation() = runTest {
fun testCancellation() = runMtTest {
withContext(Dispatchers.Default + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
flow {
expect(1)
Expand All @@ -26,7 +26,7 @@ class CombineStressTest : TestBase() {
}

@Test
public fun testFailure() = runTest {
public fun testFailure() = runMtTest {
val innerIterations = 100 * stressTestMultiplierSqrt
val outerIterations = 10 * stressTestMultiplierSqrt
withContext(Dispatchers.Default + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import kotlin.test.*
class FlowCancellationTest : TestBase() {

@Test
fun testEmitIsCooperative() = runTest {
fun testEmitIsCooperative() = runMtTest {
val latch = Channel<Unit>(1)
val job = flow {
expect(1)
Expand All @@ -29,7 +29,7 @@ class FlowCancellationTest : TestBase() {
}

@Test
fun testIsActiveOnCurrentContext() = runTest {
fun testIsActiveOnCurrentContext() = runMtTest {
val latch = Channel<Unit>(1)
val job = flow<Unit> {
expect(1)
Expand All @@ -46,7 +46,7 @@ class FlowCancellationTest : TestBase() {
}

@Test
fun testFlowWithEmptyContext() = runTest {
fun testFlowWithEmptyContext() = runMtTest {
expect(1)
withEmptyContext {
val flow = flow {
Expand All @@ -60,4 +60,4 @@ class FlowCancellationTest : TestBase() {
}
finish(4)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class SelectChannelStressTest: TestBase() {
private val iterations = (if (isNative) 1_000 else 1_000_000) * stressTestMultiplier

@Test
fun testSelectSendResourceCleanupArrayChannel() = runTest {
fun testSelectSendResourceCleanupArrayChannel() = runMtTest {
val channel = Channel<Int>(1)
expect(1)
channel.send(-1) // fill the buffer, so all subsequent sends cannot proceed
Expand All @@ -29,7 +29,7 @@ class SelectChannelStressTest: TestBase() {
}

@Test
fun testSelectReceiveResourceCleanupArrayChannel() = runTest {
fun testSelectReceiveResourceCleanupArrayChannel() = runMtTest {
val channel = Channel<Int>(1)
expect(1)
repeat(iterations) { i ->
Expand All @@ -42,7 +42,7 @@ class SelectChannelStressTest: TestBase() {
}

@Test
fun testSelectSendResourceCleanupRendezvousChannel() = runTest {
fun testSelectSendResourceCleanupRendezvousChannel() = runMtTest {
val channel = Channel<Int>(Channel.RENDEZVOUS)
expect(1)
repeat(iterations) { i ->
Expand All @@ -55,7 +55,7 @@ class SelectChannelStressTest: TestBase() {
}

@Test
fun testSelectReceiveResourceRendezvousChannel() = runTest {
fun testSelectReceiveResourceRendezvousChannel() = runMtTest {
val channel = Channel<Int>(Channel.RENDEZVOUS)
expect(1)
repeat(iterations) { i ->
Expand Down

0 comments on commit ed876b0

Please sign in to comment.