Skip to content

Commit

Permalink
Remove DCSS
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Feb 28, 2024
1 parent 0300a4b commit de0b797
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 208 deletions.
41 changes: 28 additions & 13 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Expand Up @@ -160,12 +160,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
* and should be [unboxed][unboxState] before returning to user code.
*/
internal val state: Any? get() {
_state.loop { state -> // helper loop on state (complete in-progress atomic operations)
if (state !is OpDescriptor) return state
state.perform(this)
}
}
internal val state: Any? get() = _state.value

/**
* @suppress **This is unstable API and it is subject to change.**
Expand Down Expand Up @@ -325,6 +320,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private fun notifyCancelling(list: NodeList, cause: Throwable) {
// first cancel our own children
onCancelling(cause)
list.closeForSome()
notifyHandlers<JobCancellingNode>(list, cause)
// then cancel parent
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
Expand Down Expand Up @@ -356,8 +352,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
return parent.childCancelled(cause) || isCancellation
}

private fun NodeList.notifyCompletion(cause: Throwable?) =
private fun NodeList.notifyCompletion(cause: Throwable?) {
close()
notifyHandlers<JobNode>(this, cause)
}

private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
var exception: Throwable? = null
Expand Down Expand Up @@ -488,7 +486,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// or we are adding a child to a coroutine that is not completing yet
if (rootCause == null || handler is ChildHandleNode && !state.isCompleting) {
// Note: add node the list while holding lock on state (make sure it cannot change)
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
if (!list.addLast(
node,
allowedAfterPartialClosing = handler is ChildHandleNode
)
) return@loopOnState // retry
// just return node if we don't have to invoke handler (not cancelling yet)
if (rootCause == null) return node
// otherwise handler is invoked immediately out of the synchronized section & handle returned
Expand All @@ -500,8 +502,24 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
if (invokeImmediately) handler.invoke(rootCause)
return handle
} else {
if (addLastAtomic(state, list, node)) return node
} else if (list.addLast(
node, allowedAfterPartialClosing = !onCancelling || handler is ChildHandleNode
)) {
if (handler is ChildHandleNode) {
/** Handling the following case:
* - A child requested to be added to the list;
* - We checked the state and saw that it wasn't `Finishing`;
* - Then, the job got cancelled and notified everyone about it;
* - Only then did we add the child to the list
* - and ended up here.
*/
val latestState = this@JobSupport.state
if (latestState is Finishing) {
// Assumption: children always have `invokeImmediately = true`.
synchronized(latestState) { latestState.rootCause }?.let { handler.invoke(it) }
}
}
return node
}
}
}
Expand All @@ -528,9 +546,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
return node
}

private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
list.addLastIf(node) { this.state === expect }

