-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
I have a requirement to have a flow that is converted to a SharedFlow
as multiple downstream subscribers consume the result, but the originating flow is particularly expensive so multiple collection is unacceptable.
Using shareIn
solves this issue, however a requirement is that the downstream subscribers must also know of the completion status of the original flow. I've implemented a materialize
& demateralize
function as mentioned in both #2751 & #2034 (comment)
As a side-effect of this error-handling design decision, the SharedFlow never completes. A call collect { ... } on a SharedFlow must be canceled to terminate it. However, if completion is needed, then it can always be materialized by a special emitted value. A collector can apply takeWhile operator to complete the resulting flow when this special value is encountered.
I've wrapped this up in a generalised function shareUntilComplete
with the same signature as shareIn
, there remains however a significant problem with implementing the replay
functionality - the gist of the function body is as follows:
return materialize()
.shareIn(coroutineScope, started, replay)
.dematerialize()
The issue is that the completion symbol will count towards the replay
allocation, and I can't simply do replay + 1
, because a subscriber that joins before completion will end up in one more than the intended replay count from buffer, and if you just do replay
directly, then subscribers before completion will get the expected result, but late subscribers would only get an empty completing flow (because the only thing in cache was the completion notification).
Is there a simple solution I'm overlooking here?