Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite the implementation of the linked list for JobSupport #4095

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
132 changes: 61 additions & 71 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Expand Up @@ -7,6 +7,7 @@ import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.experimental.*
import kotlin.js.*
import kotlin.jvm.*

Expand Down Expand Up @@ -319,8 +320,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private fun notifyCancelling(list: NodeList, cause: Throwable) {
// first cancel our own children
onCancelling(cause)
list.close(LIST_CANCELLATION_PERMISSION)
notifyHandlers(list, cause) { it.onCancelling }
notifyHandlers(list, LIST_CANCELLATION_PERMISSION, cause) { it.onCancelling }
// then cancel parent
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}
Expand Down Expand Up @@ -352,13 +352,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}

private fun NodeList.notifyCompletion(cause: Throwable?) {
close(LIST_ON_COMPLETION_PERMISSION)
notifyHandlers(this, cause) { true }
notifyHandlers(this, LIST_ON_COMPLETION_PERMISSION, cause) { true }
}

private inline fun notifyHandlers(list: NodeList, cause: Throwable?, predicate: (JobNode) -> Boolean) {
private fun notifyHandlers(list: NodeList, permissionBitmask: Byte, cause: Throwable?, predicate: (JobNode) -> Boolean) {
var exception: Throwable? = null
list.forEach { node ->
list.forEach(forbidBitmask = permissionBitmask) { node, _, _ ->
if (node is JobNode && predicate(node)) {
try {
node.invoke(cause)
Expand Down Expand Up @@ -559,10 +558,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren

private fun promoteSingleToNodeList(state: JobNode) {
// try to promote it to list (SINGLE+ state)
state.addOneIfEmpty(NodeList())
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
val list = state.nextNode // either our NodeList or somebody else won the race, updated state
// just attempt converting it to list if state is still the same, then we'll continue lock-free loop
val list = NodeList()
val address = list.addLastWithoutModifying(state, permissionsBitmask = 0)
assert { address == 0L }
_state.compareAndSet(state, list)
}

Expand Down Expand Up @@ -626,7 +624,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
is Incomplete -> { // may have a list of completion handlers
// remove node from the list if there is a list
if (state.list != null) node.remove()
state.list?.remove(node)
return
}
else -> return // it is complete and does not have any completion handlers
Expand Down Expand Up @@ -929,79 +927,70 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// process cancelling notification here -- it cancels all the children _before_ we start to wait them (sic!!!)
notifyRootCause?.let { notifyCancelling(list, it) }
// now wait for children
// we can't close the list yet: while there are active children, adding new ones is still allowed.
val child = list.nextChild()
if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
return COMPLETING_WAITING_CHILDREN
// turns out, there are no children to await, so we close the list.
list.close(LIST_CHILD_PERMISSION)
// some children could have sneaked into the list, so we try waiting for them again.
// it would be more correct to re-open the list (otherwise, we get non-linearizable behavior),
// but it's too difficult with the current lock-free list implementation.
val anotherChild = list.nextChild()
if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate))
return COMPLETING_WAITING_CHILDREN
if (shouldWaitForChildren(finishing, proposedUpdate)) return COMPLETING_WAITING_CHILDREN
// otherwise -- we have not children left (all were already cancelled?)
return finalizeFinishingState(finishing, proposedUpdate)
}

private val Any?.exceptionOrNull: Throwable?
get() = (this as? CompletedExceptionally)?.cause

// return false when there is no more incomplete children to wait
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
val handle = child.childJob.invokeOnCompletion(
invokeImmediately = false,
handler = ChildCompletion(this, state, child, proposedUpdate)
)
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
val nextChild = child.nextChild() ?: return false
return tryWaitForChild(state, nextChild, proposedUpdate)
private fun shouldWaitForChildren(
state: Finishing,
proposedUpdate: Any?,
suggestedStartSegment: LockFreeLinkedListSegment? = null,
suggestedStartIndex: Int? = null
): Boolean {
val list = state.list
fun tryFindChildren(
closeList: Boolean,
suggestedStartSegment: LockFreeLinkedListSegment? = null,
suggestedStartIndex: Int? = null,
): Boolean {
var startSegment = suggestedStartSegment
var startIndex = suggestedStartIndex
while (true) {
val child = run {
list.forEach(forbidBitmask = if (closeList) LIST_CHILD_PERMISSION else 0, startInSegment = startSegment, startAfterIndex = startIndex) { node, segment, indexInSegment ->
if (node is ChildHandleNode) {
startSegment = segment
startIndex = indexInSegment
return@run node
}
}
null
} ?: break
val handle = child.childJob.invokeOnCompletion(
invokeImmediately = false,
handler = ChildCompletion(this, state, startSegment!!, startIndex!!, proposedUpdate)
)
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
}
return false
}
// Look for children that are currently in the list after the suggested start node.
if (tryFindChildren(suggestedStartSegment = suggestedStartSegment, suggestedStartIndex = suggestedStartIndex, closeList = false)) return true
// We didn't find anyone in the list after the suggested start node. Let's check the beginning now.
if (suggestedStartSegment != null && tryFindChildren(closeList = false)) return true
// Now we know that, at the moment this function started, there were no more children.
// We can close the list for the new children, and if we still don't find any, we can be sure there are none.
return tryFindChildren(closeList = true)
}

// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
private fun continueCompleting(state: Finishing, proposedUpdate: Any?, lastSegment: LockFreeLinkedListSegment, lastIndexInSegment: Int) {
assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
// figure out if we need to wait for the next child
val waitChild = lastChild.nextChild()
// try to wait for the next child
if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
// no more children to await, so *maybe* we can complete the job; for that, we stop accepting new children.
// potentially, the list can be closed for children more than once: if we detect that there are no more
// children, attempt to close the list, and then new children sneak in, this whole logic will be
// repeated, including closing the list.
state.list.close(LIST_CHILD_PERMISSION)
// did any new children sneak in?
val waitChildAgain = lastChild.nextChild()
if (waitChildAgain != null && tryWaitForChild(state, waitChildAgain, proposedUpdate)) {
// yes, so now we have to wait for them!
// ideally, we should re-open the list,
// but it's too difficult with the current lock-free list implementation,
// so we'll live with non-linearizable behavior for now.
return
}
if (shouldWaitForChildren(state, proposedUpdate, suggestedStartSegment = lastSegment, suggestedStartIndex = lastIndexInSegment)) return // waiting for the next child
// no more children, now we are sure; try to update the state
val finalState = finalizeFinishingState(state, proposedUpdate)
afterCompletion(finalState)
}

private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? {
var cur = this
while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
while (true) {
cur = cur.nextNode
if (cur.isRemoved) continue
if (cur is ChildHandleNode) return cur
if (cur is NodeList) return null // checked all -- no more children
}
}

public final override val children: Sequence<Job> get() = sequence {
when (val state = this@JobSupport.state) {
is ChildHandleNode -> yield(state.childJob)
is Incomplete -> state.list?.let { list ->
list.forEach { if (it is ChildHandleNode) yield(it.childJob) }
list.forEach { it, _, _ -> if (it is ChildHandleNode) yield(it.childJob) }
}
}
}
Expand Down Expand Up @@ -1059,7 +1048,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* as this child didn't make it before [notifyCancelling] and won't be notified that it should be
* cancelled.
*
* And if the parent wasn't cancelled and the previous [LockFreeLinkedListNode.addLast] failed because
* And if the parent wasn't cancelled and the previous [LockFreeLinkedListHead.addLast] failed because
* the job is in its final state already, we won't be able to attach anyway, so we must just invoke
* the handler and return.
*/
Expand Down Expand Up @@ -1259,11 +1248,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private class ChildCompletion(
private val parent: JobSupport,
private val state: Finishing,
private val child: ChildHandleNode,
private val segment: LockFreeLinkedListSegment,
private val indexInSegment: Int,
private val proposedUpdate: Any?
) : JobNode() {
override fun invoke(cause: Throwable?) {
parent.continueCompleting(state, child, proposedUpdate)
parent.continueCompleting(state, proposedUpdate, lastSegment = segment, lastIndexInSegment = indexInSegment)
}
override val onCancelling: Boolean get() = false
}
Expand Down Expand Up @@ -1410,9 +1400,9 @@ private val EMPTY_NEW = Empty(false)
private val EMPTY_ACTIVE = Empty(true)

// bit mask
private const val LIST_ON_COMPLETION_PERMISSION = 1
private const val LIST_CHILD_PERMISSION = 2
private const val LIST_CANCELLATION_PERMISSION = 4
private const val LIST_ON_COMPLETION_PERMISSION = 1.toByte()
private const val LIST_CHILD_PERMISSION = 2.toByte()
private const val LIST_CANCELLATION_PERMISSION = 4.toByte()

private class Empty(override val isActive: Boolean) : Incomplete {
override val list: NodeList? get() = null
Expand Down Expand Up @@ -1504,7 +1494,7 @@ internal class NodeList : LockFreeLinkedListHead(), Incomplete {
append(state)
append("}[")
var first = true
this@NodeList.forEach { node ->
this@NodeList.forEach { node, _, _ ->
if (node is JobNode) {
if (first) first = false else append(", ")
append(node)
Expand Down