Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make withTimeout throw not a CancellationException #3515

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -749,7 +749,7 @@ class ListenableFutureTest : TestBase() {
}
}
future.set(1)
withTimeout(60_000) {
kotlinx.coroutines.time.withTimeout(60_000) {
children.forEach { it.join() }
assertEquals(count, completed.get())
}
Expand Down
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Expand Up @@ -8,6 +8,7 @@ package kotlinx.coroutines
import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.time.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.js.*
Expand Down Expand Up @@ -260,6 +261,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
val firstNonCancellation = exceptions.firstOrNull { it !is CancellationException }
if (firstNonCancellation != null) return firstNonCancellation
val first = exceptions[0]
@Suppress("DEPRECATION")
if (first is TimeoutCancellationException) {
val detailedTimeoutException = exceptions.firstOrNull { it !== first && it is TimeoutCancellationException }
if (detailedTimeoutException != null) return detailedTimeoutException
Expand Down
24 changes: 23 additions & 1 deletion kotlinx-coroutines-core/common/src/Timeout.kt
Expand Up @@ -2,9 +2,11 @@
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:OptIn(ExperimentalContracts::class)
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")

package kotlinx.coroutines

import kotlin.internal.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
Expand Down Expand Up @@ -36,10 +38,17 @@ import kotlin.time.*
*
* @param timeMillis timeout time in milliseconds.
*/
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
@LowPriorityInOverloadResolution
@Deprecated("Use withTimeout from the 'kotlinx-coroutines-time' package instead.",
ReplaceWith("kotlinx.coroutines.time.withTimeout(timeMillis, block)",
"kotlinx.coroutines.time"),
level = DeprecationLevel.WARNING)
public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
@Suppress("DEPRECATION")
if (timeMillis <= 0L) throw TimeoutCancellationException("Timed out immediately")
return suspendCoroutineUninterceptedOrReturn { uCont ->
setupTimeout(TimeoutCoroutine(timeMillis, uCont), block)
Expand All @@ -66,11 +75,17 @@ public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineSco
*
* > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
@LowPriorityInOverloadResolution
@Deprecated("Use withTimeout from the 'kotlinx-coroutines-time' package instead.",
ReplaceWith("kotlinx.coroutines.time.withTimeout(timeout, block)",
"kotlinx.coroutines.time"),
level = DeprecationLevel.WARNING)
public suspend fun <T> withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return withTimeout(timeout.toDelayMillis(), block)
return kotlinx.coroutines.time.withTimeout(timeout.toDelayMillis(), block)
}

/**
Expand Down Expand Up @@ -99,6 +114,7 @@ public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend Corout
if (timeMillis <= 0L) return null

var coroutine: TimeoutCoroutine<T?, T?>? = null
@Suppress("DEPRECATION")
try {
return suspendCoroutineUninterceptedOrReturn { uCont ->
val timeoutCoroutine = TimeoutCoroutine(timeMillis, uCont)
Expand Down Expand Up @@ -165,6 +181,11 @@ private class TimeoutCoroutine<U, in T: U>(
/**
* This exception is thrown by [withTimeout] to indicate timeout.
*/
@Deprecated("Use TimeoutException from the 'kotlinx-coroutines-time' package instead.",
ReplaceWith("kotlinx.coroutines.time.TimeoutException",
"kotlinx.coroutines.time"),
level = DeprecationLevel.WARNING)
@Suppress("DEPRECATION")
public class TimeoutCancellationException internal constructor(
message: String,
@JvmField @Transient internal val coroutine: Job?
Expand All @@ -181,6 +202,7 @@ public class TimeoutCancellationException internal constructor(
TimeoutCancellationException(message ?: "", coroutine).also { it.initCause(this) }
}

@Suppress("DEPRECATION")
internal fun TimeoutCancellationException(
time: Long,
coroutine: Job
Expand Down
7 changes: 4 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Expand Up @@ -11,6 +11,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.time.*
import kotlin.jvm.*
import kotlin.time.*

Expand Down Expand Up @@ -346,7 +347,7 @@ internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMil
public fun <T> Flow<T>.sample(period: Duration): Flow<T> = sample(period.toDelayMillis())

/**
* Returns a flow that will emit a [TimeoutCancellationException] if the upstream doesn't emit an item within the given time.
* Returns a flow that will emit a [TimeoutException] if the upstream doesn't emit an item within the given time.
*
* Example:
*
Expand Down Expand Up @@ -386,7 +387,7 @@ public fun <T> Flow<T>.timeout(
private fun <T> Flow<T>.timeoutInternal(
timeout: Duration
): Flow<T> = scopedFlow { downStream ->
if (timeout <= Duration.ZERO) throw TimeoutCancellationException("Timed out immediately")
if (timeout <= Duration.ZERO) throw TimeoutException("Timed out immediately")
val values = buffer(Channel.RENDEZVOUS).produceIn(this)
whileSelect {
values.onReceiveCatching { value ->
Expand All @@ -398,7 +399,7 @@ private fun <T> Flow<T>.timeoutInternal(
return@onReceiveCatching true
}
onTimeout(timeout) {
throw TimeoutCancellationException("Timed out waiting for $timeout")
throw TimeoutException("Timed out waiting for $timeout")
}
}
}
20 changes: 18 additions & 2 deletions kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.intrinsics

import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.time.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

Expand All @@ -26,7 +27,7 @@ internal fun <T> (suspend () -> T).startCoroutineUnintercepted(completion: Conti
* It does not use [ContinuationInterceptor] and does not update the context of the current thread.
*/
internal fun <R, T> (suspend (R) -> T).startCoroutineUnintercepted(receiver: R, completion: Continuation<T>) {
startDirect(completion) { actualCompletion ->
startDirect(completion) { actualCompletion ->
startCoroutineUninterceptedOrReturn(receiver, actualCompletion)
}
}
Expand Down Expand Up @@ -96,7 +97,22 @@ internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(receiver: R, blo
internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturnIgnoreTimeout(
receiver: R, block: suspend R.() -> T
): Any? {
return undispatchedResult({ e -> !(e is TimeoutCancellationException && e.coroutine === this) }) {
return undispatchedResult({ e ->
@Suppress("DEPRECATION") !(e is TimeoutCancellationException && e.coroutine === this)
}) {
block.startCoroutineUninterceptedOrReturn(receiver, this)
}
}

/**
* Same as [startUndispatchedOrReturn], but ignores [TimeoutException] on fast-path.
*/
internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturnIgnoreNewTimeout(
receiver: R, block: suspend R.() -> T
): Any? {
return undispatchedResult({ e ->
!(e is TimeoutException && e.coroutine === this)
}) {
block.startCoroutineUninterceptedOrReturn(receiver, this)
}
}
Expand Down
125 changes: 125 additions & 0 deletions kotlinx-coroutines-core/common/src/time/Timeout.kt
@@ -0,0 +1,125 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines.time

import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
import kotlin.time.*

/**
* Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws
* a [TimeoutException] if the timeout was exceeded.
* If the given [timeMillis] is non-positive, [TimeoutException] is thrown immediately.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* the cancellable suspending function inside the block throws a [TimeoutException].
*
* The sibling function that does not throw an exception on timeout is [withTimeoutOrNull].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* **The timeout event is asynchronous with respect to the code running in the block** and may happen at any time,
* even right before the return from inside the timeout [block]. Keep this in mind if you open or acquire some
* resource inside the [block] that needs closing or release outside the block.
* See the
* [Asynchronous timeout and resources][https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html#asynchronous-timeout-and-resources]
* section of the coroutines guide for details.
*
* > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*
* @param timeMillis timeout time in milliseconds.
*/
public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
if (timeMillis <= 0L) throw TimeoutException("Timed out immediately")
return suspendCoroutineUninterceptedOrReturn { uCont ->
setupTimeout(TimeoutCoroutine(timeMillis, uCont), block)
}
}

/**
* Runs a given suspending [block] of code inside a coroutine with the specified [timeout] and throws
* a [TimeoutException] if the timeout was exceeded.
* If the given [timeout] is non-positive, [TimeoutException] is thrown immediately.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* the cancellable suspending function inside the block throws a [TimeoutException].
*
* The sibling function that does not throw an exception on timeout is [withTimeoutOrNull].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* **The timeout event is asynchronous with respect to the code running in the block** and may happen at any time,
* even right before the return from inside the timeout [block]. Keep this in mind if you open or acquire some
* resource inside the [block] that needs closing or release outside the block.
* See the
* [Asynchronous timeout and resources][https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html#asynchronous-timeout-and-resources]
* section of the coroutines guide for details.
*
* > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
public suspend fun <T> withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return withTimeout(timeout.toDelayMillis(), block)
}

private fun <U, T: U> setupTimeout(
coroutine: TimeoutCoroutine<U, T>,
block: suspend CoroutineScope.() -> T
): Any? {
// schedule cancellation of this coroutine on time
val cont = coroutine.uCont
val context = cont.context
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context))
// restart the block using a new coroutine with a new job,
// however, start it undispatched, because we already are in the proper context
return coroutine.startUndispatchedOrReturnIgnoreNewTimeout(coroutine, block)
}

private class TimeoutCoroutine<U, in T: U>(
@JvmField val time: Long,
uCont: Continuation<U> // unintercepted continuation
) : ScopeCoroutine<T>(uCont.context, uCont), Runnable {
override fun run() {
cancelCoroutine(TimeoutException(time, this))
}

override fun nameString(): String =
"${super.nameString()}(timeMillis=$time)"
}

/**
* This exception is thrown by [withTimeout] to indicate timeout.
*/
public class TimeoutException internal constructor(
message: String,
@JvmField @Transient internal val coroutine: Job?
): IllegalStateException(message), CopyableThrowable<TimeoutException> {
/**
* Creates a timeout exception with the given message.
* This constructor is needed for exception stack-traces recovery.
*/
@Suppress("UNUSED")
internal constructor(message: String) : this(message, null)

// message is never null in fact
override fun createCopy(): TimeoutException =
TimeoutException(message ?: "", coroutine).also { it.initCause(this) }
}

internal fun TimeoutException(
time: Long,
coroutine: Job
) : TimeoutException = TimeoutException("Timed out waiting for $time ms", coroutine)
Expand Up @@ -34,7 +34,7 @@ class BuilderContractsTest : TestBase() {
consume(wctx)

val wt: Int
withTimeout(Long.MAX_VALUE) {
kotlinx.coroutines.time.withTimeout(Long.MAX_VALUE) {
wt = 123
}
consume(wt)
Expand Down
Expand Up @@ -96,7 +96,7 @@ class CancelledParentAttachTest : TestBase() {
testScope { coroutineScope { } }
testScope { supervisorScope { } }
testScope { flowScope { } }
testScope { withTimeout(Long.MAX_VALUE) { } }
testScope { kotlinx.coroutines.time.withTimeout(Long.MAX_VALUE) { } }
testScope { withContext(Job()) { } }
testScope { withContext(CoroutineName("")) { } }
}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/test/ParentCancellationTest.kt
Expand Up @@ -90,7 +90,7 @@ class ParentCancellationTest : TestBase() {
@Test
fun testWithTimeoutChild() = runTest {
testParentCancellation(expectParentActive = true, expectRethrows = true, runsInScopeContext = true) { fail ->
withTimeout(1000) { fail() }
kotlinx.coroutines.time.withTimeout(1000) { fail() }
}
}

Expand Down Expand Up @@ -168,4 +168,4 @@ class ParentCancellationTest : TestBase() {
}
finish(3)
}
}
}
Expand Up @@ -21,7 +21,7 @@ class UndispatchedResultTest : TestBase() {

@Test
fun testWithTimeout() = runTest {
invokeTest { block -> withTimeout(Long.MAX_VALUE, block) }
invokeTest { block -> kotlinx.coroutines.time.withTimeout(Long.MAX_VALUE, block) }
}

@Test
Expand Down