Skip to content

Commit

Permalink
Merge pull request #2854 from Kotlin/concurrent-test
Browse files Browse the repository at this point in the history
Sharing concurrent tests for jvm and native
  • Loading branch information
mvicsokolova committed Aug 5, 2021
2 parents 68e8943 + d43f43a commit 7909ee6
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 114 deletions.
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/common/test/TestBase.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import kotlin.test.*

public expect val isStressTest: Boolean
public expect val stressTestMultiplier: Int
public expect val stressTestMultiplierSqrt: Int

public expect val isNative: Boolean

public expect open class TestBase constructor() {
public fun error(message: Any, cause: Throwable? = null): Nothing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.test.*

/**
* Creates a broadcast channel and repeatedly opens new subscription, receives event, closes it,
* to stress test the logic of opening the subscription
* to broadcast channel while events are being concurrently sent to it.
*/
class BroadcastChannelSubStressTest: TestBase() {

private val nSeconds = 5 * stressTestMultiplier
private val sentTotal = atomic(0L)
private val receivedTotal = atomic(0L)

@Test
fun testStress() = runTest {
TestBroadcastChannelKind.values().forEach { kind ->
println("--- BroadcastChannelSubStressTest $kind")
val broadcast = kind.create<Long>()
val sender =
launch(context = Dispatchers.Default + CoroutineName("Sender")) {
while (isActive) {
broadcast.send(sentTotal.incrementAndGet())
}
}
val receiver =
launch(context = Dispatchers.Default + CoroutineName("Receiver")) {
var last = -1L
while (isActive) {
val channel = broadcast.openSubscription()
val i = channel.receive()
check(i >= last) { "Last was $last, got $i" }
if (!kind.isConflated) check(i != last) { "Last was $last, got it again" }
receivedTotal.incrementAndGet()
last = i
channel.cancel()
}
}
var prevSent = -1L
repeat(nSeconds) { sec ->
delay(1000)
val curSent = sentTotal.value
println("${sec + 1}: Sent $curSent, received ${receivedTotal.value}")
check(curSent > prevSent) { "Send stalled at $curSent events" }
prevSent = curSent
}
withTimeout(5000) {
sender.cancelAndJoin()
receiver.cancelAndJoin()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package kotlinx.coroutines.channels

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import java.util.concurrent.atomic.*
import kotlin.random.*
import kotlin.test.*

Expand All @@ -25,7 +25,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
private var dSendExceptionCnt = 0
private var dTrySendFailedCnt = 0
private var dReceivedCnt = 0
private val dUndeliveredCnt = AtomicInteger()
private val dUndeliveredCnt = atomic(0)

@Test
fun testStress() = runTest {
Expand All @@ -43,23 +43,23 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
joinAll(j1, j2)

// All elements must be either received or undelivered (IN every run)
if (dSendCnt - dTrySendFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) {
if (dSendCnt - dTrySendFailedCnt != dReceivedCnt + dUndeliveredCnt.value) {
println(" Send: $dSendCnt")
println("Send exception: $dSendExceptionCnt")
println("trySend failed: $dTrySendFailedCnt")
println(" Received: $dReceivedCnt")
println(" Undelivered: ${dUndeliveredCnt.get()}")
println(" Undelivered: ${dUndeliveredCnt.value}")
error("Failed")
}
trySendFailedCnt += dTrySendFailedCnt
receivedCnt += dReceivedCnt
undeliveredCnt += dUndeliveredCnt.get()
undeliveredCnt += dUndeliveredCnt.value
// clear for next run
dSendCnt = 0
dSendExceptionCnt = 0
dTrySendFailedCnt = 0
dReceivedCnt = 0
dUndeliveredCnt.set(0)
dUndeliveredCnt.value = 0
}
// Stats
println(" Send: $sendCnt")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import org.junit.Test
import java.util.concurrent.atomic.*
import kotlin.test.*

class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
Expand All @@ -17,10 +16,10 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {

private val broadcast = ConflatedBroadcastChannel<Int>()

private val sendersCompleted = AtomicInteger()
private val receiversCompleted = AtomicInteger()
private val sentTotal = AtomicInteger()
private val receivedTotal = AtomicInteger()
private val sendersCompleted = atomic(0)
private val receiversCompleted = atomic(0)
private val sentTotal = atomic(0)
private val receivedTotal = atomic(0)

@Test
fun testStressNotify()= runBlocking {
Expand Down Expand Up @@ -57,7 +56,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
var seconds = 0
while (true) {
delay(1000)
println("${++seconds}: Sent ${sentTotal.get()}, received ${receivedTotal.get()}")
println("${++seconds}: Sent ${sentTotal.value}, received ${receivedTotal.value}")
}
}
try {
Expand All @@ -71,13 +70,13 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
}
progressJob.cancel()
println("Tested with nSenders=$nSenders, nReceivers=$nReceivers")
println("Completed successfully ${sendersCompleted.get()} sender coroutines")
println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
println(" Sent ${sentTotal.get()} events")
println(" Received ${receivedTotal.get()} events")
assertEquals(nSenders, sendersCompleted.get())
assertEquals(nReceivers, receiversCompleted.get())
assertEquals(nEvents, sentTotal.get())
println("Completed successfully ${sendersCompleted.value} sender coroutines")
println("Completed successfully ${receiversCompleted.value} receiver coroutines")
println(" Sent ${sentTotal.value} events")
println(" Received ${receivedTotal.value} events")
assertEquals(nSenders, sendersCompleted.value)
assertEquals(nReceivers, receiversCompleted.value)
assertEquals(nEvents, sentTotal.value)
}

private suspend fun waitForEvent(): Int =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import org.junit.*
import kotlin.test.*

class RandevouzChannelStressTest : TestBase() {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import org.junit.*
import kotlin.test.*

class CombineStressTest : TestBase() {

@Test
public fun testCancellation() = runTest {
fun testCancellation() = runTest {
withContext(Dispatchers.Default + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
flow {
expect(1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

class FlowCancellationTest : TestBase() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal

import org.junit.Test
import kotlinx.coroutines.internal.*
import kotlin.test.*

class LockFreeLinkedListTest {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.selects
Expand All @@ -11,61 +11,60 @@ import kotlin.test.*

class SelectChannelStressTest: TestBase() {

// Running less iterations on native platforms because of some performance regression
private val iterations = (if (isNative) 1_000 else 1_000_000) * stressTestMultiplier

@Test
fun testSelectSendResourceCleanupArrayChannel() = runTest {
val channel = Channel<Int>(1)
val n = 10_000_000 * stressTestMultiplier
expect(1)
channel.send(-1) // fill the buffer, so all subsequent sends cannot proceed
repeat(n) { i ->
repeat(iterations) { i ->
select {
channel.onSend(i) { expectUnreached() }
default { expect(i + 2) }
}
}
finish(n + 2)
finish(iterations + 2)
}

@Test
fun testSelectReceiveResourceCleanupArrayChannel() = runTest {
val channel = Channel<Int>(1)
val n = 10_000_000 * stressTestMultiplier
expect(1)
repeat(n) { i ->
repeat(iterations) { i ->
select {
channel.onReceive { expectUnreached() }
default { expect(i + 2) }
}
}
finish(n + 2)
finish(iterations + 2)
}

@Test
fun testSelectSendResourceCleanupRendezvousChannel() = runTest {
val channel = Channel<Int>(Channel.RENDEZVOUS)
val n = 1_000_000 * stressTestMultiplier
expect(1)
repeat(n) { i ->
repeat(iterations) { i ->
select {
channel.onSend(i) { expectUnreached() }
default { expect(i + 2) }
}
}
finish(n + 2)
finish(iterations + 2)
}

@Test
fun testSelectReceiveResourceRendezvousChannel() = runTest {
val channel = Channel<Int>(Channel.RENDEZVOUS)
val n = 1_000_000 * stressTestMultiplier
expect(1)
repeat(n) { i ->
repeat(iterations) { i ->
select {
channel.onReceive { expectUnreached() }
default { expect(i + 2) }
}
}
finish(n + 2)
finish(iterations + 2)
}

internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.selects
Expand Down
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/js/test/TestBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import kotlin.js.*

public actual val isStressTest: Boolean = false
public actual val stressTestMultiplier: Int = 1
public actual val stressTestMultiplierSqrt: Int = 1

public actual val isNative = false

public actual open class TestBase actual constructor() {
private var actionIndex = 0
Expand Down
4 changes: 3 additions & 1 deletion kotlinx-coroutines-core/jvm/test/TestBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ private val VERBOSE = systemProp("test.verbose", false)
*/
public actual val isStressTest = System.getProperty("stressTest")?.toBoolean() ?: false

public val stressTestMultiplierSqrt = if (isStressTest) 5 else 1
public actual val stressTestMultiplierSqrt = if (isStressTest) 5 else 1

public actual val isNative = false

/**
* Multiply various constants in stress tests by this factor, so that they run longer during nightly stress test.
Expand Down

0 comments on commit 7909ee6

Please sign in to comment.