Skip to content

Commit c2adef5

Browse files
committed
Fixed NPE when ArrayBroadcastChannel is closed concurrently with receive
Fixes #97
1 parent 2b76d8e commit c2adef5

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,13 +338,20 @@ class ArrayBroadcastChannel<E>(
338338
val subHead = this.subHead // guarded read (can be non-volatile read)
339339
// note: from the broadcastChannel we must read closed token first, then read its tail
340340
// because it is Ok if tail moves in between the reads (we make decision based on tail first)
341-
val closed = broadcastChannel.closedForReceive // unguarded volatile read
341+
val closedBroadcast = broadcastChannel.closedForReceive // unguarded volatile read
342342
val tail = broadcastChannel.tail // unguarded volatile read
343343
if (subHead >= tail) {
344344
// no elements to poll from the queue -- check if closed
345-
return closed ?: POLL_FAILED // must retest `needsToCheckOfferWithoutLock` outside of the lock
345+
return closedBroadcast ?: POLL_FAILED // must retest `needsToCheckOfferWithoutLock` outside of the lock
346346
}
347-
return broadcastChannel.elementAt(subHead)
347+
// Get tentative result. This result may be wrong (completely invalid value, including null),
348+
// because this subscription might get closed, moving channel's head past this subscription's head.
349+
val result = broadcastChannel.elementAt(subHead)
350+
// now check if this subscription was closed
351+
val closedSub = this.closedForReceive
352+
if (closedSub != null) return closedSub
353+
// we know the subscription was not closed, so this tentative result is Ok to return
354+
return result
348355
}
349356
}
350357
}

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,38 @@ class ArrayBroadcastChannelTest : TestBase() {
132132
yield()
133133
finish(6)
134134
}
135+
136+
@Test
137+
fun testReceiveFullAfterClose() = runBlocking<Unit> {
138+
val channel = BroadcastChannel<Int>(10)
139+
val sub = channel.openSubscription()
140+
// generate into buffer & close
141+
for (x in 1..5) channel.send(x)
142+
channel.close()
143+
// make sure all of them are consumed
144+
check(!sub.isClosedForReceive)
145+
for (x in 1..5) check(sub.receive() == x)
146+
check(sub.receiveOrNull() == null)
147+
check(sub.isClosedForReceive)
148+
}
149+
150+
@Test
151+
fun testCloseSubDuringIteration() = runBlocking<Unit> {
152+
val channel = BroadcastChannel<Int>(1)
153+
// launch generator (for later) in this context
154+
launch(coroutineContext) {
155+
for (x in 1..5) channel.send(x)
156+
channel.close()
157+
}
158+
// start consuming
159+
val sub = channel.openSubscription()
160+
var expected = 0
161+
sub.consumeEach {
162+
check(it == ++expected)
163+
if (it == 2) {
164+
sub.close()
165+
}
166+
}
167+
check(expected == 2)
168+
}
135169
}

0 commit comments

Comments
 (0)