Skip to content

Commit

Permalink
tadada
Browse files Browse the repository at this point in the history
  • Loading branch information
ndkoval committed Mar 23, 2021
1 parent e053e7c commit 8f90638
Showing 1 changed file with 39 additions and 39 deletions.
Expand Up @@ -59,11 +59,11 @@ import kotlin.native.concurrent.*
* and used by default. In this mode, [resume] fails if it finds the cell in the `CANCELLED` state or if the waiter resumption
* (see [CancellableContinuation.tryResume]) does not succeed. As we discussed, these failures are typically handled
* by re-starting the whole operation from the beginning. With the other two smart cancellation modes,
* [SMART_SYNC] and [SMART_ASYNC], [resume] skips `CANCELLED` cells (the cells where waiter resumption failed are also
* [SMART] and [SMART], [resume] skips `CANCELLED` cells (the cells where waiter resumption failed are also
* considered as `CANCELLED`). This way, even if a million of canceled continuations are stored in [SegmentQueueSynchronizer],
* one [resume] invocation is sufficient to pass the value to a waiter since it skips all these canceled waiters.
* However, these modes provide less intuitive contracts and require users to write more complicated code;
* the details are described further.
* the desuspendSegments are described further.
*
* The main issue with skipping `CANCELLED` cells in [resume] is that it can become illegal to put the value into
* the next cell. Consider the following execution: [suspend] is called, then [resume] starts, but the suspended
Expand All @@ -77,11 +77,11 @@ import kotlin.native.concurrent.*
* [tryReturnRefusedValue] to return it back to the outer data structure. However, it is possible for
* [tryReturnRefusedValue] to fail, and [returnValue] is called in this case. Typically, this [returnValue] function
* coincides with the one that resumes waiters (e.g., with [release][Semaphore.release] in [Semaphore]).
* The difference between [SMART_SYNC] and [SMART_ASYNC] modes is that in the [SMART_SYNC] mode, a [resume] that comes
* The difference between [SMART] and [SMART] modes is that in the [SMART] mode, a [resume] that comes
* to a cell with a canceled waiter waits in a spin-loop until the cancellation handler is invoked and the cell
* is moved to either `CANCELLED` or `REFUSE` state. In contrast, in the [SMART_ASYNC] mode, [resume] replaces
* is moved to either `CANCELLED` or `REFUSE` state. In contrast, in the [SMART] mode, [resume] replaces
* the canceled waiter with the value of this resumption and finishes immediately -- the cancellation handler
* completes this [resume] eventually. This way, in [SMART_ASYNC] mode, the value passed to [resume] can be out
* completes this [resume] eventually. This way, in [SMART] mode, the value passed to [resume] can be out
* of the data structure for a while but is guaranteed to be processed eventually.
*
* To support prompt cancellation, [SegmentQueueSynchronizer] returns the value back to the data structure by calling
Expand Down Expand Up @@ -135,7 +135,7 @@ import kotlin.native.concurrent.*
* between the counters for [resume] and [suspend], and does not need to store an infinite array of cells.
*
* To make the implementation efficient we maintain a linked list of [segments][SQSSegment], see the basic [Segment]
* class and the corresponding source file for details. In short, each segment has a unique id, and can be seen as
* class and the corresponding source file for desuspendSegments. In short, each segment has a unique id, and can be seen as
* a node in a Michael-Scott queue. Following this structure, we can maintain the cells that are in the current active
* range (between the counters), and access the cells similarly to an array. Specifically, we change the current working
* segment once every [SEGMENT_SIZE] operations, where [SEGMENT_SIZE] is the number of cells stored in each segment.
Expand Down Expand Up @@ -165,8 +165,8 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {

/**
* Specifies whether [resume] should fail on cancelled waiters ([SIMPLE]),
* or skip them in either [synchronous][SMART_SYNC] or [asynchronous][SMART_ASYNC]
* way. In the [asynchronous][SMART_ASYNC] mode, [resume] may pass the element to the
* or skip them in either [synchronous][SMART] or [asynchronous][SMART]
* way. In the [asynchronous][SMART] mode, [resume] may pass the element to the
* cancellation handler in order not to wait, so that the element can be "hung"
* for a while, but it is guaranteed that the element will be processed eventually.
*/
Expand Down Expand Up @@ -219,19 +219,19 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {

@Suppress("UNCHECKED_CAST")
internal fun suspend(cont: Continuation<T>): Boolean {
// Increment `enqIdx` and find the segment
// Increment `suspendIdx` and find the segment
// with the corresponding id. It is guaranteed
// that this segment is not removed since at
// least the cell for this [suspend] invocation
// is not in the `CANCELLED` state.
val curTail = this.suspendSegment.value
val enqIdx = suspendIdx.getAndIncrement()
val segment = this.suspendSegment.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
val curSuspendSegm = this.suspendSegment.value
val suspendIdx = suspendIdx.getAndIncrement()
val segment = this.suspendSegment.findSegmentAndMoveForward(id = suspendIdx / SEGMENT_SIZE, startFrom = curSuspendSegm,
createNewSegment = ::createSegment).segment
assert { segment.id == enqIdx / SEGMENT_SIZE }
assert { segment.id == suspendIdx / SEGMENT_SIZE }
// Try to install the continuation in the cell,
// this is the regular path.
val i = (enqIdx % SEGMENT_SIZE).toInt()
val i = (suspendIdx % SEGMENT_SIZE).toInt()
if (segment.cas(i, null, cont)) {
if (useBackoff) {
repeat(backoffSize) {
Expand Down Expand Up @@ -280,19 +280,19 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {

@Suppress("UNCHECKED_CAST")
internal fun suspendBlocking(waiter: Any): T? {
// Increment `enqIdx` and find the segment
// Increment `suspendIdx` and find the segment
// with the corresponding id. It is guaranteed
// that this segment is not removed since at
// least the cell for this [suspend] invocation
// is not in the `CANCELLED` state.
val curTail = this.suspendSegment.value
val enqIdx = suspendIdx.getAndIncrement()
val segment = this.suspendSegment.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
val curSuspendSegm = this.suspendSegment.value
val suspendIdx = suspendIdx.getAndIncrement()
val segment = this.suspendSegment.findSegmentAndMoveForward(id = suspendIdx / SEGMENT_SIZE, startFrom = curSuspendSegm,
createNewSegment = ::createSegment).segment
assert { segment.id == enqIdx / SEGMENT_SIZE }
assert { segment.id == suspendIdx / SEGMENT_SIZE }
// Try to install the continuation in the cell,
// this is the regular path.
val i = (enqIdx % SEGMENT_SIZE).toInt()
val i = (suspendIdx % SEGMENT_SIZE).toInt()
if (segment.cas(i, null, waiter)) {
if (useBackoff) {
repeat(backoffSize) {
Expand Down Expand Up @@ -345,7 +345,7 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
// Should we skip cancelled cells?
val skipCancelled = cancellationMode != SIMPLE
while (true) {
// Try to resume the next waiter, adjust [deqIdx] if
// Try to resume the next waiter, adjust [resumeIdx] if
// cancelled cells should be skipped anyway.
when (tryResumeImpl(value, adjustDeqIdx = skipCancelled)) {
TRY_RESUME_SUCCESS -> return true
Expand All @@ -361,7 +361,7 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
* or [TRY_RESUME_FAIL_BROKEN] if the next cell is marked as broken by
* this [tryResumeImpl] invocation due to the [SYNC] resumption mode.
*
* In the smart cancellation modes ([SMART_SYNC] and [SMART_ASYNC]) the
* In the smart cancellation modes ([SMART] and [SMART]) the
* cells marked as [cancelled][CANCELLED] should be skipped, so that
* there is no need to increment [resumeIdx] one-by-one if there is a
* removed segment (logically full of [cancelled][CANCELLED] cells);
Expand All @@ -374,29 +374,29 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
// Check that `adjustDeqIdx` is `false`
// in the simple cancellation mode.
assert { !(cancellationMode == SIMPLE && adjustDeqIdx) }
// Increment `deqIdx` and find the first segment with
// Increment `resumeIdx` and find the first segment with
// the corresponding or higher (if the required segment
// is physically removed) id.
val curHead = this.resumeSegment.value
val deqIdx = resumeIdx.getAndIncrement()
val id = deqIdx / SEGMENT_SIZE
val segment = this.resumeSegment.findSegmentAndMoveForward(id, startFrom = curHead,
val curResumeSegm = this.resumeSegment.value
val resumeIdx = resumeIdx.getAndIncrement()
val id = resumeIdx / SEGMENT_SIZE
val segment = this.resumeSegment.findSegmentAndMoveForward(id, startFrom = curResumeSegm,
createNewSegment = ::createSegment).segment
// The previous segments can be safely collected
// by GC, clean the pointer to them.
segment.cleanPrev()
// Is the required segment physically removed?
if (segment.id > id) {
// Adjust `deqIdx` to the first
// Adjust `resumeIdx` to the first
// non-removed segment if needed.
if (adjustDeqIdx) adjustDeqIdx(segment.id * SEGMENT_SIZE)
// The cell #deqIdx is in the `CANCELLED` state,
// The cell #resumeIdx is in the `CANCELLED` state,
// return the corresponding failure.
return TRY_RESUME_FAIL_CANCELLED
}
// Modify the cell according to the state machine,
// all the transitions are performed atomically.
val i = (deqIdx % SEGMENT_SIZE).toInt()
val i = (resumeIdx % SEGMENT_SIZE).toInt()
modify_cell@while (true) {
val cellState = segment.get(i)
when {
Expand Down Expand Up @@ -583,7 +583,7 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
// The cell should be considered as cancelled.
// Mark the cell correspondingly and help a
// concurrent `resume` to process its value if
// needed (see `SMART_ASYNC` cancellation mode).
// needed (see `SMART` cancellation mode).
val value = segment.markCancelled(index) ?: return
if (value === REFUSE) return
// Try to resume the next waiter with the value
Expand All @@ -598,7 +598,7 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
// cell should be refused by this `SegmentQueueSynchronizer`.
// Mark the cell correspondingly and help a concurrent
// `resume` to process its value if needed
// (see `SMART_ASYNC` cancellation mode).
// (see `SMART` cancellation mode).
val value = segment.markRefused(index) ?: return
returnRefusedValue(value as T)
}
Expand All @@ -622,7 +622,7 @@ internal abstract class SegmentQueueSynchronizer<T : Any> {
if (curIdx == (curSegment.id + 1) * SEGMENT_SIZE)
curSegment = curSegment.next ?: break
}
return "enqIdx=${suspendIdx.value},deqIdx=${resumeIdx.value},waiters=$waiters"
return "suspendIdx=${suspendIdx.value},resumeIdx=${resumeIdx.value},waiters=$waiters"
}
}

Expand Down Expand Up @@ -654,15 +654,15 @@ private class SQSSegment(id: Long, prev: SQSSegment?, pointers: Int) : Segment<S
* Marks the cell as cancelled and returns `null`, so that the [resume]
* that comes to this cell detects that it is in the `CANCELLED` state
* and should fail or skip it depending on the cancellation mode.
* However, in [SMART_ASYNC] cancellation mode [resume] that comes to the cell
* However, in [SMART] cancellation mode [resume] that comes to the cell
* with cancelled continuation asynchronously puts its value into the cell,
* and the cancellation handler completes the resumption.
* In this case, [markCancelled] returns this non-null value.
*
* If the whole segment contains [CANCELLED] markers after
* this invocation, [onSlotCleaned] is invoked and this segment
* is going to be removed if [head][SegmentQueueSynchronizer.head]
* and [tail][SegmentQueueSynchronizer.tail] do not reference it.
* is going to be removed if [resumeSegment][SegmentQueueSynchronizer.resumeSegment]
* and [suspendSegment][SegmentQueueSynchronizer.suspendSegment] do not reference it.
* Note that the segments that are not stored physically are still
* considered as logically stored but being full of cancelled waiters.
*/
Expand Down Expand Up @@ -706,7 +706,7 @@ private class SQSSegment(id: Long, prev: SQSSegment?, pointers: Int) : Segment<S
* that its value is refused by the [SegmentQueueSynchronizer],
* and [SegmentQueueSynchronizer.tryReturnRefusedValue]
* is invoked in this case (if it fails, the value is put back via
* [SegmentQueueSynchronizer.returnValue]). Since in [SMART_ASYNC]
* [SegmentQueueSynchronizer.returnValue]). Since in [SMART]
* cancellation mode [resume] that comes to the cell with cancelled
* continuation asynchronously puts its value into the cell.
* In this case, [markRefused] returns this non-null value.
Expand All @@ -716,7 +716,7 @@ private class SQSSegment(id: Long, prev: SQSSegment?, pointers: Int) : Segment<S
/**
* Marks the cell with the specified [marker]
* and returns `null` if the cell contains the
* cancelled continuation. However, in the [SMART_ASYNC]
* cancelled continuation. However, in the [SMART]
* cancellation mode it is possible that [resume] comes
* to the cell with cancelled continuation and asynchronously
* puts its value into the cell, so that the cancellation
Expand Down Expand Up @@ -748,7 +748,7 @@ private class SQSSegment(id: Long, prev: SQSSegment?, pointers: Int) : Segment<S
* this value should be used for resuming the next waiter or be refused. When this
* value is a continuation, it is hard to distinguish it with the one related to the cancelled
* waiter. Thus, such values are wrapped with [WrappedContinuationValue] in this case. Note that the
* wrapper is required only in [SegmentQueueSynchronizer.CancellationMode.SMART_ASYNC] mode
* wrapper is required only in [SegmentQueueSynchronizer.CancellationMode.SMART] mode
* and is used in the asynchronous race resolution logic between cancellation and [resume]
* invocation; this way, it is used relatively rare.
*/
Expand Down

0 comments on commit 8f90638

Please sign in to comment.