Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorporate new Native memory model into kotlinx-coroutines mainline #2833

Merged
merged 26 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8cdafed
Support of new K/N memory model
qwwdfsad Jul 12, 2021
206d076
Merge branch 'develop' into new-native-memory-model
qwwdfsad Oct 26, 2021
96f7dc1
~compilation fixes
qwwdfsad Oct 26, 2021
e052ea1
~move K/N compiler configuration to core
qwwdfsad Oct 26, 2021
b343d93
~migrate to CloseableCoroutineDispatcher
qwwdfsad Oct 26, 2021
5442a1c
~run tests 4 times
qwwdfsad Oct 27, 2021
2c8439f
~run coroutines in two modes
qwwdfsad Oct 27, 2021
e8a76cf
~cleanup:
qwwdfsad Oct 27, 2021
57d65da
~fixup tests:
qwwdfsad Oct 27, 2021
6ee3209
Merge branch 'develop' into new-native-memory-model
qwwdfsad Nov 10, 2021
575b468
Update kotlinx-coroutines-core/native/test/ConcurrentTestUtilities.kt
qwwdfsad Nov 10, 2021
d87858d
Update kotlinx-coroutines-core/native/src/internal/CopyOnWriteList.kt
qwwdfsad Nov 10, 2021
93f8867
Update kotlinx-coroutines-core/native/src/CoroutineContext.kt
qwwdfsad Nov 10, 2021
79ba184
Update kotlinx-coroutines-core/concurrent/test/AbstractDispatcherConc…
qwwdfsad Nov 10, 2021
63877dd
Update kotlinx-coroutines-core/concurrent/test/AbstractDispatcherConc…
qwwdfsad Nov 10, 2021
51aa77f
Update kotlinx-coroutines-core/native/src/EventLoop.kt
qwwdfsad Nov 10, 2021
c77e852
~batch of small fixes
qwwdfsad Nov 10, 2021
a6f2d5c
~remove leftover
qwwdfsad Nov 10, 2021
f8e6661
~cleanup
qwwdfsad Nov 10, 2021
2da3434
Update kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt
qwwdfsad Nov 11, 2021
ef2f6fc
Update kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt
qwwdfsad Nov 11, 2021
3412db3
Update kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt
qwwdfsad Nov 11, 2021
6ac83ee
~set of small cleanups
qwwdfsad Nov 11, 2021
f005b54
~refactor multithreadingSupported to be native-specific
qwwdfsad Nov 11, 2021
5564b35
~formatting
qwwdfsad Nov 11, 2021
d7e8d79
~warnings about SubscriberList
qwwdfsad Nov 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/SourceSets.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ fun KotlinSourceSet.configureMultiplatform() {
optInAnnotations.forEach { optIn(it) }
progressiveMode = true
}
}
}
56 changes: 44 additions & 12 deletions kotlinx-coroutines-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,50 @@ kotlin {
SourceSetsKt.configureMultiplatform(it)
}

