Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add withLatestFrom operator. #1315

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -884,6 +884,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun withLatestFrom (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}

Expand Down
63 changes: 58 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
Expand Up @@ -10,6 +10,7 @@ package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
Expand Down Expand Up @@ -174,6 +175,57 @@ internal fun <T, R> Flow<T>.combineLatest(vararg others: Flow<T>, arrayFactory:
}
}

/**
* Returns a [Flow] whose values are generated by the [transform] function every time this flow emits by combining the
* emitted value with the latest value emitted by `other`.
*
* Emissions are only triggered by this flow, not `other`, and the returned flow will remain active as long as this flow
* is, completing only when this flow completes. `other` will be cancelled if this flow completes first.
*
* The operator does not call [transform], and the returned flow will not emit, until `other` emits – if `other` does
* not emit, the returned flow will complete without emitting. However after `other` has emitted its first value, the
* returned flow will ignore `other`'s completion and continue to cache the last value it emitted.
*
* It can be demonstrated with the following example:
* ```
* val flow = flowOf(1, 2, 3).delayEach(10)
* val flow2 = flowOf("a", "b").delayEach(15)
* flow.withLatestFrom(flow2) { i, s -> i.toString() + s }.collect {
* println(it) // Will print "1a 2a 3b"
* }
* ```
*/
@ExperimentalCoroutinesApi
public fun <T1, T2, R> Flow<T1>.withLatestFrom(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
coroutineScope {
val firstChannel = asFairChannel(this@withLatestFrom)
firstChannel.consume {
var firstIsClosed = false

// This operator conflates values from the other Flow anyway, so the channel doesn't need any backpressure.
val secondChannel = asFairChannel(other, capacity = CONFLATED)
secondChannel.consume {
// Nothing can be emitted until the other Flow emits its first value, so don't enter the loop until
// that's happened.
var secondValue: Any = secondChannel.receiveOrNull() ?: return@coroutineScope
var secondIsClosed = false

while (!firstIsClosed) {
select<Unit> {
onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
emit(transform(NULL.unbox(value), NULL.unbox(secondValue)))
}

onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
secondValue = value
}
}
}
}
}
}
}

