stateIn with SharingStarted.Eagerly doesn't really immediately start collecting #4006

Open
dkhalanskyjb opened this issue Jan 4, 2024 · 1 comment


An example was submitted to us (slightly simplified):

runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()
    val stateFlow = sharedFlow.stateIn(this, SharingStarted.Eagerly, 0)

    check(stateFlow.value == 0)
    sharedFlow.emit(1)
    delay(1.seconds)
    check(stateFlow.value == 1) // fails
}

The reason is that stateIn only subscribes to sharedFlow once the thread becomes available, which first happens at the delay call in this example. By then, emit(1) has already returned without any subscriber to receive the value, so despite Eagerly promising that the subscription will happen immediately, the emitted value is lost.
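To make the timing visible, here is a minimal sketch of the example above that also records subscriptionCount on the shared flow. The names Trace and traceLostEmission are hypothetical, the delay is shortened, and the sketch assumes the single-threaded event loop of runBlocking.

```kotlin
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical holder for what the sketch records.
data class Trace(val subscribersAtEmit: Int, val subscribersAfterDelay: Int, val value: Int)

fun traceLostEmission(): Trace = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()
    val stateFlow = sharedFlow.stateIn(this, SharingStarted.Eagerly, 0)

    // The sharing coroutine is only scheduled so far; it has not subscribed yet.
    val subscribersAtEmit = sharedFlow.subscriptionCount.value
    sharedFlow.emit(1) // no subscribers and no replay: the value is dropped

    // Releasing the thread lets the sharing coroutine finally subscribe.
    delay(100.milliseconds)
    val trace = Trace(subscribersAtEmit, sharedFlow.subscriptionCount.value, stateFlow.value)

    // The sharing coroutine never completes on its own; cancel it so runBlocking can return.
    coroutineContext.cancelChildren()
    trace
}

fun main() {
    println(traceLostEmission())
}
```

The subscriber count at the time of emit is what decides whether the value can reach the state flow at all.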

Adding a yield after stateIn fixes the issue, as stateIn gets a chance to finish its initialization.
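A minimal sketch of that workaround, again assuming runBlocking's single-threaded event loop. The function name valueAfterEmitWithYield is hypothetical, and the cancelChildren call at the end is only there so that the sketch terminates.

```kotlin
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns the state observed after the emission.
fun valueAfterEmitWithYield(): Int = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()
    val stateFlow = sharedFlow.stateIn(this, SharingStarted.Eagerly, 0)

    // Give the sharing coroutine a chance to run and subscribe to sharedFlow.
    yield()

    sharedFlow.emit(1)
    delay(1.seconds)
    val observed = stateFlow.value

    // The sharing coroutine never completes on its own; cancel it so runBlocking can return.
    coroutineContext.cancelChildren()
    observed
}

fun main() {
    println(valueAfterEmitWithYield())
}
```

Note that a single yield is only enough here because nothing else competes for the thread; on a multi-threaded or busy dispatcher it gives no ordering guarantee.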

This behavior is actually intentional. The implementation contains the following:

     * * Eager sharing does not start immediately, so the subscribers have actual chance to subscribe _prior_ to sharing.
     */
    val start = if (started == SharingStarted.Eagerly) CoroutineStart.DEFAULT else CoroutineStart.UNDISPATCHED

The stated reason is that subscribers should have a chance to subscribe before the sharing starts. For example,

runBlocking {
    val myFlow = flow {
        emit(1); emit(2)
    }
    val stateFlow = myFlow.stateIn(this, SharingStarted.Eagerly, 0)
    launch(start = CoroutineStart.UNDISPATCHED) {
        stateFlow.collect {
            println(it) // guaranteed to observe the initial value 0
        }
    }
}

The code that ensures delivery of the initial element is tricky to write, as it requires knowingly starving the dispatcher of its threads that could perform the initialization code of stateIn in parallel. Also, the use cases are unclear, and it doesn't seem like this guarantee is even specified anywhere.
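To illustrate how fragile this is, the following sketch compares the first value a subscriber sees depending on how the subscriber itself is started. The function name firstObserved is hypothetical, and the result relies on runBlocking's single thread; with a multi-threaded dispatcher the outcome would depend on scheduling.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the first value a subscriber started with `start` observes.
fun firstObserved(start: CoroutineStart): Int = runBlocking {
    val myFlow = flow {
        emit(1); emit(2)
    }
    val stateFlow = myFlow.stateIn(this, SharingStarted.Eagerly, 0)

    // UNDISPATCHED runs the subscriber right here, before the sharing coroutine gets the thread.
    // DEFAULT queues the subscriber behind the already scheduled sharing coroutine.
    val first = async(start = start) { stateFlow.first() }
    first.await()
}

fun main() {
    println(firstObserved(CoroutineStart.UNDISPATCHED))
    println(firstObserved(CoroutineStart.DEFAULT))
}
```

Only the UNDISPATCHED subscriber gets to see the initial value; a subscriber started in the default way runs after the upstream has already been collected.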

We should either document that Eagerly can sometimes fail to even run the initialization code of collect (for example, to subscribe to a shared flow) or change this behavior.

igor-korotenko commented Apr 9, 2024

Agreed. It took me some time to realise this, since I was expecting that it could be used for pre-caching a value. It would be good to have at least some documentation on this :) Or to actually start it immediately in the case of StateFlow, since that way it could be used to pre-cache a value for consumers that appear later on.
