Skip to content

Commit

Permalink
Add withLatestFrom operator.
Browse files Browse the repository at this point in the history
Co-authored-by: Ray Ryan <rjrjr@users.noreply.github.com>
  • Loading branch information
zach-klippenstein and rjrjr committed Jul 6, 2019
1 parent 9420df3 commit 0575808
Show file tree
Hide file tree
Showing 3 changed files with 331 additions and 5 deletions.
Expand Up @@ -884,6 +884,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun withLatestFrom (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}

Expand Down
63 changes: 58 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
Expand Up @@ -10,6 +10,7 @@ package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
Expand Down Expand Up @@ -174,6 +175,57 @@ internal fun <T, R> Flow<T>.combineLatest(vararg others: Flow<T>, arrayFactory:
}
}

/**
* Returns a [Flow] whose values are generated by the [transform] function every time this flow emits by combining the
* emitted value with the latest value emitted by `other`.
*
* Emissions are only triggered by this flow, not `other`, and the returned flow will remain active as long as this flow
* is, completing only when this flow completes. `other` will be cancelled if this flow completes first.
*
* The operator does not call [transform], and the returned flow will not emit, until `other` emits – if `other` does
* not emit, the returned flow will complete without emitting. However after `other` has emitted its first value, the
* returned flow will ignore `other`'s completion and continue to cache the last value it emitted.
*
* It can be demonstrated with the following example:
* ```
* val flow = flowOf(1, 2, 3).delayEach(10)
* val flow2 = flowOf("a", "b").delayEach(15)
* flow.withLatestFrom(flow2) { i, s -> i.toString() + s }.collect {
* println(it) // Will print "1a 2a 3b"
* }
* ```
*/
@ExperimentalCoroutinesApi
public fun <T1, T2, R> Flow<T1>.withLatestFrom(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
coroutineScope {
val firstChannel = asFairChannel(this@withLatestFrom)
firstChannel.consume {
var firstIsClosed = false

// This operator conflates values from the other Flow anyway, so the channel doesn't need any backpressure.
val secondChannel = asFairChannel(other, capacity = CONFLATED)
secondChannel.consume {
// Nothing can be emitted until the other Flow emits its first value, so don't enter the loop until
// that's happened.
var secondValue: Any = secondChannel.receiveOrNull() ?: return@coroutineScope
var secondIsClosed = false

while (!firstIsClosed) {
select<Unit> {
onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
emit(transform(NULL.unbox(value), NULL.unbox(secondValue)))
}

onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
secondValue = value
}
}
}
}
}
}
}