private inline fun SelectBuilder<Unit>.onReceive(
isClosed: Boolean,
channel: ReceiveChannel<Any>,
Expand All @@ -188,12 +240,13 @@ private inline fun SelectBuilder<Unit>.onReceive(
}

// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
val channel = channel as ChannelCoroutine<Any>
flow.collect { value ->
channel.sendFair(value ?: NULL)
private fun CoroutineScope.asFairChannel(flow: Flow<*>, capacity: Int = 0): ReceiveChannel<Any> =
produce(capacity = capacity) {
val channel = channel as ChannelCoroutine<Any>
flow.collect { value ->
channel.sendFair(value ?: NULL)
}
}
}


/**
Expand Down
@@ -0,0 +1,272 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.test.*

/*
* Replace: { i, j -> i + j } -> ::sum as soon as KT-30991 is fixed
*/
class WithLatestFromTest : TestBase() {

@Test
fun testWithLatestFrom() = runTest {
val flow = flowOf("a", "b", "c")
val flow2 = flowOf(1, 2, 3)
val list = flow.withLatestFrom(flow2) { i, j -> i + j }.toList()
assertEquals(listOf("a1", "b2", "c3"), list)
}

@Test
fun testNulls() = runTest {
val flow = flowOf("a", null, null)
val flow2 = flowOf(1, 2, 3)
val list = flow.withLatestFrom(flow2, { i, j -> i + j }).toList()
assertEquals(listOf("a1", "null2", "null3"), list)
}

@Test
fun testNullsOther() = runTest {
val flow = flowOf("a", "b", "c")
val flow2 = flowOf(null, 2, null)
val list = flow.withLatestFrom(flow2, { i, j -> i + j }).toList()
assertEquals(listOf("anull", "b2", "cnull"), list)
}

@Test
fun testEmptyFlows() = runTest {
val flow = emptyFlow<String>().withLatestFrom(emptyFlow<Int>(), { i, j -> i + j })
assertNull(flow.singleOrNull())
}

@Test
fun testFirstIsEmpty() = runTest {
val f1 = emptyFlow<String>()
val f2 = flowOf(1)
assertEquals(emptyList(), f1.withLatestFrom(f2) { i, j -> i + j }.toList())
}

@Test
fun testSecondIsEmpty() = runTest {
val f1 = flowOf("a")
val f2 = emptyFlow<Int>()
assertEquals(emptyList(), f1.withLatestFrom(f2) { i, j -> i + j }.toList())
}

@Test
fun testPreservingOrder() = runTest {
val f1 = flow {
expect(1)
emit("a")
expect(3)
emit("b")
emit("c")
expect(5)
}

val f2 = flow {
expect(2)
emit(1)
yield()
yield()
expect(4)
emit(2)
expect(6)
yield()
expectUnreached()
}

val result = f1.withLatestFrom(f2) { i, j -> i + j }.toList()
assertEquals(listOf("a1", "b1", "c1"), result)
finish(7)
}

@Test
fun testPreservingOrderReversed() = runTest {
val f1 = flow {
expect(1)
emit("a")
expect(3)
emit("b")
emit("c")
expect(4)
}

val f2 = flow {
yield() // One more yield because now this flow starts first
expect(2)
emit(1)
yield()
yield()
expect(5)
emit(2)
expect(6)
yield()
expect(7)
emit(3)
}

val result = f2.withLatestFrom(f1) { i, j -> j + i }.toList()
assertEquals(listOf("a1", "c2", "c3"), result)
finish(8)
}

@Test
fun testContextIsIsolated() = runTest {
val f1 = flow {
emit("a")
assertEquals("first", NamedDispatchers.name())
expect(1)
}.flowOn(NamedDispatchers("first")).onEach {
assertEquals("nested", NamedDispatchers.name())
expect(2)
}.flowOn(NamedDispatchers("nested"))

val f2 = flow {
emit(1)
assertEquals("second", NamedDispatchers.name())
expect(3)
}.flowOn(NamedDispatchers("second"))
.onEach {
assertEquals("onEach", NamedDispatchers.name())
expect(4)
}.flowOn(NamedDispatchers("onEach"))

val value = withContext(NamedDispatchers("main")) {
f1.withLatestFrom(f2) { i, j ->
assertEquals("main", NamedDispatchers.name())
expect(5)
i + j
}.single()
}

assertEquals("a1", value)
finish(6)
}

@Test
fun testErrorInDownstreamCancelsUpstream() = runTest {
val f1 = flow {
emit("a")
hang {
expect(2)
}
}.flowOn(NamedDispatchers("first"))

val f2 = flow {
emit(1)
hang {
expect(3)
}
}.flowOn(NamedDispatchers("second"))

val flow = f1.withLatestFrom(f2) { i, j ->
assertEquals("combine", NamedDispatchers.name())
expect(1)
i + j
}.flowOn(NamedDispatchers("combine")).onEach {
throw TestException()
}

assertFailsWith<TestException>(flow)
finish(4)
}

@Test
fun testErrorCancelsSibling() = runTest {
val f1 = flow {
emit("a")
hang {
expect(1)
}
}.flowOn(NamedDispatchers("first"))

val f2 = flow {
emit(1)
throw TestException()
}.flowOn(NamedDispatchers("second"))

val flow = f1.withLatestFrom(f2) { _, _ -> 1 }
assertFailsWith<TestException>(flow)
finish(2)
}

@Test
fun testErrorCancelsSiblingReversed() = runTest {
val f1 = flow {
emit("a")
throw TestException()
}

val f2 = flow {
emit(1)
hang {
expect(1)
}
}

val flow = f1.withLatestFrom(f2) { _, _ -> 1 }
assertFailsWith<TestException>(flow)
finish(2)
}

@Test
fun testCancellationExceptionUpstream() = runTest {
val f1 = flow {
expect(1)
emit(1)
throw CancellationException("")
}
val f2 = flow {
emit(1)
hang { expect(3) }
}

val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach { expect(2) }
assertFailsWith<CancellationException>(flow)
finish(4)
}

@Test
fun testCancellationExceptionUpstreamReversed() = runTest {
val f1 = flow {
expect(1)
emit(1)
hang { expect(3) }
}
val f2 = flow {
emit(1)
throw CancellationException("")
}

val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach { expect(2) }
assertFailsWith<CancellationException>(flow)
finish(4)
}

@Test
fun testCancellationExceptionDownstream() = runTest {
val f1 = flow {
emit(1)
expect(3)
hang { expect(6) }
}
val f2 = flow {
emit(1)
expect(2)
hang { expect(5) }
}

val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach {
expect(1)
yield()
expect(4)
throw CancellationException("")
}
assertFailsWith<CancellationException>(flow)
finish(7)
}
}