Skip to content

Commit

Permalink
Switch Worker into a Blocking mode if it tries to run runBlocking wit…
Browse files Browse the repository at this point in the history
…h a CPU permit

And reacquire CPU permit after runBlocking finishes. This should resolve Dispatchers.Default starvation in cases where runBlocking is used to run suspend functions from non-suspend execution context.

Kotlin#3983 / IJPL-721
  • Loading branch information
vsalavatov committed Mar 26, 2024
1 parent 8c516f5 commit d06effe
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 1 deletion.
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/jvm/src/Builders.kt
Expand Up @@ -4,6 +4,8 @@

package kotlinx.coroutines

import kotlinx.coroutines.scheduling.*
import kotlinx.coroutines.scheduling.CoroutineScheduler
import java.util.concurrent.locks.*
import kotlin.contracts.*
import kotlin.coroutines.*
Expand Down Expand Up @@ -86,6 +88,7 @@ private class BlockingCoroutine<T>(
@Suppress("UNCHECKED_CAST")
fun joinBlocking(): T {
registerTimeLoopThread()
var cpuPermitReleased: Boolean? = null
try {
eventLoop?.incrementUseCount()
try {
Expand All @@ -95,13 +98,20 @@ private class BlockingCoroutine<T>(
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// note: process next even may loose unpark flag, so check if completed before parking
if (isCompleted) break
if (parkNanos > 0 && cpuPermitReleased == null) {
val worker = Thread.currentThread() as? CoroutineScheduler.Worker
cpuPermitReleased = worker?.releaseCpu() ?: false
}
parkNanos(this, parkNanos)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
if (cpuPermitReleased == true) {
(Thread.currentThread() as CoroutineScheduler.Worker).reacquireCpu()
}
}
// now return result
val state = this.state.unboxState()
Expand Down
49 changes: 49 additions & 0 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Expand Up @@ -690,6 +690,55 @@ internal class CoroutineScheduler(
return hadCpu
}

/** only for [runBlocking] */
fun releaseCpu(): Boolean {
assert { state == WorkerState.CPU_ACQUIRED || state == WorkerState.BLOCKING }
return tryReleaseCpu(WorkerState.BLOCKING).also {
if (it) {
incrementBlockingTasks()
}
}
}

/** only for [runBlocking] */
fun reacquireCpu() {
assert { state == WorkerState.BLOCKING }
decrementBlockingTasks()
if (tryAcquireCpuPermit()) return
class CpuPermitTransfer {
private val status = atomic(false)
fun check(): Boolean = status.value
fun complete(): Boolean = status.compareAndSet(false, true)
}
val permitTransfer = CpuPermitTransfer()
val blockedWorker = this@Worker
scheduler.dispatch(Runnable {
// this code runs in a different worker thread that holds a CPU token
val cpuHolder = currentThread() as Worker
assert { cpuHolder.state == WorkerState.CPU_ACQUIRED }
if (permitTransfer.complete()) {
cpuHolder.state = WorkerState.BLOCKING
LockSupport.unpark(blockedWorker)
}
}, taskContext = NonBlockingContext)
state = WorkerState.PARKING
while (true) {
if (permitTransfer.check()) {
state = WorkerState.CPU_ACQUIRED
break
}
if (tryAcquireCpuPermit()) {
if (!permitTransfer.complete()) {
// race: transfer was completed by another thread
releaseCpuPermit()
}
assert { state == WorkerState.CPU_ACQUIRED }
break
}
LockSupport.parkNanos(RUN_BLOCKING_CPU_REACQUIRE_PARK_NS)
}
}

override fun run() = runWorker()

@JvmField
Expand Down
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt
Expand Up @@ -19,6 +19,11 @@ internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp(
"kotlinx.coroutines.scheduler.resolution.ns", 100000L
)

@JvmField
internal val RUN_BLOCKING_CPU_REACQUIRE_PARK_NS = systemProp(
"kotlinx.coroutines.scheduler.runBlocking.cpu.reacquire.ns", 250L * 1000 * 1000
)

/**
* The maximum number of threads allocated for CPU-bound tasks at the default set of dispatchers.
*
Expand Down
@@ -1,6 +1,5 @@
package kotlinx.coroutines.scheduling

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.rules.*
Expand Down Expand Up @@ -171,4 +170,20 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() {
fun testZeroParallelism() {
blockingDispatcher(0)
}

@Test
fun testNoCpuStarvationWithDeepRunBlocking() {
val maxDepth = CORES_COUNT * 3 + 3
fun body(depth: Int) {
if (depth == maxDepth) return
runBlocking(dispatcher) {
launch(dispatcher) {
body(depth + 1)
}
}
}

body(1)
checkPoolThreadsCreated(maxDepth..maxDepth + 1)
}
}

0 comments on commit d06effe

Please sign in to comment.