-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
An example was submitted to us (slightly simplified):
runBlocking {
val sharedFlow = MutableSharedFlow<Int>()
val stateFlow = sharedFlow.stateIn(this, SharingStarted.Eagerly, 0)
check(stateFlow.value == 0)
sharedFlow.emit(1)
delay(1.seconds)
check(stateFlow.value == 1) // fails
}
The reason is that the subscription of stateIn
only happens when the thread becomes available, which is only at delay
in this example. Despite Eagerly
promising that the subscription will happen immediately, the emit
call is lost.
Adding a yield
after stateIn
fixes the issue, as stateIn
gets a chance to finish its initialization.
This behavior is actually intentional (
kotlinx.coroutines/kotlinx-coroutines-core/common/src/flow/operators/Share.kt
Lines 206 to 208 in 1a0287c
* * Eager sharing does not start immediately, so the subscribers have actual chance to subscribe _prior_ to sharing. | |
*/ | |
val start = if (started == SharingStarted.Eagerly) CoroutineStart.DEFAULT else CoroutineStart.UNDISPATCHED |
runBlocking {
val myFlow = flow {
emit(1); emit(2)
}
val stateFlow = myFlow.stateIn(this, SharingStarted.Eagerly, 0)
launch(start = CoroutineStart.UNDISPATCHED) {
stateFlow.collect {
println(it) // guaranteed to observe the initial value 0
}
}
}
The code that ensures delivery of the initial element is tricky to write, as it requires knowingly starving the dispatcher of its threads that could perform the initialization code of stateIn
in parallel. Also, the use cases are unclear, and it doesn't seem like this guarantee is even specified anywhere.
We should either document that Eagerly
can sometimes fail to even run the initialization code of collect
(for example, to subscribe to a shared flow) or change this behavior.