Skip to content

Commit 0575808

Browse files
Add withLatestFrom operator.
Co-authored-by: Ray Ryan <[email protected]>
1 parent 9420df3 commit 0575808

File tree

3 files changed

+331
-5
lines changed

3 files changed

+331
-5
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+1
Original file line numberDiff line numberDiff line change
@@ -884,6 +884,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
884884
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
885885
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
886886
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
887+
public static final fun withLatestFrom (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
887888
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
888889
}
889890

kotlinx-coroutines-core/common/src/flow/operators/Zip.kt

+58-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package kotlinx.coroutines.flow
1010

1111
import kotlinx.coroutines.*
1212
import kotlinx.coroutines.channels.*
13+
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
1314
import kotlinx.coroutines.flow.internal.*
1415
import kotlinx.coroutines.selects.*
1516
import kotlin.jvm.*
@@ -174,6 +175,57 @@ internal fun <T, R> Flow<T>.combineLatest(vararg others: Flow<T>, arrayFactory:
174175
}
175176
}
176177

178+
/**
179+
* Returns a [Flow] whose values are generated by the [transform] function every time this flow emits by combining the
180+
* emitted value with the latest value emitted by `other`.
181+
*
182+
* Emissions are only triggered by this flow, not `other`, and the returned flow will remain active as long as this flow
183+
* is, completing only when this flow completes. `other` will be cancelled if this flow completes first.
184+
*
185+
* The operator does not call [transform], and the returned flow will not emit, until `other` emits – if `other` does
186+
* not emit, the returned flow will complete without emitting. However after `other` has emitted its first value, the
187+
* returned flow will ignore `other`'s completion and continue to cache the last value it emitted.
188+
*
189+
* It can be demonstrated with the following example:
190+
* ```
191+
* val flow = flowOf(1, 2, 3).delayEach(10)
192+
* val flow2 = flowOf("a", "b").delayEach(15)
193+
* flow.withLatestFrom(flow2) { i, s -> i.toString() + s }.collect {
194+
* println(it) // Will print "1a 2a 3b"
195+
* }
196+
* ```
197+
*/
198+
@ExperimentalCoroutinesApi
199+
public fun <T1, T2, R> Flow<T1>.withLatestFrom(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
200+
coroutineScope {
201+
val firstChannel = asFairChannel(this@withLatestFrom)
202+
firstChannel.consume {
203+
var firstIsClosed = false
204+
205+
// This operator conflates values from the other Flow anyway, so the channel doesn't need any backpressure.
206+
val secondChannel = asFairChannel(other, capacity = CONFLATED)
207+
secondChannel.consume {
208+
// Nothing can be emitted until the other Flow emits its first value, so don't enter the loop until
209+
// that's happened.
210+
var secondValue: Any = secondChannel.receiveOrNull() ?: return@coroutineScope
211+
var secondIsClosed = false
212+
213+
while (!firstIsClosed) {
214+
select<Unit> {
215+
onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
216+
emit(transform(NULL.unbox(value), NULL.unbox(secondValue)))
217+
}
218+
219+
onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
220+
secondValue = value
221+
}
222+
}
223+
}
224+
}
225+
}
226+
}
227+
}
228+
177229
private inline fun SelectBuilder<Unit>.onReceive(
178230
isClosed: Boolean,
179231
channel: ReceiveChannel<Any>,
@@ -188,12 +240,13 @@ private inline fun SelectBuilder<Unit>.onReceive(
188240
}
189241

190242
// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
191-
private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
192-
val channel = channel as ChannelCoroutine<Any>
193-
flow.collect { value ->
194-
channel.sendFair(value ?: NULL)
243+
private fun CoroutineScope.asFairChannel(flow: Flow<*>, capacity: Int = 0): ReceiveChannel<Any> =
244+
produce(capacity = capacity) {
245+
val channel = channel as ChannelCoroutine<Any>
246+
flow.collect { value ->
247+
channel.sendFair(value ?: NULL)
248+
}
195249
}
196-
}
197250

198251

199252
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlin.test.*
9+
10+
/*
11+
* Replace: { i, j -> i + j } -> ::sum as soon as KT-30991 is fixed
12+
*/
13+
class WithLatestFromTest : TestBase() {
14+
15+
@Test
16+
fun testWithLatestFrom() = runTest {
17+
val flow = flowOf("a", "b", "c")
18+
val flow2 = flowOf(1, 2, 3)
19+
val list = flow.withLatestFrom(flow2) { i, j -> i + j }.toList()
20+
assertEquals(listOf("a1", "b2", "c3"), list)
21+
}
22+
23+
@Test
24+
fun testNulls() = runTest {
25+
val flow = flowOf("a", null, null)
26+
val flow2 = flowOf(1, 2, 3)
27+
val list = flow.withLatestFrom(flow2, { i, j -> i + j }).toList()
28+
assertEquals(listOf("a1", "null2", "null3"), list)
29+
}
30+
31+
@Test
32+
fun testNullsOther() = runTest {
33+
val flow = flowOf("a", "b", "c")
34+
val flow2 = flowOf(null, 2, null)
35+
val list = flow.withLatestFrom(flow2, { i, j -> i + j }).toList()
36+
assertEquals(listOf("anull", "b2", "cnull"), list)
37+
}
38+
39+
@Test
40+
fun testEmptyFlows() = runTest {
41+
val flow = emptyFlow<String>().withLatestFrom(emptyFlow<Int>(), { i, j -> i + j })
42+
assertNull(flow.singleOrNull())
43+
}
44+
45+
@Test
46+
fun testFirstIsEmpty() = runTest {
47+
val f1 = emptyFlow<String>()
48+
val f2 = flowOf(1)
49+
assertEquals(emptyList(), f1.withLatestFrom(f2) { i, j -> i + j }.toList())
50+
}
51+
52+
@Test
53+
fun testSecondIsEmpty() = runTest {
54+
val f1 = flowOf("a")
55+
val f2 = emptyFlow<Int>()
56+
assertEquals(emptyList(), f1.withLatestFrom(f2) { i, j -> i + j }.toList())
57+
}
58+
59+
@Test
60+
fun testPreservingOrder() = runTest {
61+
val f1 = flow {
62+
expect(1)
63+
emit("a")
64+
expect(3)
65+
emit("b")
66+
emit("c")
67+
expect(5)
68+
}
69+
70+
val f2 = flow {
71+
expect(2)
72+
emit(1)
73+
yield()
74+
yield()
75+
expect(4)
76+
emit(2)
77+
expect(6)
78+
yield()
79+
expectUnreached()
80+
}
81+
82+
val result = f1.withLatestFrom(f2) { i, j -> i + j }.toList()
83+
assertEquals(listOf("a1", "b1", "c1"), result)
84+
finish(7)
85+
}
86+
87+
@Test
88+
fun testPreservingOrderReversed() = runTest {
89+
val f1 = flow {
90+
expect(1)
91+
emit("a")
92+
expect(3)
93+
emit("b")
94+
emit("c")
95+
expect(4)
96+
}
97+
98+
val f2 = flow {
99+
yield() // One more yield because now this flow starts first
100+
expect(2)
101+
emit(1)
102+
yield()
103+
yield()
104+
expect(5)
105+
emit(2)
106+
expect(6)
107+
yield()
108+
expect(7)
109+
emit(3)
110+
}
111+
112+
val result = f2.withLatestFrom(f1) { i, j -> j + i }.toList()
113+
assertEquals(listOf("a1", "c2", "c3"), result)
114+
finish(8)
115+
}
116+
117+
@Test
118+
fun testContextIsIsolated() = runTest {
119+
val f1 = flow {
120+
emit("a")
121+
assertEquals("first", NamedDispatchers.name())
122+
expect(1)
123+
}.flowOn(NamedDispatchers("first")).onEach {
124+
assertEquals("nested", NamedDispatchers.name())
125+
expect(2)
126+
}.flowOn(NamedDispatchers("nested"))
127+
128+
val f2 = flow {
129+
emit(1)
130+
assertEquals("second", NamedDispatchers.name())
131+
expect(3)
132+
}.flowOn(NamedDispatchers("second"))
133+
.onEach {
134+
assertEquals("onEach", NamedDispatchers.name())
135+
expect(4)
136+
}.flowOn(NamedDispatchers("onEach"))
137+
138+
val value = withContext(NamedDispatchers("main")) {
139+
f1.withLatestFrom(f2) { i, j ->
140+
assertEquals("main", NamedDispatchers.name())
141+
expect(5)
142+
i + j
143+
}.single()
144+
}
145+
146+
assertEquals("a1", value)
147+
finish(6)
148+
}
149+
150+
@Test
151+
fun testErrorInDownstreamCancelsUpstream() = runTest {
152+
val f1 = flow {
153+
emit("a")
154+
hang {
155+
expect(2)
156+
}
157+
}.flowOn(NamedDispatchers("first"))
158+
159+
val f2 = flow {
160+
emit(1)
161+
hang {
162+
expect(3)
163+
}
164+
}.flowOn(NamedDispatchers("second"))
165+
166+
val flow = f1.withLatestFrom(f2) { i, j ->
167+
assertEquals("combine", NamedDispatchers.name())
168+
expect(1)
169+
i + j
170+
}.flowOn(NamedDispatchers("combine")).onEach {
171+
throw TestException()
172+
}
173+
174+
assertFailsWith<TestException>(flow)
175+
finish(4)
176+
}
177+
178+
@Test
179+
fun testErrorCancelsSibling() = runTest {
180+
val f1 = flow {
181+
emit("a")
182+
hang {
183+
expect(1)
184+
}
185+
}.flowOn(NamedDispatchers("first"))
186+
187+
val f2 = flow {
188+
emit(1)
189+
throw TestException()
190+
}.flowOn(NamedDispatchers("second"))
191+
192+
val flow = f1.withLatestFrom(f2) { _, _ -> 1 }
193+
assertFailsWith<TestException>(flow)
194+
finish(2)
195+
}
196+
197+
@Test
198+
fun testErrorCancelsSiblingReversed() = runTest {
199+
val f1 = flow {
200+
emit("a")
201+
throw TestException()
202+
}
203+
204+
val f2 = flow {
205+
emit(1)
206+
hang {
207+
expect(1)
208+
}
209+
}
210+
211+
val flow = f1.withLatestFrom(f2) { _, _ -> 1 }
212+
assertFailsWith<TestException>(flow)
213+
finish(2)
214+
}
215+
216+
@Test
217+
fun testCancellationExceptionUpstream() = runTest {
218+
val f1 = flow {
219+
expect(1)
220+
emit(1)
221+
throw CancellationException("")
222+
}
223+
val f2 = flow {
224+
emit(1)
225+
hang { expect(3) }
226+
}
227+
228+
val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach { expect(2) }
229+
assertFailsWith<CancellationException>(flow)
230+
finish(4)
231+
}
232+
233+
@Test
234+
fun testCancellationExceptionUpstreamReversed() = runTest {
235+
val f1 = flow {
236+
expect(1)
237+
emit(1)
238+
hang { expect(3) }
239+
}
240+
val f2 = flow {
241+
emit(1)
242+
throw CancellationException("")
243+
}
244+
245+
val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach { expect(2) }
246+
assertFailsWith<CancellationException>(flow)
247+
finish(4)
248+
}
249+
250+
@Test
251+
fun testCancellationExceptionDownstream() = runTest {
252+
val f1 = flow {
253+
emit(1)
254+
expect(3)
255+
hang { expect(6) }
256+
}
257+
val f2 = flow {
258+
emit(1)
259+
expect(2)
260+
hang { expect(5) }
261+
}
262+
263+
val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach {
264+
expect(1)
265+
yield()
266+
expect(4)
267+
throw CancellationException("")
268+
}
269+
assertFailsWith<CancellationException>(flow)
270+
finish(7)
271+
}
272+
}

0 commit comments

Comments
 (0)