From 5f63dc9f09fe75753db55a0c1ebd04ae7c36b930 Mon Sep 17 00:00:00 2001 From: Yehor Kulbachka Date: Thu, 7 Jul 2022 10:57:44 +0200 Subject: [PATCH 1/6] Await comply with reactive streams rule 2.7 There is a possibility of race of Subscription.request and Subscription.cancel methods since cancellation handler could be executed in a separate thread. Rule [2.7](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2-subscriber-code) requires Subscription methods to be executed serially. --- .../kotlinx-coroutines-reactive/src/Await.kt | 3 +- .../test/AwaitTest.kt | 39 ++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index da8632bffc..762043eaa7 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -202,8 +202,9 @@ private suspend fun Publisher.awaitOne( return } subscription = sub - cont.invokeOnCancellation { sub.cancel() } sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE) + // Due to rule 2.7 ensuring that Subscription request is finished before registering cancellation handler + cont.invokeOnCancellation { sub.cancel() } } override fun onNext(t: T) { diff --git a/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt index 6749423f80..b098817969 100644 --- a/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.sync.* import org.junit.* import org.reactivestreams.* @@ -40,4 +41,40 @@ class AwaitTest: TestBase() { finish(7) } -} \ No newline at end of file + @Test + fun testAwaitCancellationPerformedSerially() = runTest { + val requestCompletion = Mutex(locked = true) + val subscriptionStarted = Mutex(locked = true) + expect(1) + val publisher = Publisher { s -> + s.onSubscribe(object : Subscription { + override fun request(n: Long) { + expect(2) + subscriptionStarted.unlock() + runBlocking { requestCompletion.lock() } + expect(4) + } + + override fun cancel() { + expect(5) + } + }) + } + val job = launch(Dispatchers.IO) { + try { + publisher.awaitFirst() + } catch (e: CancellationException) { + expect(6) + throw e + } + } + subscriptionStarted.lock() + expect(3) + + job.cancel() + requestCompletion.unlock() + job.join() + + finish(7) + } +} From 4db21317513f173c8de29e982aa765c8e5575af4 Mon Sep 17 00:00:00 2001 From: Yehor Kulbachka Date: Thu, 7 Jul 2022 16:48:01 +0200 Subject: [PATCH 2/6] Use synchronized block to ensure serial execution of subscription methods --- .../kotlinx-coroutines-reactive/src/Await.kt | 33 +++++++++++++++---- .../test/AwaitTest.kt | 7 ++-- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index 762043eaa7..8b1764ab36 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -198,13 +198,22 @@ private suspend fun Publisher.awaitOne( /** cancelling the new subscription due to rule 2.5, though the publisher would either have to * subscribe more than once, which would break 2.12, or leak this [Subscriber]. */ if (subscription != null) { - sub.cancel() + withSubscriptionLock { + sub.cancel() + } return } subscription = sub - sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE) - // Due to rule 2.7 ensuring that Subscription request is finished before registering cancellation handler - cont.invokeOnCancellation { sub.cancel() } + + cont.invokeOnCancellation { + withSubscriptionLock { + sub.cancel() + } + } + + withSubscriptionLock { + sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE) + } } override fun onNext(t: T) { @@ -229,12 +238,16 @@ private suspend fun Publisher.awaitOne( return } seenValue = true - sub.cancel() + withSubscriptionLock { + sub.cancel() + } cont.resume(t) } Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> { if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) { - sub.cancel() + withSubscriptionLock { + sub.cancel() + } /* the check for `cont.isActive` is needed in case `sub.cancel() above calls `onComplete` or `onError` on its own. */ if (cont.isActive) { @@ -290,6 +303,14 @@ private suspend fun Publisher.awaitOne( inTerminalState = true return true } + + /** + * Enforce rule 2.7: [Subscription.request] and [Subscription.cancel] must be executed serially + */ + @Synchronized + private fun withSubscriptionLock(block: () -> Unit) { + block() + } }) } diff --git a/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt index b098817969..b41f32c79f 100644 --- a/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt @@ -60,7 +60,7 @@ class AwaitTest: TestBase() { } }) } - val job = launch(Dispatchers.IO) { + val job = launch(Dispatchers.Default) { try { publisher.awaitFirst() } catch (e: CancellationException) { @@ -71,7 +71,10 @@ class AwaitTest: TestBase() { subscriptionStarted.lock() expect(3) - job.cancel() + launch(Dispatchers.Default) { + job.cancel() + } + delay(10) requestCompletion.unlock() job.join() From 90045c42e5103c45a44ee270a4156c29c44b1719 Mon Sep 17 00:00:00 2001 From: Yehor Kulbachka Date: Thu, 7 Jul 2022 18:50:44 +0200 Subject: [PATCH 3/6] Stress test for await cancel race condition --- .../kotlinx-coroutines-reactive/src/Await.kt | 2 - .../test/AwaitCancellationStressTest.kt | 44 +++++++++++++++++++ .../test/AwaitTest.kt | 41 ----------------- 3 files changed, 44 insertions(+), 43 deletions(-) create mode 100644 reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index 8b1764ab36..3d9a0f8567 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -204,13 +204,11 @@ private suspend fun Publisher.awaitOne( return } subscription = sub - cont.invokeOnCancellation { withSubscriptionLock { sub.cancel() } } - withSubscriptionLock { sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE) } diff --git a/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt new file mode 100644 index 0000000000..1318f441fb --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import org.junit.* +import org.reactivestreams.* +import java.util.concurrent.atomic.* + +/** + * This test checks implementation of rule 2.7 for await methods - serial execution of subscription methods + */ +class AwaitCancellationStressTest : TestBase() { + private val jobsToRun = 1000 * stressTestMultiplier + + @Test + fun testRequestStress() = runTest { + val jobs = (1..jobsToRun).map { + launch(Dispatchers.Default) { + testPublisher().awaitFirst() + } + } + jobs.forEach { it.cancel() } + jobs.joinAll() + } + + private fun testPublisher() = Publisher { s -> + val counter = AtomicInteger() + s.onSubscribe(object : Subscription { + override fun request(n: Long) { + check(counter.getAndIncrement() == 0) + Thread.sleep(10) + counter.decrementAndGet() + } + + override fun cancel() { + check(counter.getAndIncrement() == 0) + counter.decrementAndGet() + } + }) + } +} diff --git a/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt index b41f32c79f..a4d5c6423c 100644 --- a/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt @@ -5,7 +5,6 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* -import kotlinx.coroutines.sync.* import org.junit.* import org.reactivestreams.* @@ -40,44 +39,4 @@ class AwaitTest: TestBase() { job.cancelAndJoin() finish(7) } - - @Test - fun testAwaitCancellationPerformedSerially() = runTest { - val requestCompletion = Mutex(locked = true) - val subscriptionStarted = Mutex(locked = true) - expect(1) - val publisher = Publisher { s -> - s.onSubscribe(object : Subscription { - override fun request(n: Long) { - expect(2) - subscriptionStarted.unlock() - runBlocking { requestCompletion.lock() } - expect(4) - } - - override fun cancel() { - expect(5) - } - }) - } - val job = launch(Dispatchers.Default) { - try { - publisher.awaitFirst() - } catch (e: CancellationException) { - expect(6) - throw e - } - } - subscriptionStarted.lock() - expect(3) - - launch(Dispatchers.Default) { - job.cancel() - } - delay(10) - requestCompletion.unlock() - job.join() - - finish(7) - } } From 179bf3f8d7272830de2bb1cec5f13b3f1ed6972f Mon Sep 17 00:00:00 2001 From: Yehor Kulbachka Date: Thu, 7 Jul 2022 19:21:06 +0200 Subject: [PATCH 4/6] Revert await test changes --- reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt index a4d5c6423c..6749423f80 100644 --- a/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt @@ -39,4 +39,5 @@ class AwaitTest: TestBase() { job.cancelAndJoin() finish(7) } -} + +} \ No newline at end of file From e7a342b3094a3e5ca1d8a4cfa639ff8280cb79a9 Mon Sep 17 00:00:00 2001 From: Yehor Kulbachka Date: Thu, 7 Jul 2022 21:38:10 +0200 Subject: [PATCH 5/6] Rename test method --- .../test/AwaitCancellationStressTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt index 1318f441fb..9ba5a7a435 100644 --- a/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt @@ -16,7 +16,7 @@ class AwaitCancellationStressTest : TestBase() { private val jobsToRun = 1000 * stressTestMultiplier @Test - fun testRequestStress() = runTest { + fun testAwaitCancellationOrder() = runTest { val jobs = (1..jobsToRun).map { launch(Dispatchers.Default) { testPublisher().awaitFirst() From 947adb878b6b7aaf75209d480d5576ac7f8cc2b9 Mon Sep 17 00:00:00 2001 From: Yehor Kulbachka Date: Fri, 8 Jul 2022 12:07:58 +0200 Subject: [PATCH 6/6] Rewrite stress test to sequential iterations and including onNext --- .../test/AwaitCancellationStressTest.kt | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt index 9ba5a7a435..aaf8df6e69 100644 --- a/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/AwaitCancellationStressTest.kt @@ -7,37 +7,36 @@ package kotlinx.coroutines.reactive import kotlinx.coroutines.* import org.junit.* import org.reactivestreams.* -import java.util.concurrent.atomic.* +import java.util.concurrent.locks.* /** * This test checks implementation of rule 2.7 for await methods - serial execution of subscription methods */ class AwaitCancellationStressTest : TestBase() { - private val jobsToRun = 1000 * stressTestMultiplier + private val iterations = 10_000 * stressTestMultiplier @Test fun testAwaitCancellationOrder() = runTest { - val jobs = (1..jobsToRun).map { - launch(Dispatchers.Default) { + repeat(iterations) { + val job = launch(Dispatchers.Default) { testPublisher().awaitFirst() } + job.cancelAndJoin() } - jobs.forEach { it.cancel() } - jobs.joinAll() } private fun testPublisher() = Publisher { s -> - val counter = AtomicInteger() + val lock = ReentrantLock() s.onSubscribe(object : Subscription { override fun request(n: Long) { - check(counter.getAndIncrement() == 0) - Thread.sleep(10) - counter.decrementAndGet() + check(lock.tryLock()) + s.onNext(42) + lock.unlock() } override fun cancel() { - check(counter.getAndIncrement() == 0) - counter.decrementAndGet() + check(lock.tryLock()) + lock.unlock() } }) }