configure(targets) {
// Configure additional binaries and test runs -- one for each OS
if (["macos", "linux", "mingw"].any { name.startsWith(it) }) {
binaries {
// Test for memory leaks using a special entry point that does not exit but returns from main
binaries.getTest("DEBUG").freeCompilerArgs += ["-e", "kotlinx.coroutines.mainNoExit"]
// Configure a separate test where code runs in background
test("background", [org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.DEBUG]) {
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainBackground"]
}
/*
* Configure four test runs:
* 1) Old memory model, Main thread
* 2) New memory model, Main thread
* 3) Old memory model, BG thread
* 4) New memory model, BG thread (required for Dispatchers.Main tests on Darwin)
*
* All new MM targets are build with optimize = true to have stress tests properly run.
*/
targets.withType(org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTargetWithTests.class).configureEach {
binaries {
// Test for memory leaks using a special entry point that does not exit but returns from main
binaries.getTest("DEBUG").freeCompilerArgs += ["-e", "kotlinx.coroutines.mainNoExit"]
}

binaries.test("newMM", [DEBUG]) {
def thisTest = it
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainNoExit"]
optimized = true
binaryOptions["memoryModel"] = "experimental"
testRuns.create("newMM") {
setExecutionSourceFrom(thisTest)
// A hack to get different suffixes in the aggregated report.
executionTask.configure { targetName = "$targetName new MM" }
}
}

binaries.test("worker", [DEBUG]) {
def thisTest = it
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainBackground"]
testRuns.create("worker") {
setExecutionSourceFrom(thisTest)
executionTask.configure { targetName = "$targetName worker" }
}
testRuns {
background { setExecutionSourceFrom(binaries.backgroundDebugTest) }
}

binaries.test("workerWithNewMM", [DEBUG]) {
def thisTest = it
optimized = true
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainBackground"]
binaryOptions["memoryModel"] = "experimental"
testRuns.create("workerWithNewMM") {
setExecutionSourceFrom(thisTest)
executionTask.configure { targetName = "$targetName worker with new MM" }
}
}
}
Expand All @@ -97,6 +128,7 @@ kotlin {
}
}


configurations {
configureKotlinJvmPlatform(kotlinCompilerPluginClasspath)
}
Expand Down
19 changes: 17 additions & 2 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// then process one event from queue
val task = dequeue()
if (task != null) {
task.run()
platformAutoreleasePool { task.run() }
return 0
}
return nextTime
Expand All @@ -289,7 +289,11 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
} else {
DefaultExecutor.enqueue(task)
if (multithreadingSupported) {
dkhalanskyjb marked this conversation as resolved.
Show resolved Hide resolved
DefaultExecutor.enqueue(task)
} else {
error("Cannot execute task because event loop was shut down")
}
}
}

Expand Down Expand Up @@ -530,3 +534,14 @@ internal expect fun nanoTime(): Long
internal expect object DefaultExecutor {
public fun enqueue(task: Runnable)
}

/**
* Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
* non-Darwin native targets.
*
* Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
* the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
* be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
* pool management, it must manage the pool creation and pool drainage manually.
*/
internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ internal expect class ReentrantLock() {
internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T

internal expect fun <E> identitySet(expectedSize: Int): MutableSet<E>

internal expect val multithreadingSupported: Boolean
8 changes: 2 additions & 6 deletions kotlinx-coroutines-core/common/test/EmptyContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ package kotlinx.coroutines
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*

suspend fun <T> withEmptyContext(block: suspend () -> T): T {
val baseline = Result.failure<T>(IllegalStateException("Block was suspended"))
var result: Result<T> = baseline
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { result = it })
while (result == baseline) yield()
return result.getOrThrow()
suspend fun <T> withEmptyContext(block: suspend () -> T): T = suspendCoroutine { cont ->
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { cont.resumeWith(it) })
}
19 changes: 19 additions & 0 deletions kotlinx-coroutines-core/common/test/TestBase.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.test.*

public expect val isStressTest: Boolean
public expect val stressTestMultiplier: Int
public expect val stressTestMultiplierSqrt: Int

/**
* The result of a multiplatform asynchronous test.
Expand All @@ -20,6 +22,10 @@ public expect val stressTestMultiplier: Int
@Suppress("NO_ACTUAL_FOR_EXPECT")
public expect class TestResult

public expect val isNative: Boolean
// "Speedup" native stress tests
public val stressTestNativeDivisor = if (isNative) 10 else 1
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved

public expect open class TestBase constructor() {
/*
* In common tests we emulate parameterized tests
Expand All @@ -43,6 +49,19 @@ public expect open class TestBase constructor() {
): TestResult
}

public fun TestBase.runMtTest(
expected: ((Throwable) -> Boolean)? = null,
unhandled: List<(Throwable) -> Boolean> = emptyList(),
block: suspend CoroutineScope.() -> Unit
): TestResult {
// Never invoked on JS, not extracted to separate 'concurrent' source set
// as it doesn't help with the suppress anyway and requires an additional ceremony
// and spreads TestBase extensions over the codebase
@Suppress("CAST_NEVER_SUCCEEDS")
if (!multithreadingSupported) return Unit as TestResult
dkhalanskyjb marked this conversation as resolved.
Show resolved Hide resolved
return runTest(expected, unhandled, block)
}

public suspend inline fun hang(onCancellation: () -> Unit) {
try {
suspendCancellableCoroutine<Unit> { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class FlowInvariantsTest : TestBase() {
}
expectUnreached()
} catch (e: IllegalStateException) {
assertTrue(e.message!!.contains("Flow invariant is violated"))
assertTrue(e.message!!.contains("Flow invariant is violated"), "But had: ${e.message}")
finish(2)
}
}
Expand Down
14 changes: 14 additions & 0 deletions kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.coroutines.*

/**
* Runs a new coroutine and **blocks** the current thread until its completion.
* This function should not be used from a coroutine. It is designed to bridge regular blocking code
* to libraries that are written in suspending style, to be used in `main` functions and in tests.
*/
public expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

@ExperimentalCoroutinesApi
public expect fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher

@ExperimentalCoroutinesApi
public expect fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package kotlinx.coroutines.internal

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.jvm.*
import kotlin.native.concurrent.*

private typealias Node = LockFreeLinkedListNode

Expand All @@ -20,9 +22,11 @@ internal const val SUCCESS: Int = 1
internal const val FAILURE: Int = 2

@PublishedApi
@SharedImmutable
internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE")

@PublishedApi
@SharedImmutable
internal val LIST_EMPTY: Any = Symbol("LIST_EMPTY")

/** @suppress **This is unstable API and it is subject to change.** */
Expand Down Expand Up @@ -616,7 +620,7 @@ public actual open class LockFreeLinkedListNode {
assert { next === this._next.value }
}

override fun toString(): String = "${this::class.java.simpleName}@${Integer.toHexString(System.identityHashCode(this))}"
override fun toString(): String = "${this::classSimpleName}@${this.hexAddress}"
}

