Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CountDownLatch #2854

Merged
merged 7 commits into from
Dec 4, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ public final class arrow/fx/coroutines/CircuitBreaker$State$Open : arrow/fx/coro
public fun toString ()Ljava/lang/String;
}

public final class arrow/fx/coroutines/CountDownLatch {
public fun <init> (J)V
public final fun await (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun count ()J
public final fun countDown ()V
}

public abstract class arrow/fx/coroutines/ExitCase {
public static final field Companion Larrow/fx/coroutines/ExitCase$Companion;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package arrow.fx.coroutines

import arrow.core.continuations.AtomicRef
import arrow.core.continuations.loop
import kotlinx.coroutines.CompletableDeferred

/**
* [CountDownLatch] allows for awaiting a given number of countdown signals.
* Models the behavior of java.util.concurrent.CountDownLatch in Kotlin with suspend.
nomisRev marked this conversation as resolved.
Show resolved Hide resolved
*
* Must be initialised with an [initial] value of 1 or higher,
* if constructed with 0 or negative value then it throws [IllegalArgumentException].
*/
public class CountDownLatch @Throws(IllegalArgumentException::class) constructor(private val initial: Long) {
private val signal = CompletableDeferred<Unit>()
private val count = AtomicRef(initial)

init {
require(initial > 0) {
"CountDownLatch must be constructed with positive non-zero initial count $initial but was $initial > 0"
nomisRev marked this conversation as resolved.
Show resolved Hide resolved
}
}

/** Remaining count */
public fun count(): Long = count.get()

/** Await [count] to reach zero */
public suspend fun await(): Unit = signal.await()

/** Decrement [count] by one */
@Suppress("ReturnCount")
public fun countDown() {
count.loop { current ->
when {
current == 0L -> return
current == 1L && count.compareAndSet(1L, 0L) -> return signal.complete(Unit).let {}
nomisRev marked this conversation as resolved.
Show resolved Hide resolved
count.compareAndSet(current, current - 1) -> return
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package arrow.fx.coroutines

import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.nulls.shouldBeNull
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeTypeOf
import io.kotest.property.Arb
import io.kotest.property.arbitrary.long
import io.kotest.property.checkAll
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull

class CountDownLatchSpec : StringSpec({
fun Arb.Companion.long(): Arb<Long> = Arb.long(1, 100)

"should raise an exception when constructed with a negative or zero capacity" {
checkAll(Arb.long(Long.MIN_VALUE, 0)) { i ->
shouldThrow<IllegalArgumentException> { CountDownLatch(i) }.message shouldBe
"CountDownLatch must be constructed with positive non-zero initial count $i but was $i > 0"
}
}

"release and then await should complete" {
checkAll(Arb.long()) { count ->
val latch = CountDownLatch(count)
repeat(count.toInt()) { latch.countDown() }
latch.await() shouldBe Unit
}
}

"await and then release should complete" {
checkAll(Arb.long()) { count ->
val latch = CountDownLatch(count)
val job = launch { latch.await() }
repeat(count.toInt()) { latch.countDown() }
job.join() shouldBe Unit
}
}

"await with > 1 latch unreleased should block" {
checkAll(Arb.long()) { count ->
val latch = CountDownLatch(count)
repeat(count.toInt() - 1) { latch.countDown() }
withTimeoutOrNull(1) { latch.await() }.shouldBeNull()
latch.count() shouldBe 1
}
}

"multiple awaits should all complete" {
checkAll(Arb.long()) { count ->
val latch = CountDownLatch(count)
val jobs = (0 until count).map { launch { latch.await() } }
repeat(count.toInt()) { latch.countDown() }
jobs.joinAll()
}
}

"should release when latches == 0" {
val latch = CountDownLatch(1)
latch.countDown()
latch.countDown()
}

"await is cancelable" {
val latch = CountDownLatch(1)
val exit = CompletableDeferred<ExitCase>()
val job = launch(start = CoroutineStart.UNDISPATCHED) {
guaranteeCase({ latch.await() }, exit::complete)
}
job.cancelAndJoin()
exit.isCompleted shouldBe true
exit.await().shouldBeTypeOf<ExitCase.Cancelled>()
}
})