Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow: withLatestFrom operator #1498

Open
elizarov opened this issue Sep 3, 2019 · 23 comments
Open

Flow: withLatestFrom operator #1498

elizarov opened this issue Sep 3, 2019 · 23 comments
Labels

Comments

@elizarov
Copy link
Contributor

elizarov commented Sep 3, 2019

See #1315 for one of the use-cases. Note, that is operator can be implemented in a quite a straightforward way using stable Flow APIs. The simple implementation is given below:

fun <A, B: Any, R> Flow<A>.withLatestFrom(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> = flow {
    coroutineScope {
        val latestB = AtomicReference<B?>()
        val outerScope = this
        launch {
            try {
                other.collect { latestB.set(it) }
            } catch(e: CancellationException) {
                outerScope.cancel(e) // cancel outer scope on cancellation exception, too
            }
        }
        collect { a: A ->
            latestB.get()?.let { b -> emit(transform(a, b)) }
        }
    }
}

TODO: We need to figure out a name for this operation that is consistent with combine and xxxLatest naming convention. Even better, it should be a part of the customizable "combine family" of operations where each source flow can either react to the the incoming values or just reference the most recent value. It is also related to #1354 and #1082 which would introduce the concept of StateFlow (a flow with the most recent value).

@elizarov elizarov added the flow label Sep 3, 2019
@ivanbartsov
Copy link

@elizarov Is this the draft for the real implementation, or is PR #1315 going to be the real implementation and this is just equivalent for easier reading? Afaiu, there's a requirement in #1315 that withLatestFrom should continue emitting latestB in case B completes before A does, the implementation above seems to cancel the whole thing when B gets cancelled.

Also, a question for my own education, if it's not too much trouble:

  • what's the reason behind catching the cancellation exception and explicitly passing it to .cancel() of the outerScope -- wouldn't it work exactly the same way without the try/catch block because of structured concurrency? (with job introduced by launch being child to job of the coroutineScope block and hence propagating both exceptions and cancellation) Sorry if I missed something, I'm fairly new to kotlinx.coroutines

@elizarov
Copy link
Contributor Author

elizarov commented Sep 6, 2019

@ivanbartsov This implementation given above completes only when this (a) flow completes. Completion of the other (b) flow does not terminate it.

The reason for try/catch is the CancellationException has a special role in the structured concurrency. coroutineScope { } does not complete when one of its children crash with CancellationException, because CancellationException is used as a means to explicitly cancel specific children without touching others.

@ivanbartsov
Copy link

@elizarov

coroutineScope { } does not complete when one of its children crash with CancellationException

Right, that slipped my mind. Everything falls into place, thanks :)

@zach-klippenstein
Copy link
Contributor

I think there's a slight difference in behavior with this implementation vs the one in #1315, which is that the latter will still start collecting both flows eagerly but it will apply backpressure to this until other emits its first item. This implementation will drop items from this if other doesn't emit right away.

@recoverrelax
Copy link

correct me if i'm wrong. This implementation would not support null in the second flow right? Because the default for the atomicReference is null

@recoverrelax
Copy link

recoverrelax commented Dec 18, 2019

I had need for this operator, so i just used your code, but then realised null not allowed for the second flow. I came up with this fix:


private object UNINITIALIZED

fun <A, B, R> Flow<A>.withLatestFrom(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> =
    flow {
        coroutineScope {
            val latestB = AtomicReference<Any>(UNINITIALIZED)
            val outerScope = this

            launch {
                try {
                    other.collect { latestB.set(it) }
                } catch (e: CancellationException) {
                    outerScope.cancel(e) // cancel outer scope on cancellation exception, too
                }
            }

            collect { a: A ->
                latestB.get().let {
                    if (it != UNINITIALIZED) {
                        emit(transform(a, it as B))
                    }
                }
            }
        }
    }

I think it will work and support for both flows to be null. Am i right?

@juliocbcotta
Copy link

Is there a multi-platform version of this? It would be really useful.

@zach-klippenstein
Copy link
Contributor

All Flow operators are multi-platform by default.

@chris-hatton
Copy link

@BugsBunnyBR ...that moment when your dreams come true. 👍

@juliocbcotta
Copy link

I meant to ask for a version of this operator that is not bound to JVM.
As far as I could see, all proposals in this thread uses AtomicReference and it is a JVM class.

@uburoiubu
Copy link

uburoiubu commented Apr 9, 2020

I find this operator extremely useful for UI reactive programming.

I have a document, which is simply a list of blocks representing different text fields in which user can type something. This document's state is composed from different data streams, including meta-data about this document along with ui-events like a specific block focusing. Only when document's structure is changed (i.e. new blocks added or deleted, etc.), it should be re-rendered.

Given these steams:

val document : Flow<List<Block>>
val focus: Flow<Id>
val metadata: Flow<Metadata>
document.withLatestFrom(
            focus, 
            metadata
        ) { doc, focus, metadata ->
            doc.mapToView(focus, metadata)
        }.collect { render(it) }

I don't need render(it) to be called every time focus or metadata changed (that's why current implementations of combineLatest() or even combine() do not work for this use case). Metadata and focus live on their own and can be changed internally without affecting what is being rendered. It is only when it is time to (re)render the document that we need these latests values from focus and metadata.

@elizarov
Copy link
Contributor Author

It is only when it is time to (re)render the document that we need these latests values from focus and metadata.

@uburoiubu Can you, please, explain it in a little bit more detail. I really don't get how it is a case when doc.mapToView needs both focus and metadata parameters (meaning it somehow uses them, so its result should surely depend on them), but you don't need to render when they change? If you don't need to render on their change why would you even need them then?

@Lamartio
Copy link

Lamartio commented Jun 26, 2020