private class Removed(@JvmField val ref: Node) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import kotlin.test.*


abstract class AbstractDispatcherConcurrencyTest : TestBase() {

public abstract val dispatcher: CoroutineDispatcher

@Test
fun testLaunchAndJoin() = runMtTest {
expect(1)
var capturedMutableState = 0
val job = GlobalScope.launch(dispatcher) {
++capturedMutableState
expect(2)
}
runBlocking { job.join() }
assertEquals(1, capturedMutableState)
finish(3)
}

@Test
fun testDispatcherHasOwnThreads() = runMtTest {
val channel = Channel<Int>()
GlobalScope.launch(dispatcher) {
channel.send(42)
}

var result = ChannelResult.failure<Int>()
while (!result.isSuccess) {
result = channel.tryReceive()
// Block the thread, wait
}
// Delivery was successful, let's check it
assertEquals(42, result.getOrThrow())
}

@Test
fun testDelayInDispatcher() = runMtTest {
expect(1)
val job = GlobalScope.launch(dispatcher) {
expect(2)
delay(100)
expect(3)
}
runBlocking { job.join() }
finish(4)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand Down Expand Up @@ -142,4 +142,4 @@ class AtomicCancellationTest : TestBase() {
yield() // to jobToJoin & canceller
expect(6)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.exceptions.*
import kotlinx.coroutines.internal.*
import kotlin.test.*

class ConcurrentExceptionsStressTest : TestBase() {
private val nWorkers = 4
private val nRepeat = 1000 * stressTestMultiplier

private val workers = Array(if (multithreadingSupported) nWorkers else 0) { index ->
newSingleThreadContext("JobExceptionsStressTest-$index")
}

@AfterTest
fun tearDown() {
workers.forEach {
it.close()
}
}

@Test
fun testStress() = runMtTest {
repeat(nRepeat) {
testOnce()
}
}

@Suppress("SuspendFunctionOnCoroutineScope") // workaround native inline fun stacktraces
private suspend fun CoroutineScope.testOnce() {
val deferred = async(NonCancellable) {
repeat(nWorkers) { index ->
// Always launch a coroutine even if parent job was already cancelled (atomic start)
launch(workers[index], start = CoroutineStart.ATOMIC) {
randomWait()
throw StressException(index)
}
}
}
deferred.join()
assertTrue(deferred.isCancelled)
val completionException = deferred.getCompletionExceptionOrNull()
val cause = completionException as? StressException
?: unexpectedException("completion", completionException)
val suppressed = cause.suppressed
val indices = listOf(cause.index) + suppressed.mapIndexed { index, e ->
(e as? StressException)?.index ?: unexpectedException("suppressed $index", e)
}
repeat(nWorkers) { index ->
assertTrue(index in indices, "Exception $index is missing: $indices")
}
assertEquals(nWorkers, indices.size, "Duplicated exceptions in list: $indices")
}

private fun unexpectedException(msg: String, e: Throwable?): Nothing {
throw IllegalStateException("Unexpected $msg exception", e)
}

private class StressException(val index: Int) : SuppressSupportingThrowable()
}