private fun promoteEmptyToNodeList(state: Empty) {
// try to promote it to LIST state with the corresponding state
val list = NodeList()
Expand Down
67 changes: 0 additions & 67 deletions kotlinx-coroutines-core/common/src/internal/Atomic.kt

This file was deleted.

Expand Up @@ -2,18 +2,16 @@

package kotlinx.coroutines.internal

import kotlinx.coroutines.*
import kotlin.jvm.*

/** @suppress **This is unstable API and it is subject to change.** */
public expect open class LockFreeLinkedListNode() {
public val isRemoved: Boolean
public val nextNode: LockFreeLinkedListNode
public val prevNode: LockFreeLinkedListNode
public fun addLast(node: LockFreeLinkedListNode)
public fun addLast(node: LockFreeLinkedListNode, allowedAfterPartialClosing: Boolean): Boolean
public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
public inline fun addLastIf(node: LockFreeLinkedListNode, crossinline condition: () -> Boolean): Boolean
public open fun remove(): Boolean
public fun close()
public fun closeForSome()

}

Expand Down
14 changes: 14 additions & 0 deletions kotlinx-coroutines-core/common/test/JobTest.kt
Expand Up @@ -174,6 +174,20 @@ class JobTest : TestBase() {
finish(4)
}

@Test
fun testInvokeOnCancellingFiringOnNormalExit() = runTest {
val job = launch {
expect(2)
}
job.invokeOnCompletion(onCancelling = true) {
assertNull(it)
expect(3)
}
expect(1)
job.join()
finish(4)
}

@Test
fun testOverriddenParent() = runTest {
val parent = Job()
Expand Down
Expand Up @@ -8,18 +8,6 @@ import kotlin.jvm.*

private typealias Node = LockFreeLinkedListNode

@PublishedApi
internal const val UNDECIDED: Int = 0

@PublishedApi
internal const val SUCCESS: Int = 1

@PublishedApi
internal const val FAILURE: Int = 2

@PublishedApi
internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE")

/**
* Doubly-linked concurrent list node with remove support.
* Based on paper
Expand Down Expand Up @@ -49,37 +37,10 @@ public actual open class LockFreeLinkedListNode {
private fun removed(): Removed =
_removedRef.value ?: Removed(this).also { _removedRef.lazySet(it) }

@PublishedApi
internal abstract class CondAddOp(
@JvmField val newNode: Node
) : AtomicOp<Node>() {
@JvmField var oldNext: Node? = null

override fun complete(affected: Node, failure: Any?) {
val success = failure == null
val update = if (success) newNode else oldNext
if (update != null && affected._next.compareAndSet( this, update)) {
// only the thread the makes this update actually finishes add operation
if (success) newNode.finishAdd(oldNext!!)
}
}
}

@PublishedApi
internal inline fun makeCondAddOp(node: Node, crossinline condition: () -> Boolean): CondAddOp =
object : CondAddOp(node) {
override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE
}

public actual open val isRemoved: Boolean get() = next is Removed

// LINEARIZABLE. Returns Node | Removed
public val next: Any get() {
_next.loop { next ->
if (next !is OpDescriptor) return next
next.perform(this)
}
}
public val next: Any get() = _next.value

// LINEARIZABLE. Returns next non-removed Node
public actual val nextNode: Node get() =
Expand Down Expand Up @@ -117,29 +78,30 @@ public actual open class LockFreeLinkedListNode {
// ------ addLastXXX ------

/**
* Adds last item to this list.
* Adds last item to this list. Returns `false` if the list is closed.
*/
public actual fun addLast(node: Node) {
public actual fun addLast(node: Node, allowedAfterPartialClosing: Boolean): Boolean {
while (true) { // lock-free loop on prev.next
if (prevNode.addNext(node, this)) return
val currentPrev = prevNode
return when {
currentPrev is LIST_CLOSED_FOR_ALL -> false
currentPrev is LIST_CLOSED_FOR_SOME ->
allowedAfterPartialClosing && currentPrev.addLast(node, allowedAfterPartialClosing)
currentPrev.addNext(node, this) -> true
else -> continue
}
}
}

/**
* Adds last item to this list atomically if the [condition] is true.
* Forbids adding some of the new items to this list.
*/
public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean {
val condAdd = makeCondAddOp(node, condition)
while (true) { // lock-free loop on prev.next
val prev = prevNode // sentinel node is never removed, so prev is always defined
when (prev.tryCondAddNext(node, this, condAdd)) {
SUCCESS -> return true
FAILURE -> return false
}
}
}
public actual fun closeForSome() { addLast(LIST_CLOSED_FOR_SOME(), allowedAfterPartialClosing = false) }

// ------ addXXX util ------
/**
* Forbids adding new items to this list.
*/
public actual fun close() { addLast(LIST_CLOSED_FOR_ALL(), allowedAfterPartialClosing = true) }

/**
* Given:
Expand Down Expand Up @@ -174,17 +136,6 @@ public actual open class LockFreeLinkedListNode {
return true
}

// returns UNDECIDED, SUCCESS or FAILURE
@PublishedApi
internal fun tryCondAddNext(node: Node, next: Node, condAdd: CondAddOp): Int {
node._prev.lazySet(this)
node._next.lazySet(next)
condAdd.oldNext = next
if (!_next.compareAndSet(next, condAdd)) return UNDECIDED
// added operation successfully (linearized) -- complete it & fixup the list
return if (condAdd.perform(this) == null) SUCCESS else FAILURE
}

// ------ removeXXX ------

/**
Expand Down Expand Up @@ -284,10 +235,6 @@ public actual open class LockFreeLinkedListNode {
}
// slow path when we need to help remove operations
this.isRemoved -> return null // nothing to do, this node was removed, bail out asap to save time
prevNext is OpDescriptor -> { // help & retry
prevNext.perform(prev)
return correctPrev() // retry from scratch
}
prevNext is Removed -> {
if (last !== null) {
// newly added (prev) node is already removed, correct last.next around it
Expand Down Expand Up @@ -347,3 +294,7 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() {

override fun nextIfRemoved(): Node? = null
}

private class LIST_CLOSED_FOR_SOME: LockFreeLinkedListNode()

private class LIST_CLOSED_FOR_ALL: LockFreeLinkedListNode()

0 comments on commit de0b797

Please sign in to comment.