runBlocking should let go of CPU token before parking the thread #3983

Open
dovchinnikov opened this issue Dec 11, 2023 · 16 comments

@dovchinnikov
Contributor

dovchinnikov commented Dec 11, 2023

What do we have now?

runBlocking parks the thread holding the CPU-token if it happens on a thread of Dispatchers.Default.

What should be instead?

runBlocking should let go of the CPU-token before parking, and "re-acquire" the token before un-parking (it should be un-parked in a state where it already holds the token).

Why?

The current solution is just "don't use runBlocking" followed by "at least don't use runBlocking inside Dispatchers.Default", which is not a solution. This does not work in real-life scenarios, especially with large mixed codebases like IJ. In IJ we've tried to leverage #3439, but the approach is stillborn because it causes #3982 on a multi-threaded scale, and we didn't even start to tackle the thread locals which leak from the outer thread into inner tasks.

In other similar scenarios (FJ's managed block), another thread is spawned to compensate for the blocked one. On JVM this is the most viable approach to this day. It's better to spawn an extra thread, which might do some other work later on or just die after timeout, and to risk OOME, than to have a starvation deadlock.
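
For readers unfamiliar with the ForkJoinPool mechanism mentioned above, here is a minimal sketch of it using only the JDK. The names `LatchBlocker` and `managedBlockDemo` are made up for this illustration. The pool has parallelism 1, so a plain `latch.await()` in the outer task would leave no worker to run the inner task; wrapping the wait in `ForkJoinPool.managedBlock` lets the pool compensate for the blocked worker.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ForkJoinPool

// Hypothetical helper: tells the pool that the current worker is about to block on a latch.
class LatchBlocker(private val latch: CountDownLatch) : ForkJoinPool.ManagedBlocker {
    override fun block(): Boolean {
        latch.await()
        return true // no further blocking is necessary
    }

    override fun isReleasable(): Boolean = latch.count == 0L
}

fun managedBlockDemo(): String {
    val pool = ForkJoinPool(1) // parallelism 1 makes starvation easy to hit
    val latch = CountDownLatch(1)
    try {
        val outer = pool.submit(Callable {
            // Queued in the same pool; it needs some worker to run it.
            pool.execute(Runnable { latch.countDown() })
            // Announce the blocking wait so the pool may add a compensating worker.
            ForkJoinPool.managedBlock(LatchBlocker(latch))
            "done"
        })
        return outer.get()
    } finally {
        pool.shutdown()
    }
}

fun main() {
    println(managedBlockDemo())
}
```

This only illustrates the compensation idea on the JDK side; it says nothing about how kotlinx.coroutines schedules its own workers.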

@dkhalanskyjb
Collaborator

This does not work in real-life scenarios, especially with large mixed codebases like IJ.

runBlocking has "blocking" in its name, so the idea is that it should be treated like any other blocking task. Do you also have an issue with the fact that a Dispatchers.Default thread parks when connecting to the Internet or reading a file? If yes, why not use Dispatchers.IO instead of Dispatchers.Default everywhere? If not, what's the difference in your mind between runBlocking (which is literally "block the thread until these tasks are finished") and a file read?

@dovchinnikov
Contributor Author

runBlocking has "blocking" in its name, so the idea is that it should be treated like any other blocking task.

Then nested runBlocking should not steal tasks from the outer one.

Do you also have an issue with the fact that a Dispatchers.Default thread parks when connecting to the Internet or reading a file?

No, not really, it's something beyond the control of the library.

@dovchinnikov
Contributor Author

Let me describe an issue, which we've faced in IJ.

Consider a platform API interface which is supposed to be implemented in 3rd-party plugins:

interface CoolExtension {
  fun computeStuff(): Any
}

The platform calls computeStuff() on a thread from a global thread pool, unrelated to coroutines.

Now, the client decides to use coroutines:

class MyCoolExtension : CoolExtension {
  override fun computeStuff(): Any {
    return runBlocking {
      42
    }
  }
}

After some time, the platform API evolves, and a suspending entry point is added:

interface CoolExtension {
  suspend fun computeStuffS(): Any = computeStuff()
  @Deprecated(...)
  fun computeStuff(): Any = error("Implement me or computeStuffS")
}

computeStuffS is now the entry point, and it delegates to the old computeStuff to maintain compatibility.

The platform calls computeStuffS in Dispatchers.Default because we want to limit the effective parallelism of the system to the CPUs. The problem: if the client switches to Dispatchers.Default inside runBlocking, we are risking thread starvation. We've already faced this problem several times, and at the moment the fix is to avoid using Dispatchers.Default.

Possible solutions

  1. Don't run computeStuff on Dispatchers.Default:
interface CoolExtension {
  suspend fun computeStuffS(): Any = withContext(oldGlobalThreadPoolExecutor.asCoroutineDispatcher()) {
    computeStuff()
  }
  @Deprecated(...)
  fun computeStuff(): Any = error("Implement me or computeStuffS")
}

In particular, this means we cannot guarantee that at most X=CPU threads are active at any given moment, which defeats the purpose of Dispatchers.Default. Also, we are now prone to excessive thread context switches, but this could be fixed by replacing oldGlobalThreadPoolExecutor with Dispatchers.IO.limitedParallelism(Int.MAX_VALUE).

  2. Switch from Dispatchers.Default right before calling runBlocking. We cannot do this ourselves because runBlocking is inside 3rd-party code, but runBlocking itself could do the switch. With this approach the runBlocking queue would be processed without a token (i.e. outside of the CPU limit), and that's why I think runBlocking should instead let go of the token before parking the thread.

  3. (I'm open to other solutions)

@dkhalanskyjb
Collaborator

First, an important thing: making runBlocking steal tasks is not unambiguously good: it can also lead to a regression, even when there are still threads available for running tasks.

val process = launch {
  delay(100)
}
launch(Dispatchers.Default) {
  runBlocking {
    println("Joining the task...")
    process.join()
  }
  println("The task is finished")
}
launch(Dispatchers.Default) {
  Thread.sleep(10000) // do something for a long time
}

Imagine that a thread of Dispatchers.Default enters runBlocking, notices that join can't finish yet, and executes other tasks. Unfortunately, it steals a task that takes a long time to run. In the meantime, the process was joined a long time ago, but the thread doing runBlocking can't proceed until Thread.sleep completes.

In a single-threaded environment, the problem of liveness is not as pronounced: it's par for the course that a single long-running task prevents progress everywhere else. Here, it's really strange that runBlocking waits for something completely unrelated when other threads could have taken the task.

@dovchinnikov
Contributor Author

making runBlocking steal tasks is not unambiguously good

Exactly! This and #3982 are the reasons #3439 (stealing inside runBlocking) does not work. I'm sure there are other reasons.

@dkhalanskyjb
Collaborator

dkhalanskyjb commented Dec 19, 2023

After reading everything in #3982, #3439, and this issue several times, I understand the proposed solutions, but I still have no idea what the problem even is, how many problems there are, or how the solutions map to the actual problems (and not simplified reproducers).

Here's one question that has a chance of helping us. Imagine a thread pool that freely spawned up to n threads by default and was prepared to create new threads, but with an exponentially increasing delay for each additional thread. For example, 8 threads by default, 9 after a quarter of a second of all 8 being busy, 10 after half a second more, 11 after a second more, etc. Would all problems be solved if we provided an API to replace Dispatchers.Default with such a thread pool? If not, why not?

If you think this question misses the mark entirely, could you explain the big picture with a clear breakdown of the design constraints we have to fit into? If not, I doubt I can say anything useful and you'll probably have to proceed without me.

@dovchinnikov
Contributor Author

Let me elaborate on this first:
#3983 (comment)

Two things to consider:

  1. runBlocking on a thread from a limited dispatcher may cause a starvation deadlock.
val d = Dispatchers.IO.limitedParallelism(10)
withContext(d) {
  runBlocking {
    withContext(d) { // starvation here
      ...
    }
  }
}
  2. runBlocking is advised as a bridge to switch from the blocking world to the coroutine world.

The problem:
We cannot run 3rd-party implementation of a blocking CoolExtension.computeStuff on any limited dispatcher (including Dispatchers.Default) because we have no control over the 3rd-party implementation:

  • we don't know if it uses runBlocking.
  • if it does, we don't know if it switches back to Dispatchers.Default deeper in the call chain, e.g. by calling another platform API which does the switch to Dispatchers.Default.

In general, we have to call CoolExtension.computeStuff on an unlimited dispatcher (potentially wasting CPU) because switching from Dispatchers.Default to an unlimited dispatcher releases a CPU token.

@dkhalanskyjb
Collaborator

How is this situation different from the user code, say, accessing the Internet? That will also block the thread that you provide it with. If the 3rd-party implementation is some arbitrary code, then either you can require that it does nothing blocking there (for example, by validating the implementation with BlockHound), or you can't expect them to be good citizens, so they will eventually eat the thread that you give them.

@dovchinnikov
Contributor Author

Re #3983 (comment)

This approach would solve the starvation problem, because eventually there would be enough threads to handle all the tasks. But with this approach there will be no guarantee that the parallelism is equal to CPU count, i.e. after a bunch of threads are spawned, there will be a window where all of them would be unblocked and all of them will process the global CPU-bound queue together.

@dovchinnikov
Contributor Author

dovchinnikov commented Dec 19, 2023

How is this situation different from the user code, say, accessing the Internet?

A blocking operation, which blocks on IO, just blocks.

  • we have no control over it => we choose not to think about it, here we can only hope that the implementation would be a good citizen.
  • it will be eventually unblocked, and the system will proceed.

On the other hand, runBlocking, which switches to the same dispatcher, causes starvation, and there is no way of exiting this state.
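
The shape of this starvation can be sketched without coroutines at all. The example below is only an analogy under stated assumptions: a single-threaded JDK executor stands in for a limited dispatcher, and a blocking `Future.get` stands in for runBlocking waiting on work that was sent back to the same dispatcher. The function name `nestedWaitOutcome` is invented for the illustration, and the timeout exists only so that the demo terminates; the real case has no such escape hatch.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

fun nestedWaitOutcome(): String {
    // Stand-in for a dispatcher limited to a single thread.
    val pool = Executors.newFixedThreadPool(1)
    try {
        val outer = pool.submit(Callable {
            // The inner task is queued behind the outer one, on the same single thread.
            val inner = pool.submit(Callable { 42 })
            try {
                // Blocking wait for work that can only run on the thread we occupy.
                "finished with " + inner.get(300, TimeUnit.MILLISECONDS)
            } catch (e: TimeoutException) {
                "starved"
            }
        })
        return outer.get()
    } finally {
        pool.shutdownNow()
    }
}

fun main() {
    println(nestedWaitOutcome()) // prints "starved"
}
```

A blocking IO call in the same position would eventually return on its own, whereas the nested wait here can only finish once the thread it is occupying becomes free.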

@dovchinnikov
Contributor Author

If the 3rd-party implementation is some arbitrary code, then either you can require that it does nothing blocking there

Ok, but let's consider this:

  • There is a blocking API with no constraints.
  • I, as a 3rd-party developer, started to use runBlocking because that's the official advice.
  • Suddenly, a wild suspending API appears, and it delegates to blocking one to maintain compatibility.

It would be okay if a suspending API just appeared, giving me a choice to migrate to it at a convenient time, but, instead, my current implementation becomes broken because I've used runBlocking.

This issue is about evolution of existing APIs, it's about calling the code, which already uses runBlocking under the hood. I've provided an example of evolution here

@dkhalanskyjb
Collaborator

dkhalanskyjb commented Dec 19, 2023

But with this approach there will be no guarantee that the parallelism is equal to CPU count

That's another big thing I don't understand, yes. Do you actually need that guarantee?

Quoting your initial message:

In other similar scenarios (FJ's managed block), another thread is spawned to compensate for the blocked one. On JVM this is the most viable approach to this day. It's better to spawn an extra thread, which might do some other work later on or just die after timeout, and to risk OOME, than to have a starvation deadlock.

I get a strong impression that you're okay with utilizing extra threads to resolve deadlocks.

it will be eventually unblocked, and the system will proceed.

It can realistically take several seconds or more. If you're okay with spawning extra threads, this would be a good time to do that, no?

On the other hand, runBlocking, which switches to the same dispatcher, causes starvation, and there is no way of exiting this state.

I think I see this point, thank you.

There is a blocking API with no constraints.

And then this blocking API with no constraints starts to be unconditionally run on Dispatchers.Default?

Here's another possible way to perform this migration:

interface CoolExtension {
  fun computeStuff(): Any
  suspend fun computeStuffS(): Any {
    throw SuspendImplNotProvided()
  }
}

internal class SuspendImplNotProvided(): Exception()

suspend fun CoolExtension.doComputeStuff(): Any {
  return try {
    withContext(Dispatchers.Default) {
      computeStuffS()
    }
  } catch (e: SuspendImplNotProvided) {
    withContext(Dispatchers.IO) {
      computeStuff() // can potentially block, so using the IO dispatcher
    }
  }
}

@dovchinnikov
Contributor Author

But with this approach there will be no guarantee that the parallelism is equal to CPU count

That's another big thing I don't understand, yes. Do you actually need that guarantee?

Well, yes! Isn't that the whole thing Dispatchers.Default is about? I mean without this guarantee it would be enough to have a single limited dispatcher without separating Dispatchers.Default and Dispatchers.IO.

I get a strong impression that you're okay with utilizing extra threads to resolve deadlocks.

Yes, extra threads (and risking OOM) are better than total deadlock.

it will be eventually unblocked, and the system will proceed.

It can realistically take several seconds or more. If you're okay with spawning extra threads, this would be a good time to do that, no?

Correct, it would be a good time. But, again, we don't have control over the implementation of JVM IO.

There is a blocking API with no constraints.

And then this blocking API with no constraints starts to be unconditionally run on Dispatchers.Default?

This is what we'd want: run everything on Dispatchers.Default, occasionally switching to IO or EDT (event dispatch thread, or UI thread) when needed. Running everything on Dispatchers.Default yields the least amount of fighting over CPUs between threads. But the problem is not even Dispatchers.Default: the starvation can happen with any limited dispatcher!

Here's another possible way to perform this migration:

So, basically, this:

interface CoolExtension {
  fun computeStuff(): Any
  suspend fun computeStuffS(): Any {
    return withContext(Dispatchers.IO) {  // can potentially block, so using the IO dispatcher
      computeStuff()
    }
  }
}

suspend fun CoolExtension.doComputeStuff(): Any {
  return withContext(Dispatchers.Default) {
    computeStuffS()
  }
}

Listed as Possible Solution 1

@dkhalanskyjb
Collaborator

dkhalanskyjb commented Dec 19, 2023

But, again, we don't have control over the implementation of JVM IO.

Exactly. This is why, even if we change what happens in kotlinx-coroutines, you still won't have control over what happens in computeStuff. It's basically a black box that may or may not behave nicely with your threads. One of the reasons may be the use of runBlocking. The more general solution seems to be to allocate new threads in dire conditions.

Listed as Possible Solution 1

Oh, ok, I misunderstood that point, then. Yes, I like this solution the most: until the author of CoolExtension marked their code as async-ready and eliminated the blocking behavior, wasting a CPU token on various blocking tasks will mean heavy underutilization of CPU, occasionally to the point of starvation.

In particular, this means we cannot guarantee that at most X=CPU threads are active at any given moment, which defeats the purpose of Dispatchers.Default.

and

Running everything on Dispatchers.Default yields least amount of fighting over CPUs between threads.

Looks like the crucial point. Do I understand correctly that you are prepared to give up some of the parallelism by allowing some of the threads with CPU tokens to block in exchange for improving the ratio of useful work over the total work? I can imagine wanting to do this if the goal is to reduce energy consumption, for example.

@dovchinnikov
Contributor Author

dovchinnikov commented Dec 19, 2023

It's basically a black box that may or may not behave nicely with your threads. One of the reasons may be the use of runBlocking.

With regular IO, over which we have no control, this is indeed a black box. I'd argue that the runBlocking case would be a black box too if it didn't cause starvation. Since it does, we have a point of introspection into said box, which makes it non-black.

The more general solution seems to be to allocate new threads in dire conditions.

I don't really understand the proposition. How would you detect when to spawn a new thread?

Do I understand correctly that you are prepared to give up some of the parallelism by allowing some of the threads with CPU tokens to block in exchange for improving the ratio of useful work over the total work?

Not really. I argue that some situations can be detected, e.g. runBlocking which is about to park. Once it's parked, it's parked: the thread is not even considered for scheduling purposes by the system scheduler => there is no sense in holding onto the CPU token => the coroutine scheduler should give the token to another thread (and spawn one if there is no other thread). In this sense I don't really want to give up some parallelism; I want threads to do the maximum amount of work, and if some thread blocks, then another thread should continue doing the work, so that the total number of active threads under load equals the number of CPUs.

I believe there is only one user-callable function in the library which blocks: runBlocking, please correct me if I'm wrong. This means that if it would work like I propose, it can be safely recommended to write the code without unexpected deadlocks.
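
The token hand-off described in this comment can be modelled with a plain semaphore. This is a sketch of the idea only, not the scheduler's implementation: `CpuTokens`, `withToken`, `parkWithoutToken` and `tokenHandOffDemo` are hypothetical names, and a real scheduler would also have to wake or spawn a worker to use the released token.

```kotlin
import java.util.concurrent.Semaphore
import kotlin.concurrent.thread

// Hypothetical model: a fixed number of "CPU tokens" guarded by a semaphore.
class CpuTokens(count: Int) {
    private val tokens = Semaphore(count)

    // Run CPU-bound work while holding one token.
    fun <T> withToken(work: () -> T): T {
        tokens.acquire()
        try {
            return work()
        } finally {
            tokens.release()
        }
    }

    // Call only from inside withToken: give the token back while the thread
    // is parked, and take one again before returning to CPU-bound work.
    fun <T> parkWithoutToken(blockingCall: () -> T): T {
        tokens.release()
        try {
            return blockingCall()
        } finally {
            tokens.acquireUninterruptibly()
        }
    }
}

fun tokenHandOffDemo(): Int {
    val cpu = CpuTokens(1) // a single token: holding it while blocked would deadlock
    return cpu.withToken {
        var inner = 0
        cpu.parkWithoutToken {
            // The nested work needs the only token. It can obtain it because the
            // outer thread released its token before blocking in join().
            thread { inner = cpu.withToken { 42 } }.join()
        }
        inner
    }
}

fun main() {
    println(tokenHandOffDemo()) // prints 42
}
```

If `parkWithoutToken` were replaced by a direct call to the blocking lambda, the nested `withToken` would wait forever for the token held by the parked thread, which is the starvation this issue is about.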

@dovchinnikov
Contributor Author

dovchinnikov commented Dec 19, 2023

I hate to bring it up here, but this is where Loom is expected to shine. Basically, a coroutine library implementation on top of Loom should spawn a new virtual thread per coroutine and use blocking JVM API calls to park/join, effectively delegating the scheduling to the VM. I wonder if any work has been done in this direction. This would cover IO being a black box: the VM will detect it and unmount the virtual thread before mounting another.

vsalavatov added a commit to JetBrains/intellij-deps-kotlinx.coroutines that referenced this issue Mar 26, 2024
…h a CPU permit

And reacquire CPU permit after runBlocking finishes. This should resolve Dispatchers.Default starvation in cases where runBlocking is used to run suspend functions from non-suspend execution context.

Kotlin#3983 / IJPL-721
vsalavatov added a commit to JetBrains/intellij-deps-kotlinx.coroutines that referenced this issue Mar 27, 2024
PermitTransfer is extracted to be used both in CoroutineScheduler and in LimitedDispatcher.
BlockingDispatchAware interface is introduced for LimitedDispatcher.Worker to be accounted by CoroutineScheduler.

Kotlin#3983 / IJPL-721