Skip to content

Commit

Permalink
Add optional name parameter to .limitedParallelism
Browse files Browse the repository at this point in the history
Fixes #4023
  • Loading branch information
qwwdfsad committed Apr 26, 2024
1 parent dd890c9 commit 547fe3a
Show file tree
Hide file tree
Showing 12 changed files with 40 additions and 23 deletions.
6 changes: 4 additions & 2 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -170,7 +170,9 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public synthetic fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher;
public static synthetic fun limitedParallelism$default (Lkotlinx/coroutines/CoroutineDispatcher;ILjava/lang/String;ILjava/lang/Object;)Lkotlinx/coroutines/CoroutineDispatcher;
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher;
public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V
Expand Down Expand Up @@ -502,7 +504,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher {
public fun <init> ()V
public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher;
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher;
public fun toString ()Ljava/lang/String;
protected final fun toStringInternalImpl ()Ljava/lang/String;
}
Expand Down
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api
Expand Up @@ -59,14 +59,15 @@ abstract class kotlinx.coroutines/CoroutineDispatcher : kotlin.coroutines/Abstra
open fun dispatchYield(kotlin.coroutines/CoroutineContext, kotlinx.coroutines/Runnable) // kotlinx.coroutines/CoroutineDispatcher.dispatchYield|dispatchYield(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.Runnable){}[0]
open fun isDispatchNeeded(kotlin.coroutines/CoroutineContext): kotlin/Boolean // kotlinx.coroutines/CoroutineDispatcher.isDispatchNeeded|isDispatchNeeded(kotlin.coroutines.CoroutineContext){}[0]
open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0]
open fun limitedParallelism(kotlin/Int, kotlin/String? =...): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0]
open fun toString(): kotlin/String // kotlinx.coroutines/CoroutineDispatcher.toString|toString(){}[0]
}
abstract class kotlinx.coroutines/MainCoroutineDispatcher : kotlinx.coroutines/CoroutineDispatcher { // kotlinx.coroutines/MainCoroutineDispatcher|null[0]
abstract val immediate // kotlinx.coroutines/MainCoroutineDispatcher.immediate|{}immediate[0]
abstract fun <get-immediate>(): kotlinx.coroutines/MainCoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.immediate.<get-immediate>|<get-immediate>(){}[0]
constructor <init>() // kotlinx.coroutines/MainCoroutineDispatcher.<init>|<init>(){}[0]
final fun toStringInternalImpl(): kotlin/String? // kotlinx.coroutines/MainCoroutineDispatcher.toStringInternalImpl|toStringInternalImpl(){}[0]
open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0]
open fun limitedParallelism(kotlin/Int, kotlin/String?): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0]
open fun toString(): kotlin/String // kotlinx.coroutines/MainCoroutineDispatcher.toString|toString(){}[0]
}
abstract fun interface <#A: in kotlin/Any?> kotlinx.coroutines.flow/FlowCollector { // kotlinx.coroutines.flow/FlowCollector|null[0]
Expand Down
14 changes: 11 additions & 3 deletions kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Expand Up @@ -105,7 +105,7 @@ public abstract class CoroutineDispatcher :
* is established between them:
*
* ```
* val confined = Dispatchers.Default.limitedParallelism(1)
* val confined = Dispatchers.Default.limitedParallelism(1, "incrementDispatcher")
* var counter = 0
*
* // Invoked from arbitrary coroutines
Expand Down Expand Up @@ -135,15 +135,23 @@ public abstract class CoroutineDispatcher :
* Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement.
* For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded.
*
* @param name optional name for the resulting dispatcher string representation if a new dispatcher was created
* @throws IllegalArgumentException if the given [parallelism] is non-positive
* @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views
*/
@ExperimentalCoroutinesApi
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
public open fun limitedParallelism(parallelism: Int, name: String? = null): CoroutineDispatcher {
parallelism.checkParallelism()
return LimitedDispatcher(this, parallelism)
return LimitedDispatcher(this, parallelism, name)
}

// Was experimental since 1.6.0, deprecated since 1.8.x
@Deprecated("Deprecated for good. Override 'limitedParallelism(parallelism: Int, name: String?)' instead",
level = DeprecationLevel.HIDDEN,
replaceWith = ReplaceWith("limitedParallelism(parallelism, null)")
)
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher = limitedParallelism(parallelism, null)

/**
* Requests execution of a runnable [block].
* The dispatcher guarantees that [block] will eventually execute, typically by dispatching it to a thread pool,
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/EventLoop.common.kt
Expand Up @@ -111,7 +111,7 @@ internal abstract class EventLoop : CoroutineDispatcher() {
}
}

final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
final override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
return this
}
Expand Down
Expand Up @@ -49,7 +49,7 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
*/
override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress"

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
// MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it
return this
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Unconfined.kt
Expand Up @@ -9,7 +9,7 @@ import kotlin.jvm.*
internal object Unconfined : CoroutineDispatcher() {

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined")
}

Expand Down
Expand Up @@ -21,7 +21,8 @@ import kotlin.coroutines.*
*/
internal class LimitedDispatcher(
private val dispatcher: CoroutineDispatcher,
private val parallelism: Int
private val parallelism: Int,
private val name: String?
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {

// Atomic is necessary here for the sake of K/N memory ordering,
Expand All @@ -34,10 +35,10 @@ internal class LimitedDispatcher(
private val workerAllocationLock = SynchronizedObject()

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= this.parallelism) return this
return super.limitedParallelism(parallelism)
return super.limitedParallelism(parallelism, name)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down Expand Up @@ -95,7 +96,7 @@ internal class LimitedDispatcher(
}
}

override fun toString() = "$dispatcher.limitedParallelism($parallelism)"
override fun toString() = name ?: "$dispatcher.limitedParallelism($parallelism)"

/**
* A worker that polls the queue and runs tasks until there are no more of them.
Expand Down
Expand Up @@ -30,7 +30,7 @@ internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay

abstract fun scheduleQueueProcessing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
return this
}
Expand Down
Expand Up @@ -91,7 +91,7 @@ private class MissingMainCoroutineDispatcher(
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
missing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher =
missing()

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
Expand Down
12 changes: 6 additions & 6 deletions kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Expand Up @@ -12,10 +12,10 @@ internal object DefaultScheduler : SchedulerCoroutineDispatcher(
) {

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= CORE_POOL_SIZE) return this
return super.limitedParallelism(parallelism)
return super.limitedParallelism(parallelism, name)
}

// Shuts down the dispatcher, used only by Dispatchers.shutdown()
Expand Down Expand Up @@ -44,10 +44,10 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() {
}

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= MAX_POOL_SIZE) return this
return super.limitedParallelism(parallelism)
return super.limitedParallelism(parallelism, name)
}

// This name only leaks to user code as part of .limitedParallelism machinery
Expand All @@ -72,9 +72,9 @@ internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command)

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
// See documentation to Dispatchers.IO for the rationale
return UnlimitedIoScheduler.limitedParallelism(parallelism)
return UnlimitedIoScheduler.limitedParallelism(parallelism, name)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt
Expand Up @@ -18,5 +18,10 @@ class DispatchersToStringTest {
assertEquals("Dispatchers.Default.limitedParallelism(2)", Dispatchers.Default.limitedParallelism(2).toString())
// Not overridden at all, limited parallelism returns `this`
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())

assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString())
assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString())
// Not overridden at all, limited parallelism returns `this`
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "ignored").toString())
}
}
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/native/src/Dispatchers.kt
Expand Up @@ -28,9 +28,9 @@ internal object DefaultIoScheduler : CoroutineDispatcher() {
private val io = unlimitedPool.limitedParallelism(64) // Default JVM size

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
// See documentation to Dispatchers.IO for the rationale
return unlimitedPool.limitedParallelism(parallelism)
return unlimitedPool.limitedParallelism(parallelism, name)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down

0 comments on commit 547fe3a

Please sign in to comment.