Skip to content

Commit

Permalink
IntroduceSegmentQueueSynchronizer abstraction for synchronization p…
Browse files Browse the repository at this point in the history
…rimitives and `ReadWriteMutex`
  • Loading branch information
ndkoval committed Mar 23, 2021
1 parent 7c6c72f commit b330b86
Show file tree
Hide file tree
Showing 16 changed files with 2,765 additions and 210 deletions.
1 change: 1 addition & 0 deletions .idea/dictionaries/shared.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt
Expand Up @@ -14,8 +14,8 @@ import org.openjdk.jmh.annotations.*
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit

@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Warmup(iterations = 2, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Measurement(iterations = 5, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
Expand All @@ -31,7 +31,6 @@ open class SemaphoreBenchmark {
private var _3_maxPermits: Int = 0

@Param("1", "2", "4") // local machine
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
private var _4_parallelism: Int = 0

Expand All @@ -51,7 +50,7 @@ open class SemaphoreBenchmark {
val semaphore = Semaphore(_3_maxPermits)
val jobs = ArrayList<Job>(coroutines)
repeat(coroutines) {
jobs += GlobalScope.launch {
jobs += GlobalScope.launch(dispatcher) {
repeat(n) {
semaphore.withPermit {
doGeomDistrWork(WORK_INSIDE)
Expand All @@ -69,7 +68,7 @@ open class SemaphoreBenchmark {
val semaphore = Channel<Unit>(_3_maxPermits)
val jobs = ArrayList<Job>(coroutines)
repeat(coroutines) {
jobs += GlobalScope.launch {
jobs += GlobalScope.launch(dispatcher) {
repeat(n) {
semaphore.send(Unit) // acquire
doGeomDistrWork(WORK_INSIDE)
Expand All @@ -89,4 +88,4 @@ enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> Cor

private const val WORK_INSIDE = 80
private const val WORK_OUTSIDE = 40
private const val BATCH_SIZE = 1000000
private const val BATCH_SIZE = 1_000_000
2 changes: 1 addition & 1 deletion gradle.properties
Expand Up @@ -12,7 +12,7 @@ junit_version=4.12
atomicfu_version=0.15.2
knit_version=0.2.3
html_version=0.6.8
lincheck_version=2.10
lincheck_version=2.11
dokka_version=0.9.16-rdev-2-mpp-hacks
byte_buddy_version=1.10.9
reactor_version=3.4.1
Expand Down
12 changes: 12 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -1281,6 +1281,18 @@ public final class kotlinx/coroutines/sync/MutexKt {
public static synthetic fun withLock$default (Lkotlinx/coroutines/sync/Mutex;Ljava/lang/Object;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/sync/ReadWriteMutex {
public abstract fun getWrite ()Lkotlinx/coroutines/sync/Mutex;
public abstract fun readLock (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun readUnlock ()V
}

public final class kotlinx/coroutines/sync/ReadWriteMutexKt {
public static final fun ReadWriteMutex ()Lkotlinx/coroutines/sync/ReadWriteMutex;
public static final fun read (Lkotlinx/coroutines/sync/ReadWriteMutex;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun write (Lkotlinx/coroutines/sync/ReadWriteMutex;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/sync/Semaphore {
public abstract fun acquire (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getAvailablePermits ()I
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/build.gradle
Expand Up @@ -242,8 +242,8 @@ static void configureJvmForLincheck(task) {
task.maxHeapSize = '6g' // we may need more space for building an interleaving tree in the model checking mode
task.jvmArgs = ['--add-opens', 'java.base/jdk.internal.misc=ALL-UNNAMED', // required for transformation
'--add-exports', 'java.base/jdk.internal.util=ALL-UNNAMED'] // in the model checking mode
task.systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
task.systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '1' // better for the model checking mode
task.systemProperty 'kotlinx.coroutines.sqs.segmentSize', '2'
task.systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '1' // better for the model checking mode
}

task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {
Expand Down
23 changes: 13 additions & 10 deletions kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt
Expand Up @@ -149,11 +149,13 @@ internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>
*/
fun remove() {
assert { removed } // The node should be logically removed at first.
assert { !isTail } // The physical tail cannot be removed.
// The physical tail cannot be removed. Instead, we remove it when
// a new segment is added and this segment is not the tail one anymore.
if (isTail) return
while (true) {
// Read `next` and `prev` pointers ignoring logically removed nodes.
val prev = leftmostAliveNode
val next = rightmostAliveNode
val prev = aliveSegmentLeft
val next = aliveSegmentRight
// Link `next` and `prev`.
next._prev.value = prev
if (prev !== null) prev._next.value = next
Expand All @@ -165,17 +167,17 @@ internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>
}
}

private val leftmostAliveNode: N? get() {
private val aliveSegmentLeft: N? get() {
var cur = prev
while (cur !== null && cur.removed)
cur = cur._prev.value
return cur
}

private val rightmostAliveNode: N get() {
private val aliveSegmentRight: N get() {
assert { !isTail } // Should not be invoked on the tail node
var cur = next!!
while (cur.removed)
while (cur.removed && !cur.isTail)
cur = cur.next!!
return cur
}
Expand Down Expand Up @@ -203,19 +205,20 @@ internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers
* There are no pointers to this segment from outside, and
* it is not a physical tail in the linked list of segments.
*/
override val removed get() = cleanedAndPointers.value == maxSlots && !isTail
override val removed get() = cleanedAndPointers.value == maxSlots

// increments the number of pointers if this segment is not logically removed.
internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots || isTail }
internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots }

// returns `true` if this segment is logically removed after the decrement.
internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots && !isTail
internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots

/**
* Invoked on each slot clean-up; should not be invoked twice for the same slot.
*/
fun onSlotCleaned() {
if (cleanedAndPointers.incrementAndGet() == maxSlots && !isTail) remove()
if (cleanedAndPointers.incrementAndGet() < maxSlots) return
if (removed) remove()
}
}

Expand Down

0 comments on commit b330b86

Please sign in to comment.