From 836a4bb7b977733fd47908c9f75f79ec9c6c2ea0 Mon Sep 17 00:00:00 2001 From: Yehor Kulbachka <38127655+EgorKulbachka@users.noreply.github.com> Date: Fri, 8 Jul 2022 13:38:30 +0200 Subject: [PATCH] Comply with Subscriber rule 2.7 in the `await*` impl (#3360) There is a possibility of a race between Subscription.request and Subscription.cancel methods since cancellation handler could be executed in a separate thread. Rule [2.7](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2-subscriber-code) requires Subscription methods to be executed serially. --- .../kotlinx-coroutines-reactive/src/Await.kt | 30 ++++++++++--- .../test/AwaitCancellationStressTest.kt | 43 +++++++++++++++++++ 2 files changed, 68 insertions(+), 5 deletions(-) create mode 100644 reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index da8632bffc..3d9a0f8567 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -198,12 +198,20 @@ private suspend fun Publisher.awaitOne( /** cancelling the new subscription due to rule 2.5, though the publisher would either have to * subscribe more than once, which would break 2.12, or leak this [Subscriber]. */ if (subscription != null) { - sub.cancel() + withSubscriptionLock { + sub.cancel() + } return } subscription = sub - cont.invokeOnCancellation { sub.cancel() } - sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE) + cont.invokeOnCancellation { + withSubscriptionLock { + sub.cancel() + } + } + withSubscriptionLock { + sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE) + } } override fun onNext(t: T) { @@ -228,12 +236,16 @@ private suspend fun Publisher.awaitOne( return } seenValue = true - sub.cancel() + withSubscriptionLock { + sub.cancel() + } cont.resume(t) } Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> { if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) { - sub.cancel() + withSubscriptionLock { + sub.cancel() + } /* the check for `cont.isActive` is needed in case `sub.cancel() above calls `onComplete` or `onError` on its own. */ if (cont.isActive) { @@ -289,6 +301,14 @@ private suspend fun Publisher.awaitOne( inTerminalState = true return true } + + /** + * Enforce rule 2.7: [Subscription.request] and [Subscription.cancel] must be executed serially + */ + @Synchronized + private fun withSubscriptionLock(block: () -> Unit) { + block() + } }) } diff --git a/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt new file mode 100644 index 0000000000..aaf8df6e69 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt @@ -0,0 +1,43 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import org.junit.* +import org.reactivestreams.* +import java.util.concurrent.locks.* + +/** + * This test checks implementation of rule 2.7 for await methods - serial execution of subscription methods + */ +class AwaitCancellationStressTest : TestBase() { + private val iterations = 10_000 * stressTestMultiplier + + @Test + fun testAwaitCancellationOrder() = runTest { + repeat(iterations) { + val job = launch(Dispatchers.Default) { + testPublisher().awaitFirst() + } + job.cancelAndJoin() + } + } + + private fun testPublisher() = Publisher { s -> + val lock = ReentrantLock() + s.onSubscribe(object : Subscription { + override fun request(n: Long) { + check(lock.tryLock()) + s.onNext(42) + lock.unlock() + } + + override fun cancel() { + check(lock.tryLock()) + lock.unlock() + } + }) + } +}