Skip to content

Commit

Permalink
Introduce SegmentQueueSynchronizer abstraction for synchronization …
Browse files Browse the repository at this point in the history
…primitives and `ReadWriteMutex`

Signed-off-by: Nikita Koval <ndkoval@ya.ru>
  • Loading branch information
ndkoval committed Feb 13, 2023
1 parent 43b6be5 commit b2ed1d6
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 2 deletions.
Expand Up @@ -209,6 +209,7 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
returnValue(value)
}

@Suppress("INFERRED_TYPE_VARIABLE_INTO_POSSIBLE_EMPTY_INTERSECTION")
internal fun suspendCancelled(): T? {
// Increment `suspendIdx` and find the segment
// with the corresponding id. It is guaranteed
Expand Down Expand Up @@ -238,14 +239,15 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
if (value !== BROKEN && segment.cas(i, value, TAKEN)) {
// The elimination is performed successfully,
// complete with the value stored in the cell.
@Suppress("UNCHECKED_CAST")
return value as T
}
// The cell is broken, this can happen only in the `SYNC` resumption mode.
assert { resumeMode == SYNC && segment.get(i) === BROKEN }
return null
}

@Suppress("UNCHECKED_CAST")
@Suppress("UNCHECKED_CAST", "INFERRED_TYPE_VARIABLE_INTO_POSSIBLE_EMPTY_INTERSECTION")
internal fun suspend(waiter: Waiter): Boolean {
// Increment `suspendIdx` and find the segment
// with the corresponding id. It is guaranteed
Expand Down Expand Up @@ -335,7 +337,7 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
* moves [resumeIdx] to the first possibly non-cancelled cell, i.e.,
* to the first segment id multiplied by [SEGMENT_SIZE].
*/
@Suppress("UNCHECKED_CAST")
@Suppress("UNCHECKED_CAST", "INFERRED_TYPE_VARIABLE_INTO_POSSIBLE_EMPTY_INTERSECTION")
private fun tryResumeImpl(value: T, adjustResumeIdx: Boolean): Int {
// Check that `adjustResumeIdx` is `false` in the simple cancellation mode.
assertNot { cancellationMode == SIMPLE && adjustResumeIdx }
Expand Down Expand Up @@ -561,12 +563,14 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
// provided by a concurrent `resume(..)`.
// The value could be put only in the asynchronous mode,
// so the `resume(..)` call above must not fail.
@Suppress("UNCHECKED_CAST")
resume(value as T)
} else {
// The `resume(..)` that will come to this cell should be refused.
// Mark the cell correspondingly and help a concurrent
// `resume(..)` to process its value if needed.
val value = markRefuse(index) ?: return
@Suppress("UNCHECKED_CAST")
returnRefusedValue(value as T)
}
}
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/sync/Mutex.kt
Expand Up @@ -232,6 +232,7 @@ internal open class MutexImpl(locked: Boolean) : SegmentQueueSynchronizer<Unit>(
assert { this.owner.value === NO_OWNER }
when (waiter) {
is CancellableContinuation<*> -> {
@Suppress("UNCHECKED_CAST")
waiter as CancellableContinuation<Unit>
waiter.resume(Unit, null)
}
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/sync/ReadWriteMutex.kt
Expand Up @@ -217,6 +217,7 @@ internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
if (owner != null) error("ReadWriteMutex.write does not support owners")
writeLock()
}
@Suppress("OVERRIDE_DEPRECATION")
override val onLock: SelectClause2<Any?, Mutex> get() = error("ReadWriteMutex.write does not support `onLock`")
override fun holdsLock(owner: Any) = error("ReadWriteMutex.write does not support owners")
override fun unlock(owner: Any?) {
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/test/sync/MutexTest.kt
Expand Up @@ -140,6 +140,7 @@ class MutexTest : TestBase() {
}

@Test
@Suppress("DEPRECATION")
fun testIllegalStateInvariant() = runTest {
val mutex = Mutex()
val owner = Any()
Expand Down
8 changes: 8 additions & 0 deletions kotlinx-coroutines-core/jvm/test/TestBase.kt
Expand Up @@ -255,6 +255,14 @@ public actual open class TestBase(private var disableOutCheck: Boolean) {
protected suspend fun currentDispatcher() = coroutineContext[ContinuationInterceptor]!!
}

fun <T> CancellableContinuation<T>.tryResume0(value: T, onCancellation: (Throwable?) -> Unit): Boolean {
tryResume(value, null, onCancellation).let {
if (it == null) return false
completeResume(it)
return true
}
}

/*
* We ignore tests that test **real** non-virtualized tests with time on Windows, because
* our CI Windows is virtualized itself (oh, the irony) and its clock resolution is dozens of ms,
Expand Down
Expand Up @@ -25,6 +25,7 @@ class MutexLincheckTest : AbstractLincheckTest() {

// TODO: `onLock` with non-null owner is non-linearizable
// onLock may suspend in case of clause re-registration.
@Suppress("DEPRECATION")
@Operation(allowExtraSuspension = true, promptCancellation = true)
suspend fun onLock() = select<Unit> { mutex.onLock(null) {} }

Expand Down

0 comments on commit b2ed1d6

Please sign in to comment.