Skip to content

Commit

Permalink
Update channel tests (#317)
Browse files Browse the repository at this point in the history
Do not use internal functions to create channels.
  • Loading branch information
JakeWharton committed Apr 11, 2024
1 parent d29b357 commit 87dea67
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 85 deletions.
2 changes: 1 addition & 1 deletion src/commonMain/kotlin/app/cash/turbine/flow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private fun <T> Flow<T>.collectTurbineIn(scope: CoroutineScope, timeout: Duratio
}
}

internal fun <T> Flow<T>.collectIntoChannel(scope: CoroutineScope): Channel<T> {
private fun <T> Flow<T>.collectIntoChannel(scope: CoroutineScope): Channel<T> {
val output = Channel<T>(UNLIMITED)
val job = scope.launch(start = UNDISPATCHED) {
try {
Expand Down
126 changes: 44 additions & 82 deletions src/commonTest/kotlin/app/cash/turbine/ChannelTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@ import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertSame
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
Expand All @@ -41,13 +36,7 @@ class ChannelTest {
val expected = CustomThrowable("hello")

val actual = assertFailsWith<CustomThrowable> {
val channel = flow {
emit(1)
emit(2)
emit(3)
throw expected
}.collectIntoChannel(this)

val channel = channelOf(1, 2, 3, closeCause = expected)
channel.expectMostRecentItem()
}
assertSame(expected, actual)
Expand All @@ -56,178 +45,154 @@ class ChannelTest {
@Test
fun expectMostRecentItemButNoItemWasFoundThrows() = runTest {
val actual = assertFailsWith<AssertionError> {
val channel = emptyFlow<Any>().collectIntoChannel(this)
val channel = channelOf<Nothing>()
channel.expectMostRecentItem()
}
assertEquals("No item was found", actual.message)
}

@Test
fun expectMostRecentItem() = runTest {
val onTwoSent = CompletableDeferred<Unit>()
val onTwoContinue = CompletableDeferred<Unit>()
val onCompleteSent = CompletableDeferred<Unit>()
val onCompleteContinue = CompletableDeferred<Unit>()

val channel = flowOf(1, 2, 3, 4, 5)
.map {
if (it == 3) {
onTwoSent.complete(Unit)
onTwoContinue.await()
}
it
}
.onCompletion {
onCompleteSent.complete(Unit)
onCompleteContinue.await()
}
.collectIntoChannel(this)
val channel = Channel<Int>(UNLIMITED)
channel.trySend(1)
channel.trySend(2)

onTwoSent.await()
assertEquals(2, channel.expectMostRecentItem())
onTwoContinue.complete(Unit)

onCompleteSent.await()
channel.trySend(3)
channel.trySend(4)
channel.trySend(5)
assertEquals(5, channel.expectMostRecentItem())
onCompleteContinue.complete(Unit)

channel.cancel()
}

@Test
fun assertNullValuesWithExpectMostRecentItem() = runTest {
val channel = flowOf(1, 2, null).collectIntoChannel(this)
val channel = channelOf(1, 2, null)

assertEquals(null, channel.expectMostRecentItem())
}

@Test fun awaitItemsAreSkipped() = runTest {
val channel = flowOf(1, 2, 3).collectIntoChannel(this)
val channel = channelOf(1, 2, 3)
channel.skipItems(2)
assertEquals(3, channel.awaitItem())
}

@Test fun skipItemsThrowsOnComplete() = runTest {
val channel = flowOf(1, 2).collectIntoChannel(this)
val channel = channelOf(1, 2)
val message = assertFailsWith<AssertionError> {
channel.skipItems(3)
}.message
assertEquals("Expected 3 items but got 2 items and Complete", message)
}

@Test fun expectErrorOnCompletionBeforeAllItemsWereSkipped() = runTest {
val channel = flowOf(1).collectIntoChannel(this)
val channel = channelOf(1)
assertFailsWith<AssertionError> {
channel.skipItems(2)
}
}

@Test fun expectErrorOnErrorReceivedBeforeAllItemsWereSkipped() = runTest {
val error = CustomThrowable("hello")
val channel = flow {
emit(1)
throw error
}.collectIntoChannel(this)
val channel = channelOf(1, closeCause = error)
val actual = assertFailsWith<AssertionError> {
channel.skipItems(2)
}
assertSame(error, actual.cause)
}

@Test fun expectNoEvents() = runTest {
val channel = neverFlow().collectIntoChannel(this)
val channel = neverChannel()
channel.expectNoEvents()
channel.cancel()
}

@Test fun awaitItemEvent() = runTest {
val item = Any()
val channel = flowOf(item).collectIntoChannel(this)
val channel = channelOf(item)
val event = channel.awaitEvent()
assertEquals(Event.Item(item), event)
}

@Test fun expectCompleteEvent() = runTest {
val channel = emptyFlow<Nothing>().collectIntoChannel(this)
val channel = emptyChannel()
val event = channel.awaitEvent()
assertEquals(Event.Complete, event)
}

@Test fun expectErrorEvent() = runTest {
val exception = CustomThrowable("hello")
val channel = flow<Nothing> { throw exception }.collectIntoChannel(this)
val channel = channelOf<Nothing>(closeCause = exception)
val event = channel.awaitEvent()
assertEquals(Event.Error(exception), event)
}

@Test fun awaitItem() = runTest {
val item = Any()
val channel = flowOf(item).collectIntoChannel(this)
val channel = channelOf(item)
assertSame(item, channel.awaitItem())
}

@Test fun awaitItemButWasCloseThrows() = runTest {
val actual = assertFailsWith<AssertionError> {
emptyFlow<Unit>().collectIntoChannel(this).awaitItem()
emptyChannel().awaitItem()
}
assertEquals("Expected item but found Complete", actual.message)
}

@Test fun awaitItemButWasErrorThrows() = runTest {
val error = CustomThrowable("hello")
val actual = assertFailsWith<AssertionError> {
flow<Unit> { throw error }.collectIntoChannel(this)
.awaitItem()
channelOf<Nothing>(closeCause = error).awaitItem()
}
assertEquals("Expected item but found Error(CustomThrowable)", actual.message)
assertSame(error, actual.cause)
}

@Test fun awaitComplete() = runTest {
emptyFlow<Nothing>().collectIntoChannel(this).awaitComplete()
emptyChannel().awaitComplete()
}

@Test fun awaitCompleteButWasItemThrows() = runTest {
val actual = assertFailsWith<AssertionError> {
flowOf("item!").collectIntoChannel(this)
.awaitComplete()
channelOf("item!").awaitComplete()
}
assertEquals("Expected complete but found Item(item!)", actual.message)
}

@Test fun awaitCompleteButWasErrorThrows() = runTest {
val error = CustomThrowable("hello")
val actual = assertFailsWith<AssertionError> {
flow<Unit> { throw RuntimeException() }.collectIntoChannel(this)
.awaitComplete()
channelOf<Nothing>(closeCause = error).awaitComplete()
}
assertEquals("Expected complete but found Error(RuntimeException)", actual.message)
assertEquals("Expected complete but found Error(CustomThrowable)", actual.message)
assertSame(error, actual.cause)
}

@Test fun awaitError() = runTest {
val error = CustomThrowable("hello")
val channel = flow<Nothing> { throw error }.collectIntoChannel(this)
val channel = channelOf<Nothing>(closeCause = error)
assertSame(error, channel.awaitError())
}

@Test fun awaitErrorButWasItemThrows() = runTest {
val actual = assertFailsWith<AssertionError> {
flowOf("item!").collectIntoChannel(this).awaitError()
channelOf("item!").awaitError()
}
assertEquals("Expected error but found Item(item!)", actual.message)
}

@Test fun awaitErrorButWasCompleteThrows() = runTest {
val actual = assertFailsWith<AssertionError> {
emptyFlow<Nothing>().collectIntoChannel(this).awaitError()
emptyChannel().awaitError()
}
assertEquals("Expected error but found Complete", actual.message)
}

@Test fun failsOnDefaultTimeout() = runTest {
val actual = assertFailsWith<AssertionError> {
coroutineScope {
neverFlow().collectIntoChannel(this).awaitItem()
}
neverChannel().awaitItem()
}
assertEquals("No value produced in 3s", actual.message)
assertCallSitePresentInStackTraceOnJvm(
Expand All @@ -240,7 +205,7 @@ class ChannelTest {
@Test fun awaitHonorsCoroutineContextTimeoutNoTimeout() = runTest {
withTurbineTimeout(1500.milliseconds) {
val job = launch {
neverFlow().collectIntoChannel(this).awaitItem()
neverChannel().awaitItem()
}

withContext(Dispatchers.Default) {
Expand All @@ -253,7 +218,7 @@ class ChannelTest {
@Test fun awaitHonorsCoroutineContextTimeoutTimeout() = runTest {
val actual = assertFailsWith<AssertionError> {
withTurbineTimeout(10.milliseconds) {
neverFlow().collectIntoChannel(this).awaitItem()
neverChannel().awaitItem()
}
}
assertEquals("No value produced in 10ms", actual.message)
Expand All @@ -277,22 +242,21 @@ class ChannelTest {

@Test fun takeItem() = withTestScope {
val item = Any()
val channel = flowOf(item).collectIntoChannel(this)
val channel = channelOf(item)
assertSame(item, channel.takeItem())
}

@Test fun takeItemButWasCloseThrows() = withTestScope {
val actual = assertFailsWith<AssertionError> {
emptyFlow<Unit>().collectIntoChannel(this).takeItem()
emptyChannel().takeItem()
}
assertEquals("Expected item but found Complete", actual.message)
}

@Test fun takeItemButWasErrorThrows() = withTestScope {
val error = CustomThrowable("hello")
val actual = assertFailsWith<AssertionError> {
flow<Unit> { throw error }.collectIntoChannel(this)
.takeItem()
channelOf<Nothing>(closeCause = error).takeItem()
}
assertEquals("Expected item but found Error(CustomThrowable)", actual.message)
assertSame(error, actual.cause)
Expand All @@ -301,52 +265,50 @@ class ChannelTest {
@Test
fun expectMostRecentItemButNoItemWasFoundThrowsWithName() = runTest {
val actual = assertFailsWith<AssertionError> {
val channel = emptyFlow<Any>().collectIntoChannel(this)
channel.expectMostRecentItem(name = "empty flow")
emptyChannel().expectMostRecentItem(name = "empty flow")
}
assertEquals("No item was found for empty flow", actual.message)
}

@Test fun awaitItemButWasCloseThrowsWithName() = runTest {
val actual = assertFailsWith<AssertionError> {
emptyFlow<Unit>().collectIntoChannel(this).awaitItem(name = "closed flow")
emptyChannel().awaitItem(name = "closed flow")
}
assertEquals("Expected item for closed flow but found Complete", actual.message)
}

@Test fun awaitCompleteButWasItemThrowsWithName() = runTest {
val actual = assertFailsWith<AssertionError> {
flowOf("item!").collectIntoChannel(this)
.awaitComplete(name = "item flow")
channelOf("item!").awaitComplete(name = "item flow")
}
assertEquals("Expected complete for item flow but found Item(item!)", actual.message)
}

@Test fun awaitErrorButWasItemThrowsWithName() = runTest {
val actual = assertFailsWith<AssertionError> {
flowOf("item!").collectIntoChannel(this).awaitError(name = "item flow")
channelOf("item!").awaitError(name = "item flow")
}
assertEquals("Expected error for item flow but found Item(item!)", actual.message)
}

@Test fun awaitHonorsCoroutineContextTimeoutTimeoutWithName() = runTest {
val actual = assertFailsWith<AssertionError> {
withTurbineTimeout(10.milliseconds) {
neverFlow().collectIntoChannel(this).awaitItem(name = "never flow")
neverChannel().awaitItem(name = "never flow")
}
}
assertEquals("No value produced for never flow in 10ms", actual.message)
}

@Test fun takeItemButWasCloseThrowsWithName() = withTestScope {
val actual = assertFailsWith<AssertionError> {
emptyFlow<Unit>().collectIntoChannel(this).takeItem(name = "empty flow")
emptyChannel().takeItem(name = "empty flow")
}
assertEquals("Expected item for empty flow but found Complete", actual.message)
}

@Test fun skipItemsThrowsOnCompleteWithName() = runTest {
val channel = flowOf(1, 2).collectIntoChannel(this)
val channel = channelOf(1, 2)
val message = assertFailsWith<AssertionError> {
channel.skipItems(3, name = "two item channel")
}.message
Expand Down
15 changes: 15 additions & 0 deletions src/commonTest/kotlin/app/cash/turbine/testUtil.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package app.cash.turbine

import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

Expand All @@ -33,3 +36,15 @@ expect fun assertCallSitePresentInStackTraceOnJvm(
entryPoint: String,
callSite: String,
)

fun <T> channelOf(vararg items: T, closeCause: Throwable? = null): ReceiveChannel<T> {
return Channel<T>(UNLIMITED).also { channel ->
for (item in items) {
channel.trySend(item).getOrThrow()
}
channel.close(closeCause)
}
}

fun emptyChannel(): ReceiveChannel<Nothing> = channelOf()
fun neverChannel(): ReceiveChannel<Nothing> = Channel()
3 changes: 1 addition & 2 deletions src/jvmTest/kotlin/app/cash/turbine/ChannelJvmTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package app.cash.turbine

import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.test.runTest
import org.junit.Test

class ChannelJvmTest {
@Test
fun takeItemSuspendingThrows() = runTest {
val actual = assertFailsWith<IllegalStateException> {
emptyFlow<Unit>().collectIntoChannel(this).takeItem()
emptyChannel().takeItem()
}
assertEquals("Calling context is suspending; use a suspending method instead", actual.message)
}
Expand Down

0 comments on commit 87dea67

Please sign in to comment.