private inline fun SelectBuilder<Unit>.onReceive(
isClosed: Boolean,
channel: ReceiveChannel<Any>,
Expand All @@ -188,12 +240,13 @@ private inline fun SelectBuilder<Unit>.onReceive(
}

// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
val channel = channel as ChannelCoroutine<Any>
flow.collect { value ->
channel.sendFair(value ?: NULL)
private fun CoroutineScope.asFairChannel(flow: Flow<*>, capacity: Int = 0): ReceiveChannel<Any> =
produce(capacity = capacity) {
val channel = channel as ChannelCoroutine<Any>
flow.collect { value ->
channel.sendFair(value ?: NULL)
}
}
}


/**
Expand Down
@@ -0,0 +1,272 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.test.*

/*
* Replace: { i, j -> i + j } -> ::sum as soon as KT-30991 is fixed
*/
class WithLatestFromTest : TestBase() {

@Test
fun testWithLatestFrom() = runTest {
val flow = flowOf("a", "b", "c")
val flow2 = flowOf(1, 2, 3)
val list = flow.withLatestFrom(flow2) { i, j -> i + j }.toList()
assertEquals(listOf("a1", "b2", "c3"), list)
}

@Test
fun testNulls() = runTest {
val flow = flowOf("a", null, null)
val flow2 = flowOf(1, 2, 3)
val list = flow.withLatestFrom(flow2, { i, j -> i + j }).toList()
assertEquals(listOf("a1", "null2", "null3"), list)
}

@Test
fun testNullsOther() = runTest {
val flow = flowOf("a", "b", "c")
val flow2 = flowOf(null, 2, null)
val list = flow.withLatestFrom(flow2, { i, j -> i + j }).toList()
assertEquals(listOf("anull", "b2", "cnull"), list)
}

@Test
fun testEmptyFlows() = runTest {
val flow = emptyFlow<String>().withLatestFrom(emptyFlow<Int>(), { i, j -> i + j })
assertNull(flow.singleOrNull())
}

@Test
fun testFirstIsEmpty() = runTest {
val f1 = emptyFlow<String>()
val f2 = flowOf(1)
assertEquals(emptyList(), f1.withLatestFrom(f2) { i, j -> i + j }.toList())
}

@Test
fun testSecondIsEmpty() = runTest {
val f1 = flowOf("a")
val f2 = emptyFlow<Int>()
assertEquals(emptyList(), f1.withLatestFrom(f2) { i, j -> i + j }.toList())
}

@Test
fun testPreservingOrder() = runTest {
val f1 = flow {
expect(1)
emit("a")
expect(3)
emit("b")
emit("c")
expect(5)
}

val f2 = flow {
expect(2)
emit(1)
yield()
yield()
expect(4)
emit(2)
expect(6)
yield()
expectUnreached()
}

val result = f1.withLatestFrom(f2) { i, j -> i + j }.toList()
assertEquals(listOf("a1", "b1", "c1"), result)
finish(7)
}

@Test
fun testPreservingOrderReversed() = runTest {
val f1 = flow {
expect(1)
emit("a")
expect(3)
emit("b")
emit("c")
expect(4)
}

val f2 = flow {
yield() // One more yield because now this flow starts first
expect(2)
emit(1)
yield()
yield()
expect(5)
emit(2)
expect(6)
yield()
expect(7)
emit(3)
}

val result = f2.withLatestFrom(f1) { i, j -> j + i }.toList()
assertEquals(listOf("a1", "c2", "c3"), result)
finish(8)
}

@Test
fun testContextIsIsolated() = runTest {
val f1 = flow {
emit("a")
assertEquals("first", NamedDispatchers.name())
expect(1)
}.flowOn(NamedDispatchers("first")).onEach {
assertEquals("nested", NamedDispatchers.name())
expect(2)
}.flowOn(NamedDispatchers("nested"))

val f2 = flow {
emit(1)
assertEquals("second", NamedDispatchers.name())
expect(3)
}.flowOn(NamedDispatchers("second"))
.onEach {
assertEquals("onEach", NamedDispatchers.name())
expect(4)
}.flowOn(NamedDispatchers("onEach"))

val value = withContext(NamedDispatchers("main")) {
f1.withLatestFrom(f2) { i, j ->
assertEquals("main", NamedDispatchers.name())
expect(5)
i + j
}.single()
}

assertEquals("a1", value)
finish(6)
}

@Test
fun testErrorInDownstreamCancelsUpstream() = runTest {
val f1 = flow {
emit("a")
hang {
expect(2)
}
}.flowOn(NamedDispatchers("first"))

val f2 = flow {
emit(1)
hang {
expect(3)
}
}.flowOn(NamedDispatchers("second"))

val flow = f1.withLatestFrom(f2) { i, j ->
assertEquals("combine", NamedDispatchers.name())
expect(1)
i + j
}.flowOn(NamedDispatchers("combine")).onEach {
throw TestException()
}

assertFailsWith<TestException>(flow)
finish(4)
}

@Test
fun testErrorCancelsSibling() = runTest {
val f1 = flow {
emit("a")
hang {
expect(1)
}
}.flowOn(NamedDispatchers("first"))

val f2 = flow {
emit(1)
throw TestException()
}.flowOn(NamedDispatchers("second"))

val flow = f1.withLatestFrom(f2) { _, _ -> 1 }
assertFailsWith<TestException>(flow)
finish(2)
}

@Test
fun testErrorCancelsSiblingReversed() = runTest {
val f1 = flow {
emit("a")
throw TestException()
}

val f2 = flow {
emit(1)
hang {
expect(1)
}
}

val flow = f1.withLatestFrom(f2) { _, _ -> 1 }
assertFailsWith<TestException>(flow)
finish(2)
}

@Test
fun testCancellationExceptionUpstream() = runTest {
val f1 = flow {
expect(1)
emit(1)
throw CancellationException("")
}
val f2 = flow {
emit(1)
hang { expect(3) }
}

val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach { expect(2) }
assertFailsWith<CancellationException>(flow)
finish(4)
}

@Test
fun testCancellationExceptionUpstreamReversed() = runTest {
val f1 = flow {
expect(1)
emit(1)
hang { expect(3) }
}
val f2 = flow {
emit(1)
throw CancellationException("")
}

val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach { expect(2) }
assertFailsWith<CancellationException>(flow)
finish(4)
}

@Test
fun testCancellationExceptionDownstream() = runTest {
val f1 = flow {
emit(1)
expect(3)
hang { expect(6) }
}
val f2 = flow {
emit(1)
expect(2)
hang { expect(5) }
}

val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach {
expect(1)
yield()
expect(4)
throw CancellationException("")
}
assertFailsWith<CancellationException>(flow)
finish(7)
}
}

0 comments on commit 0575808

Please sign in to comment.