@BugsBunnyBR In order to make this KMP, you can implement the AtomicFU library (https://github.com/Kotlin/kotlinx.atomicfu) or just use the ConflatedBroadcastChannel instead of the AtomicReference. The ConflatedBroadcastChannel uses AtomicFU in its internals.

For example:


private object Unitialized

fun <A, B, R> Flow<A>.withLatestFrom(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> =
    flow {
        coroutineScope {
            val outerScope = this
            val channel = ConflatedBroadcastChannel<Any?>(Unitialized)

            launch {
                try {
                    other.collect(channel::send)
                } catch (e: CancellationException) {
                    outerScope.cancel(e)
                }
            }

            collect { a ->
                channel.asFlow().first().let { b ->
                    if (b != Unitialized)
                        emit(transform(a, b as B))
                }
            }
        }
    }

@Progdrasil
Copy link

Seeing as #1315 seems to have stalled since 2019. Seeing as StateFlow has been released, is there anything blocking the implementation of withLatestFrom? similarly to @uburoiubu I too find this operator very useful for reactive UI programming and would gladly help on its implementation if needed.

@elizarov
Copy link
Contributor Author

@Progdrasil It would be extremely helpful if you drop a link here with some examples of your code that uses withLatestFrom (the more code -- the better).

@Progdrasil
Copy link

if you drop a link here with some examples of your code

Unfortunately the code that uses withLatestFrom is closed source for now. But I can put a few examples inline here. Basically we use it on Android when dealing with forms where we want the data from the form on specific events, for example a button tap.
These examples are using FlowBinding.

val editTextList: List<TextInputEditText>
val textFlows: List<Flow<String>> = editTextList.map { it.textChanges() }

val submit: Button
submit
    .clicks()
    .withLatestFrom(textFlows) { _, data -> validate(data) }
    .launchIn(lifecycleScope)

Note its simplified for the actual validation and how its shown back to the user.

Else we also have the following:

val editTextList: List<TextInputEditText>
val validatedFlow: Flow<SomeDataClass> = editTextList
    .map { it.textChanges() }
    .combine { data -> parse(data) }

val submit: Button
submit
    .clicks()
    .withLatestFrom(validatedFlow) { _, data -> applyBusinessLogic(data) }
    .launchIn(lifecycleScope)

@elizarov
Copy link
Contributor Author

@Progdrasil Thanks a lot! What's the value of representing text flows and validatedFlow in this code as a Flow type? It does not seem that this code uses the reactive nature of those types in any way but simply takes their current value using withLatestFrom operator.

May I suggest you to consider an alternative approach to structure your code that seems simpler to me. Instead of val textFlows: List<Flow<String>> you can have:

val texts: List<String> 
    get() = editTextList.map { it.text } // retrieve current text when called

It is not using Flow here, since you only use the current text anyway. Then you can write:

submit
    .clicks()
    .map { applyBusinessLogic(texts) }
    .launchIn(lifecycleScope)

You can do a similar simplification for the other snippet. Relace val validatedFlow: Flow<SomeDataClass> with:

val validated: SomeDataClass
   get() = parse(textData) // just parse the current text data when called, no flow

Then you will be able to write simpler code there, too:

submit
    .clicks()
    .map { applyBusinessLogic(validated) }
    .launchIn(lifecycleScope)

Does it help?

@Progdrasil
Copy link

Without getting into too much detail, The reason we have a list of flows is actually because its the end result of what we end up with after creating a bunch of Input fields in a recycler view. So what we end up with is actually a Map<FieldIDOrPosition, MutableSharedFlow<Flow<String>>> Which ends up as a list of flows before the withLatestFrom part.

@chris-hatton
Copy link

chris-hatton commented Jul 18, 2021

It's 2021 and I'm still coming back to this thread to get the withLatestFrom operator source for my new projects!
I don't know how everyone else is doing it, but to me this operator is an absolute staple when processing UI events i.e. when you need to 'snapshot' the latest value from a Flow upon user Clicks/Taps etc. Feels like this should be part of the coroutines library, as it is with RxJava, unless I'm missing some more idiomatic way to achieve this.

@elizarov
Copy link
Contributor Author

It's 2021 and I'm still coming back to this thread to get the withLatestFrom operator source for my new projects!
I don't know how everyone else is doing it, but to me this operator is an absolute staple when processing UI events i.e. when you need to 'snapshot' the latest value from a Flow upon user Clicks/Taps etc. Feels like this should be part of the coroutines library, as it is with RxJava, unless I'm missing some more idiomatic way to achieve this.

@chris-hatton If you have something in your app that you take a snapshot of, then it might benefit from being modelled as a StateFlow. With this approach, you would not need withLatestFrom anymore. You'll just read the current snapshot of the state in question whenever you need to.

@chris-hatton
Copy link

Thanks @elizarov. I feel like I should have realised that before - I've now refactored my code to use StateFlow in this way, and it does indeed support my use cases. I suppose even if you need to work with some 3rd party Flow, it can still be snapshot via stateIn, in whatever scope needed, to achieve the same pattern.

@hoc081098
Copy link

I have added it in https://github.com/hoc081098/FlowExt library for anyone who needs it

@zirman
Copy link

zirman commented Aug 4, 2021

Here is another version. Collection from Flow doesn't start until Flow emit's its first value. This has a dependency on Arrow Core for boxing the value from Flow.

fun <A, B, R> Flow<A>.withLatestFrom(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> =
    channelFlow {
        val latestB = AtomicReference<Option<B>>(None)

        launch {
            other.collect {
                if (latestB.getAndSet(Some(it)).isEmpty()) {
                    launch {
                        collect { a -> channel.send(transform(a, (latestB.get() as Some).value)) }
                    }
                }
            }
        }
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests