Skip to content

Commit

Permalink
Support of new K/N memory model
Browse files Browse the repository at this point in the history
* Dispatchers.Default backed by pool of workers on Linux and by global_queue on iOS-like
* Implementation of Dispatchers.Main that uses main queue on iOS and default dispatcher on other platforms (#2858)
* Introduced newSingleThreadDispatcher and newFixedThreadPoolDispatcher
* Use proper reentrant locking and CoW arrays on new memory model, make TestBase _almost_ race-free
* More thread-safety in Native counterpart and one more test from native-mt
* Source-set sharing for tests shared between JVM and K/N
* Wrap Obj-C interop into autorelease pool to avoid memory leaks
  • Loading branch information
mvicsokolova authored and qwwdfsad committed Oct 26, 2021
1 parent d3ead6f commit 407e0b6
Show file tree
Hide file tree
Showing 66 changed files with 1,156 additions and 551 deletions.
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ allprojects {
*/
google()
mavenCentral()
maven { url "https://maven.pkg.jetbrains.space/kotlin/p/kotlin/dev" }
}
}

Expand Down Expand Up @@ -169,6 +170,10 @@ configure(subprojects.findAll { !sourceless.contains(it.name) }) {
// Remove null assertions to get smaller bytecode on Android
kotlinOptions.freeCompilerArgs += ["-Xno-param-assertions", "-Xno-receiver-assertions", "-Xno-call-assertions"]
}

tasks.withType(org.jetbrains.kotlin.gradle.tasks.AbstractKotlinNativeCompile) {
kotlinOptions.freeCompilerArgs += ["-memory-model", "experimental"]
}
}

