From 7048a1a6b4890dbe60b6c9ecea0174afc84615b6 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 29 Mar 2022 17:28:05 +0300 Subject: [PATCH] Fix limitedParallelism implementation on K/N The initial implementation predates new memory model and was never working on it Fixes #3223 --- .../common/src/internal/LimitedDispatcher.kt | 9 ++-- .../test/LimitedParallelismSharedTest.kt | 30 +++++++++++++ .../LimitedParallelismSharedStressTest.kt | 44 +++++++++++++++++++ .../jvm/test/LimitedParallelismTest.kt | 8 ---- 4 files changed, 79 insertions(+), 12 deletions(-) create mode 100644 kotlinx-coroutines-core/common/test/LimitedParallelismSharedTest.kt create mode 100644 kotlinx-coroutines-core/concurrent/test/LimitedParallelismSharedStressTest.kt diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index 892375b89f..28f37ecf1d 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -23,6 +23,9 @@ internal class LimitedDispatcher( private val queue = LockFreeTaskQueue(singleConsumer = false) + // A separate object that we can synchronize on for K/N + private val workerAllocationLock = SynchronizedObject() + @ExperimentalCoroutinesApi override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { parallelism.checkParallelism() @@ -50,8 +53,7 @@ internal class LimitedDispatcher( continue } - @Suppress("CAST_NEVER_SUCCEEDS") - synchronized(this as SynchronizedObject) { + synchronized(workerAllocationLock) { --runningWorkers if (queue.size == 0) return ++runningWorkers @@ -87,8 +89,7 @@ internal class LimitedDispatcher( } private fun tryAllocateWorker(): Boolean { - @Suppress("CAST_NEVER_SUCCEEDS") - synchronized(this as SynchronizedObject) { + synchronized(workerAllocationLock) { if (runningWorkers >= parallelism) return false ++runningWorkers return true diff --git a/kotlinx-coroutines-core/common/test/LimitedParallelismSharedTest.kt b/kotlinx-coroutines-core/common/test/LimitedParallelismSharedTest.kt new file mode 100644 index 0000000000..dbde643ca1 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/LimitedParallelismSharedTest.kt @@ -0,0 +1,30 @@ +/* + * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlin.test.* + +class LimitedParallelismSharedTest : TestBase() { + @Test + fun testTaskFairness() = runTest { + val view = Dispatchers.Default.limitedParallelism(1) + val view2 = Dispatchers.Default.limitedParallelism(1) + val j1 = launch(view) { + while (true) { + yield() + } + } + val j2 = launch(view2) { j1.cancel() } + joinAll(j1, j2) + } + + @Test + fun testParallelismSpec() { + assertFailsWith { Dispatchers.Default.limitedParallelism(0) } + assertFailsWith { Dispatchers.Default.limitedParallelism(-1) } + assertFailsWith { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) } + Dispatchers.Default.limitedParallelism(Int.MAX_VALUE) + } +} diff --git a/kotlinx-coroutines-core/concurrent/test/LimitedParallelismSharedStressTest.kt b/kotlinx-coroutines-core/concurrent/test/LimitedParallelismSharedStressTest.kt new file mode 100644 index 0000000000..9a35e53db7 --- /dev/null +++ b/kotlinx-coroutines-core/concurrent/test/LimitedParallelismSharedStressTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlinx.coroutines.exceptions.* +import kotlin.test.* + +class LimitedParallelismSharedStressTest : TestBase() { + + private val targetParallelism = 4 + private val iterations = 100_000 + private val parallelism = atomic(0) + + private fun checkParallelism() { + val value = parallelism.incrementAndGet() + randomWait() + assertTrue { value <= targetParallelism } + parallelism.decrementAndGet() + } + + @Test + fun testLimitedExecutor() = runMtTest { + val executor = newFixedThreadPoolContext(targetParallelism, "test") + val view = executor.limitedParallelism(targetParallelism) + doStress { + repeat(iterations) { + launch(view) { + checkParallelism() + } + } + } + executor.close() + } + + private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) { + repeat(stressTestMultiplier) { + coroutineScope { + block() + } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt b/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt index 30c54117a9..1be91bbc89 100644 --- a/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt +++ b/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt @@ -11,14 +11,6 @@ import kotlin.test.* class LimitedParallelismTest : TestBase() { - @Test - fun testParallelismSpec() { - assertFailsWith { Dispatchers.Default.limitedParallelism(0) } - assertFailsWith { Dispatchers.Default.limitedParallelism(-1) } - assertFailsWith { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) } - Dispatchers.Default.limitedParallelism(Int.MAX_VALUE) - } - @Test fun testTaskFairness() = runTest { val executor = newSingleThreadContext("test")