Make runBlocking release worker permits before park and reacquire after unpark to avoid starvation #4084

Closed
22 commits
a73ddcf
IntelliJ patches base
vsalavatov Apr 11, 2024
3dab325
IJI-1751 Parametrise Maven publication Space URL
shchuko May 8, 2024
b49f2aa
IJI-1751 Allow setting `libs.space.pub` as Gradle property
shchuko May 8, 2024
efabd25
Switch Worker into a Blocking mode if it tries to run runBlocking wit…
vsalavatov Mar 26, 2024
94571f6
Introduce withUnlimitedIOScheduler utility method to use instead of d…
vsalavatov Mar 27, 2024
f54d9d0
Cleanup: rename test
vsalavatov Mar 27, 2024
275a0ad
Implement permit release also for LimitedDispatcher
vsalavatov Mar 27, 2024
bdc9930
PermitTransfer: remove PERMIT_ACQUIRE_PARK_NS and make park() indefin…
vsalavatov Mar 28, 2024
44b5963
Clean up: CoroutineScheduler.Worker.getCurrentTaskImpl -> getCurrentTask
vsalavatov Mar 28, 2024
ce9036d
Add doc for BlockingDispatchAware
vsalavatov Mar 28, 2024
4356632
Add a test for Default dispatcher liveness and fix a related bug in W…
vsalavatov Mar 28, 2024
bd29ff6
Improve Default dispatcher liveness stress test with corePoolSize=1
vsalavatov Mar 31, 2024
183d7f8
Worker should give away local tasks upon requested CPU permit release…
vsalavatov Mar 31, 2024
811cbd1
Fix a race in CoroutineScheduler tryPark
vsalavatov Mar 31, 2024
4c44035
clean up
vsalavatov Mar 31, 2024
c7a0fc3
Extract and rename test
vsalavatov Apr 2, 2024
de06466
Make LimitedDispatcher's Worker respect BlockingDispatchAware on its …
vsalavatov Apr 2, 2024
adbb5b4
Extract a base runBlocking liveness test
vsalavatov Apr 2, 2024
3561c13
Add tests for runBlocking with LimitedDispatcher
vsalavatov Apr 2, 2024
c702d2a
Move tests around
vsalavatov Apr 2, 2024
8568346
add signalCpuWork()
vsalavatov May 10, 2024
0e1a97d
add description to IntelliJ-patches.md
vsalavatov May 10, 2024
20 changes: 20 additions & 0 deletions IntelliJ-patches.md
@@ -0,0 +1,20 @@
# Included IntelliJ-related patches

## `runBlocking` without Dispatcher starvation

[IJPL-721](https://youtrack.jetbrains.com/issue/IJPL-721), [#3983](https://github.com/Kotlin/kotlinx.coroutines/issues/3983)

### Description:
With its default semantics, `runBlocking` may cause dispatcher starvation if it is called on a worker thread.
For example, if `runBlocking` calls happen to block all `Dispatchers.Default` workers, the application may deadlock:
the CPU queue may hold tasks that the `runBlocking` calls are waiting for, but no CPU workers are left to run them.
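
The starvation pattern can be reproduced in miniature without coroutines. The sketch below is only an analogy, assuming a plain `java.util.concurrent` pool with a single worker in place of `Dispatchers.Default`; the function name `blockingWaitStarvesSingleWorker` is made up for illustration and is not part of the patch.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Returns true if the outer task timed out while waiting for the inner task,
// i.e. the only worker was blocked and the task it waited for could not run.
fun blockingWaitStarvesSingleWorker(timeoutMs: Long = 200): Boolean {
    val pool = Executors.newFixedThreadPool(1) // a single worker thread
    try {
        val outer = pool.submit(Callable {
            // The inner task is queued behind the only worker, which is this thread.
            val inner = pool.submit(Callable { 42 })
            try {
                inner.get(timeoutMs, TimeUnit.MILLISECONDS) // blocks the worker
                false
            } catch (e: TimeoutException) {
                true // the inner task never got a thread to run on
            }
        })
        return outer.get()
    } finally {
        pool.shutdownNow()
    }
}
```

Without the timeout, the outer task would wait forever, which corresponds to the deadlock described above.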

This patch changes the behavior of `runBlocking` so that it always releases the associated computation permits before it parks
and reacquires them after it is unparked. It works for every `CoroutineDispatcher` that is built from library primitives:
plain `Dispatchers.*` objects or `.limitedParallelism` dispatchers built on top of them.
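
The release-before-park idea can be sketched with a plain `Semaphore` standing in for the pool of computation permits. This is a simplified model under that assumption, not the scheduler's actual mechanism; `withPermitReleased` and `permitsSeenWhileBlocked` are hypothetical names.

```kotlin
import java.util.concurrent.Semaphore

// Hypothetical helper: runs [block] (assumed to block the thread) without holding a permit.
fun <T> withPermitReleased(permits: Semaphore, block: () -> T): T {
    permits.release() // give the permit away before blocking
    try {
        return block()
    } finally {
        permits.acquireUninterruptibly() // take a permit back before resuming CPU work
    }
}

// Returns (permits available while blocked, permits available afterwards)
// for a pool that has exactly one permit, held by the caller.
fun permitsSeenWhileBlocked(): Pair<Int, Int> {
    val permits = Semaphore(1)
    permits.acquireUninterruptibly() // the caller plays the role of a CPU worker
    val during = withPermitReleased(permits) { permits.availablePermits() }
    return during to permits.availablePermits()
}
```

While the caller is inside the blocking section, the permit is free for another worker to pick up; once the section ends, the caller holds it again.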

This change in behavior comes at a cost. The permit reacquisition mechanism _may_ need an additional thread park/unpark.
Worker threads that release their computation permits always give up their local task queue, which means less benefit
from locality, higher contention, and transactional costs at the very least.

This patch doesn't change the fact that `runBlocking` should still be used carefully.
12 changes: 11 additions & 1 deletion README.md
@@ -1,4 +1,14 @@
# kotlinx.coroutines
# kotlinx.coroutines with IntelliJ patches

This repository is a fork of the original [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) library that includes
several patches (see [IntelliJ-patches.md](IntelliJ-patches.md)).

**Important:**
- Only the JVM build target is fully supported and used

Release instructions are [here](RELEASE.md).

---

[![Kotlin Stable](https://kotl.in/badges/stable.svg)](https://kotlinlang.org/docs/components-stability.html)
[![JetBrains official project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
22 changes: 22 additions & 0 deletions RELEASE.md
@@ -1,3 +1,25 @@
# com.intellij.platform:kotlinx-coroutines-* release

```
# update branches
# git remote add upstream https://github.com/Kotlin/kotlinx.coroutines.git
git checkout master
git fetch upstream
git fetch origin

# prepare new master with patches
git reset --hard upstream/master
git rebase intellij/patch-base
# repeat for each patch branch; see IntelliJ-patches.md for the list of branches
git rebase intellij/whatever-patches-we-have

# Remember to change the version in `gradle.properties` to something like `1.8.4-intellij-SNAPSHOT`
# commit version change

git push origin master --force
```

---

# kotlinx.coroutines release checklist

To release a new `<version>` of `kotlinx-coroutines`:
12 changes: 7 additions & 5 deletions buildSrc/src/main/kotlin/Publishing.kt
@@ -48,11 +48,13 @@ fun MavenPom.configureMavenCentralMetadata(project: Project) {
* dev build into 'https://maven.pkg.jetbrains.space/public/p/kotlinx-coroutines/maven' Maven repository.
* In order to use it, pass the corresponding ENV to the TC 'Deploy' task.
*/
private val spacePublicationEnabled = System.getenv("libs.space.pub")?.equals("true") ?: false
private val Project.spacePublicationEnabled: Boolean
get() = getSensitiveProperty("libs.space.pub")?.equals("true") ?: false

fun mavenRepositoryUri(): URI {
fun Project.mavenRepositoryUri(): URI {
if (spacePublicationEnabled) {
return URI("https://maven.pkg.jetbrains.space/public/p/kotlinx-coroutines/maven")
val spaceRepoUrl = getSensitiveProperty("libs.space.url")
return URI(spaceRepoUrl ?: "https://maven.pkg.jetbrains.space/public/p/kotlinx-coroutines/maven")
}

val repositoryId: String? = System.getenv("libs.repository.id")
@@ -65,9 +67,9 @@ fun mavenRepositoryUri(): URI {

fun configureMavenPublication(rh: RepositoryHandler, project: Project) {
rh.maven {
url = mavenRepositoryUri()
url = project.mavenRepositoryUri()
credentials {
if (spacePublicationEnabled) {
if (project.spacePublicationEnabled) {
// Configure space credentials
username = project.getSensitiveProperty("libs.space.user")
password = project.getSensitiveProperty("libs.space.password")
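
The lookup-with-fallback logic in this hunk can be sketched in isolation. In the sketch below, `lookup` stands in for the build's `getSensitiveProperty` helper, whose exact signature is not shown in the diff and is assumed here; `spaceRepositoryUri` and `DEFAULT_SPACE_URL` are hypothetical names.

```kotlin
import java.net.URI

const val DEFAULT_SPACE_URL = "https://maven.pkg.jetbrains.space/public/p/kotlinx-coroutines/maven"

// `lookup` is an assumed stand-in for getSensitiveProperty: it returns the property value or null.
fun spaceRepositoryUri(lookup: (String) -> String?): URI? {
    val enabled = lookup("libs.space.pub") == "true"
    if (!enabled) return null // the real function continues with other repositories here
    // Use the configured URL if present, otherwise fall back to the public Space repository.
    return URI(lookup("libs.space.url") ?: DEFAULT_SPACE_URL)
}
```

Passing a lambda backed by a map, for example `spaceRepositoryUri { props[it] }`, makes it easy to try the enabled, overridden, and disabled cases.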
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,6 +1,6 @@
# Kotlin
version=1.8.0-SNAPSHOT
group=org.jetbrains.kotlinx
group=com.intellij.platform
kotlin_version=1.9.21
# DO NOT rename this property without adapting kotlinx.train build chain:
atomicfu_version=0.23.1
@@ -0,0 +1,22 @@
package kotlinx.coroutines.internal

/**
* [Runnables][kotlinx.coroutines.Runnable] that are dispatched on [CoroutineDispatcher][kotlinx.coroutines.CoroutineDispatcher]
* may optionally implement this interface to be notified of dispatch emulation in blocking mode.
*
* This may be needed for controlling dispatcher to release/acquire a permit of the worker that currently
* executes the dispatched Runnable.
* @see LimitedDispatcher.Worker
* @see kotlinx.coroutines.scheduling.withUnlimitedIOScheduler
*/
internal interface BlockingDispatchAware {
/**
* Must not throw
*/
fun beforeDispatchElsewhere()

/**
* Must not throw
*/
fun afterDispatchBack()
}
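
How a caller might use such an interface can be sketched as follows. The interface is copied locally (without the `internal` modifier) so that the example compiles on its own; `runBlockingSection` and `RecordingTask` are hypothetical and do not appear in the patch.

```kotlin
// Local copy of the interface shape, so the sketch is self-contained.
interface BlockingDispatchAware {
    fun beforeDispatchElsewhere()
    fun afterDispatchBack()
}

// Hypothetical caller: notifies the task around a blocking section if the task opts in.
fun <T> runBlockingSection(task: Runnable, blockingSection: () -> T): T {
    val aware = task as? BlockingDispatchAware
    aware?.beforeDispatchElsewhere()
    try {
        return blockingSection()
    } finally {
        aware?.afterDispatchBack() // always paired with the call above
    }
}

// A task that records the order of notifications it receives.
class RecordingTask : Runnable, BlockingDispatchAware {
    val events = mutableListOf<String>()
    override fun run() {}
    override fun beforeDispatchElsewhere() { events += "before" }
    override fun afterDispatchBack() { events += "after" }
}
```

Because both callbacks must not throw, the `try`/`finally` pairing is enough to guarantee that every release notification is followed by a reacquire notification.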
28 changes: 27 additions & 1 deletion kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt
@@ -103,12 +103,14 @@ internal class LimitedDispatcher(
* actual tasks are done, nothing prevents the user from closing the dispatcher and making it incorrect to
* perform any more dispatches.
*/
private inner class Worker(private var currentTask: Runnable) : Runnable {
private inner class Worker(private var currentTask: Runnable) : Runnable, BlockingDispatchAware {
override fun run() {
var fairnessCounter = 0
while (true) {
try {
currentTask.run()
} catch (e: WorkerPermitTransferCompleted) {
if (!tryAllocateWorker()) return
} catch (e: Throwable) {
handleCoroutineException(EmptyCoroutineContext, e)
}
@@ -122,7 +124,31 @@ internal class LimitedDispatcher(
}
}
}

override fun beforeDispatchElsewhere() {
// compensate while we are blocked and consider that we gave away our permit to a new worker
val newWorker = Worker(Runnable {})
dispatcher.dispatch(this@LimitedDispatcher, newWorker)
(currentTask as? BlockingDispatchAware)?.beforeDispatchElsewhere()
}

override fun afterDispatchBack() {
(currentTask as? BlockingDispatchAware)?.afterDispatchBack()
if (tryAllocateWorker()) return
val permitTransfer = PermitTransfer()
queue.addLast(
permitTransfer.releaseFun { throw WorkerPermitTransferCompleted }
.let { Runnable { it() } }
)
permitTransfer.acquire(
tryAllocatePermit = ::tryAllocateWorker,
deallocatePermit = { runningWorkers.decrementAndGet() }
)
}

}

private object WorkerPermitTransferCompleted : Throwable()
}

// Save a few bytecode ops
45 changes: 45 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt
@@ -0,0 +1,45 @@
package kotlinx.coroutines.internal

import kotlinx.atomicfu.*

internal class PermitTransferStatus {
private val status = atomic(false)
fun check(): Boolean = status.value
fun complete(): Boolean = status.compareAndSet(false, true)
}

internal expect class PermitTransfer constructor() {
/**
* [releasePermit] may throw
*/
fun releaseFun(releasePermit: () -> Unit): () -> Unit

/**
* [tryAllocatePermit] and [deallocatePermit] must not throw
*/
fun acquire(tryAllocatePermit: () -> Boolean, deallocatePermit: () -> Unit)
}

internal class BusyPermitTransfer {
private val status = PermitTransferStatus()

fun releaseFun(releasePermit: () -> Unit): () -> Unit = {
if (status.complete()) {
releasePermit()
}
}

fun acquire(tryAllocatePermit: () -> Boolean, deallocatePermit: () -> Unit) {
while (true) {
if (status.check()) {
return
}
if (tryAllocatePermit()) {
if (!status.complete()) { // race: transfer was completed first by releaseFun
deallocatePermit()
}
return
}
}
}
}
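
The single compare-and-set in the transfer status is what keeps the permit count consistent regardless of who finishes first. The sketch below replays three interleavings sequentially on one thread; it is a simplified model that uses `AtomicBoolean` instead of atomicfu, and all names in it (`TransferStatus`, `Interleaving`, `permitsHeldAfterTransfer`) are hypothetical.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for PermitTransferStatus, built on java.util.concurrent.
class TransferStatus {
    private val done = AtomicBoolean(false)
    fun check(): Boolean = done.get()
    fun complete(): Boolean = done.compareAndSet(false, true)
}

enum class Interleaving { RELEASER_FIRST, ACQUIRER_FIRST, RELEASER_BETWEEN_CHECK_AND_COMPLETE }

// Replays one interleaving and returns how many permits the blocked worker ends up holding.
fun permitsHeldAfterTransfer(order: Interleaving): Int {
    val status = TransferStatus()
    val held = AtomicInteger(0)

    // Releaser side: hands a permit over only if it wins the compare-and-set.
    fun release() { if (status.complete()) held.incrementAndGet() }

    // Acquirer side: allocates a permit, then gives it back if the releaser already won.
    fun allocateThenComplete() {
        held.incrementAndGet()
        if (!status.complete()) held.decrementAndGet()
    }

    when (order) {
        Interleaving.RELEASER_FIRST -> { release(); if (!status.check()) allocateThenComplete() }
        Interleaving.ACQUIRER_FIRST -> { if (!status.check()) allocateThenComplete(); release() }
        Interleaving.RELEASER_BETWEEN_CHECK_AND_COMPLETE -> {
            val alreadyDone = status.check() // acquirer sees "not completed yet"
            release()                        // releaser completes in between
            if (!alreadyDone) allocateThenComplete()
        }
    }
    return held.get()
}
```

In all three cases the worker ends up with exactly one permit, which is the invariant the `deallocatePermit` branch exists to protect.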
@@ -0,0 +1,3 @@
package kotlinx.coroutines.internal

internal actual typealias PermitTransfer = BusyPermitTransfer // TODO
7 changes: 6 additions & 1 deletion kotlinx-coroutines-core/jvm/src/Builders.kt
@@ -4,6 +4,7 @@

package kotlinx.coroutines

import kotlinx.coroutines.scheduling.*
import java.util.concurrent.locks.*
import kotlin.contracts.*
import kotlin.coroutines.*
@@ -95,7 +96,11 @@ private class BlockingCoroutine<T>(
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// note: processNextEvent may lose the unpark flag, so check if completed before parking
if (isCompleted) break
parkNanos(this, parkNanos)
if (parkNanos > 0) {
withUnlimitedIOScheduler {
parkNanos(this, parkNanos)
}
}
}
} finally { // paranoia
eventLoop?.decrementUseCount()
35 changes: 35 additions & 0 deletions kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt
@@ -0,0 +1,35 @@
package kotlinx.coroutines.internal

import java.util.concurrent.locks.*

internal actual class PermitTransfer {
private val status = PermitTransferStatus()

public actual fun releaseFun(releasePermit: () -> Unit): () -> Unit {
val blockedThread = Thread.currentThread()
return {
if (status.complete()) {
try {
releasePermit()
} finally {
LockSupport.unpark(blockedThread)
}
}
}
}

public actual fun acquire(tryAllocatePermit: () -> Boolean, deallocatePermit: () -> Unit) {
while (true) {
if (status.check()) {
return
}
if (tryAllocatePermit()) {
if (!status.complete()) { // race: transfer was completed first by releaseFun
deallocatePermit()
}
return
}
LockSupport.park(this)
}
}
}
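
The park/unpark handshake used by the JVM implementation can be illustrated on its own. The sketch below is a minimal model, assuming a single boolean flag in place of the permit state; `parkUntilHandedOff` is a hypothetical name. It shows why the waiting side re-checks its condition in a loop: `LockSupport.park` may return without a matching `unpark`.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.LockSupport
import kotlin.concurrent.thread

// Returns true if the waiting thread finished after observing the handoff.
fun parkUntilHandedOff(): Boolean {
    val handedOff = AtomicBoolean(false)
    var observed = false
    val waiter = thread {
        // park() may return spuriously, so the flag is re-checked on every wakeup.
        while (!handedOff.get()) LockSupport.park()
        observed = true
    }
    Thread.sleep(50)           // gives the waiter time to park; not needed for correctness
    handedOff.set(true)        // publish the state change first...
    LockSupport.unpark(waiter) // ...then wake the waiter
    waiter.join(5_000)
    return !waiter.isAlive && observed
}
```

Setting the flag before calling `unpark` mirrors the order in `releaseFun`, where the status is completed and the permit released before the blocked thread is woken.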