if (build_snapshot_train) {
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ org.gradle.jvmargs=-Xmx4g

kotlin.mpp.enableCompatibilityMetadataVariant=true
kotlin.mpp.stability.nowarn=true
kotlin.native.cacheKind=none
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ kotlin {
binaries {
// Test for memory leaks using a special entry point that does not exit but returns from main
binaries.getTest("DEBUG").freeCompilerArgs += ["-e", "kotlinx.coroutines.mainNoExit"]
binaries.getTest("DEBUG").optimized = true
// Configure a separate test where code runs in background
test("background", [org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.DEBUG]) {
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainBackground"]
optimized = true
}
}
testRuns {
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public abstract class CoroutineDispatcher :
* This method should generally be exception-safe. An exception thrown from this method
* may leave the coroutines that use this dispatcher in the inconsistent and hard to debug state.
*/
@ExperimentalCoroutinesApi
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

/**
Expand Down
12 changes: 11 additions & 1 deletion kotlinx-coroutines-core/common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// then process one event from queue
val task = dequeue()
if (task != null) {
task.run()
platformAutoreleasePool { task.run() }
return 0
}
return nextTime
Expand Down Expand Up @@ -526,3 +526,13 @@ internal expect object DefaultExecutor {
public fun enqueue(task: Runnable)
}

/**
* Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
* non-Darwin native targets.
*
* Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
* the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
* be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
* pool management, it must manage the pool creation and pool drainage manually.
*/
internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)
8 changes: 2 additions & 6 deletions kotlinx-coroutines-core/common/test/EmptyContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ package kotlinx.coroutines
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*

suspend fun <T> withEmptyContext(block: suspend () -> T): T {
val baseline = Result.failure<T>(IllegalStateException("Block was suspended"))
var result: Result<T> = baseline
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { result = it })
while (result == baseline) yield()
return result.getOrThrow()
suspend fun <T> withEmptyContext(block: suspend () -> T): T = suspendCoroutine { cont ->
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { cont.resumeWith(it) })
}
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/common/test/TestBase.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlin.test.*

public expect val isStressTest: Boolean
public expect val stressTestMultiplier: Int
public expect val stressTestMultiplierSqrt: Int

/**
* The result of a multiplatform asynchronous test.
Expand All @@ -20,6 +21,10 @@ public expect val stressTestMultiplier: Int
@Suppress("NO_ACTUAL_FOR_EXPECT")
public expect class TestResult

public expect val isNative: Boolean
// "Speedup" native stress tests
public val stressTestNativeDivisor = if (isNative) 10 else 1

public expect open class TestBase constructor() {
/*
* In common tests we emulate parameterized tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class FlowInvariantsTest : TestBase() {
}
expectUnreached()
} catch (e: IllegalStateException) {
assertTrue(e.message!!.contains("Flow invariant is violated"))
assertTrue(e.message!!.contains("Flow invariant is violated"), "But had: ${e.message}")
finish(2)
}
}
Expand Down
14 changes: 14 additions & 0 deletions kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.coroutines.*

/**
* Runs a new coroutine and **blocks** the current thread until its completion.
* This function should not be used from a coroutine. It is designed to bridge regular blocking code
* to libraries that are written in suspending style, to be used in `main` functions and in tests.
*/
public expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

@ExperimentalCoroutinesApi
public expect fun newSingleThreadContext(name: String): MultithreadedDispatcher

@ExperimentalCoroutinesApi
public expect fun newFixedThreadPoolContext(nThreads: Int, name: String): MultithreadedDispatcher

/**
* A coroutine dispatcher that is confined to a single thread.
*/
@ExperimentalCoroutinesApi
public expect abstract class MultithreadedDispatcher : CoroutineDispatcher {

/**
* Closes this coroutine dispatcher and shuts down its thread.
*/
public abstract fun close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package kotlinx.coroutines.internal

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.jvm.*

private typealias Node = LockFreeLinkedListNode

Expand Down Expand Up @@ -616,7 +617,7 @@ public actual open class LockFreeLinkedListNode {
assert { next === this._next.value }
}

override fun toString(): String = "${this::class.java.simpleName}@${Integer.toHexString(System.identityHashCode(this))}"
override fun toString(): String = "${this::classSimpleName}@${this.hexAddress}"
}

private class Removed(@JvmField val ref: Node) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import kotlin.test.*

/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

abstract class AbstractDispatcherConcurrencyTest : TestBase() {

public abstract val dispatcher: CoroutineDispatcher

@Test
fun testLaunchAndJoin() {
expect(1)
var capturedMutableState = 0
val job = GlobalScope.launch(dispatcher) {
++capturedMutableState
expect(2)
}
runBlocking { job.join() }
assertEquals(1, capturedMutableState)
finish(3)
}

@Test
fun testDispatcherIsActuallyMultithreaded() {
val channel = Channel<Int>()
GlobalScope.launch(dispatcher) {
channel.send(42)
}

var result = ChannelResult.failure<Int>()
while (!result.isSuccess) {
result = channel.tryReceive()
// Block the thread, wait
}
// Delivery was successful, let's check it
assertEquals(42, result.getOrThrow())
}

@Test
fun testDelayInDefaultDispatcher() {
expect(1)
val job = GlobalScope.launch(dispatcher) {
expect(2)
delay(100)
expect(3)
}
runBlocking { job.join() }
finish(4)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand Down Expand Up @@ -142,4 +142,4 @@ class AtomicCancellationTest : TestBase() {
yield() // to jobToJoin & canceller
expect(6)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.exceptions.*
import kotlin.test.*

class ConcurrentExceptionsStressTest : TestBase() {
private val nWorkers = 4
private val nRepeat = 1000 * stressTestMultiplier

private val workers = Array(nWorkers) { index ->
newSingleThreadContext("JobExceptionsStressTest-$index")
}

@AfterTest
fun tearDown() {
workers.forEach {
it.close()
}
}

@Test
fun testStress() = runTest {
repeat(nRepeat) {
testOnce()
}
}

@Suppress("SuspendFunctionOnCoroutineScope") // workaround native inline fun stacktraces
private suspend fun CoroutineScope.testOnce() {
val deferred = async(NonCancellable) {
repeat(nWorkers) { index ->
// Always launch a coroutine even if parent job was already cancelled (atomic start)
launch(workers[index], start = CoroutineStart.ATOMIC) {
randomWait()
throw StressException(index)
}
}
}
deferred.join()
assertTrue(deferred.isCancelled)
val completionException = deferred.getCompletionExceptionOrNull()
val cause = completionException as? StressException
?: unexpectedException("completion", completionException)
val suppressed = cause.suppressed
val indices = listOf(cause.index) + suppressed.mapIndexed { index, e ->
(e as? StressException)?.index ?: unexpectedException("suppressed $index", e)
}
repeat(nWorkers) { index ->
assertTrue(index in indices, "Exception $index is missing: $indices")
}
assertEquals(nWorkers, indices.size, "Duplicated exceptions in list: $indices")
}

private fun unexpectedException(msg: String, e: Throwable?): Nothing {
e?.printStackTrace()
throw IllegalStateException("Unexpected $msg exception", e)
}

private class StressException(val index: Int) : SuppressSupportingThrowable()
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.exceptions

import kotlinx.coroutines.*

internal expect open class SuppressSupportingThrowable() : Throwable
expect val Throwable.suppressed: Array<Throwable>
expect fun Throwable.printStackTrace()

expect fun randomWait()

expect fun currentThreadName(): String

inline fun MultithreadedDispatcher.use(block: (MultithreadedDispatcher) -> Unit) {
try {
block(this)
} finally {
close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines

class DefaultDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
override val dispatcher: CoroutineDispatcher = Dispatchers.Default
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import org.junit.*
import kotlinx.coroutines.*
import kotlin.test.*
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -61,4 +60,4 @@ class JobStructuredJoinStressTest : TestBase() {
}
finish(2 + nRepeats)
}
}
}

0 comments on commit 407e0b6

Please sign in to comment.