Skip to content

Commit

Permalink
Add CountDownLatch (#2854)
Browse files Browse the repository at this point in the history
  • Loading branch information
nomisRev committed Dec 4, 2022
1 parent 3964b18 commit be63562
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 0 deletions.
7 changes: 7 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ public final class arrow/fx/coroutines/CircuitBreaker$State$Open : arrow/fx/coro
public fun toString ()Ljava/lang/String;
}

public final class arrow/fx/coroutines/CountDownLatch {
public fun <init> (J)V
public final fun await (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun count ()J
public final fun countDown ()V
}

public abstract class arrow/fx/coroutines/ExitCase {
public static final field Companion Larrow/fx/coroutines/ExitCase$Companion;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package arrow.fx.coroutines

import arrow.core.continuations.AtomicRef
import arrow.core.continuations.loop
import kotlinx.coroutines.CompletableDeferred

/**
* [CountDownLatch] allows for awaiting a given number of countdown signals.
* Models the behavior of java.util.concurrent.CountDownLatch in Kotlin with `suspend`.
*
* Must be initialised with an [initial] value of 1 or higher,
* if constructed with 0 or negative value then it throws [IllegalArgumentException].
*/
public class CountDownLatch @Throws(IllegalArgumentException::class) constructor(private val initial: Long) {
private val signal = CompletableDeferred<Unit>()
private val count = AtomicRef(initial)

init {
require(initial > 0) {
"CountDownLatch must be constructed with positive non-zero initial count, but was $initial"
}
}

/** Remaining count */
public fun count(): Long = count.get()

/** Await [count] to reach zero */
public suspend fun await(): Unit = signal.await()

/** Decrement [count] by one */
@Suppress("ReturnCount")
public fun countDown() {
count.loop { current ->
when {
current == 0L -> return
current == 1L && count.compareAndSet(1L, 0L) -> {
signal.complete(Unit)
return
}
count.compareAndSet(current, current - 1) -> return
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package arrow.fx.coroutines

import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.nulls.shouldBeNull
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeTypeOf
import io.kotest.property.Arb
import io.kotest.property.arbitrary.long
import io.kotest.property.checkAll
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull

class CountDownLatchSpec : StringSpec({
fun Arb.Companion.long(): Arb<Long> = Arb.long(1, 100)

"should raise an exception when constructed with a negative or zero capacity" {
checkAll(Arb.long(Long.MIN_VALUE, 0)) { i ->
shouldThrow<IllegalArgumentException> { CountDownLatch(i) }.message shouldBe
"CountDownLatch must be constructed with positive non-zero initial count, but was $i"
}
}

"release and then await should complete" {
checkAll(Arb.long()) { count ->
val latch = CountDownLatch(count)
repeat(count.toInt()) { latch.countDown() }
latch.await() shouldBe Unit
}
}

"await and then release should complete" {
checkAll(Arb.long()) { count ->
val latch = CountDownLatch(count)
val job = launch { latch.await() }
repeat(count.toInt()) { latch.countDown() }
job.join() shouldBe Unit
}
}

"await with > 1 latch unreleased should block" {
checkAll(Arb.long()) { count ->
val latch = CountDownLatch(count)
repeat(count.toInt() - 1) { latch.countDown() }
withTimeoutOrNull(1) { latch.await() }.shouldBeNull()
latch.count() shouldBe 1
}
}

"multiple awaits should all complete" {
checkAll(Arb.long()) { count ->
val latch = CountDownLatch(count)
val jobs = (0 until count).map { launch { latch.await() } }
repeat(count.toInt()) { latch.countDown() }
jobs.joinAll()
}
}

"should release when latches == 0" {
val latch = CountDownLatch(1)
latch.countDown()
latch.countDown()
}

"await is cancelable" {
val latch = CountDownLatch(1)
val exit = CompletableDeferred<ExitCase>()
val job = launch(start = CoroutineStart.UNDISPATCHED) {
guaranteeCase({ latch.await() }, exit::complete)
}
job.cancelAndJoin()
exit.isCompleted shouldBe true
exit.await().shouldBeTypeOf<ExitCase.Cancelled>()
}
})

0 comments on commit be63562

Please sign in to comment.