Skip to content

Commit 5173824

Browse files
elizarovqwwdfsad
authored andcommitted
EventLoop integration and reuse for runBlocking and Unconfined dispatchers (#889)
EventLoop integration and reuse for runBlocking and Unconfined dispatchers * Event loop that is created by runBlocking or by Unconfined dispatcher is reused across the same thread to prevent blocking of the loop * Semantics of runBlocking and Unconfined are fully retained * DefaultExecutor also registers itself as running event loop and thus cannot be blocked by runBlocking * Consolidates thread-local handling for native * Also fixes thread-local memory leak on JVM (does not use custom class) * Restore BlockingEventLoop class name for runBlocking event loop impl * Internal API for custom waiting loops to be used by kotlinx-io Fixes #860 Fixes #850
1 parent 1d04e79 commit 5173824

File tree

24 files changed

+596
-362
lines changed

24 files changed

+596
-362
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,10 @@ public abstract interface class kotlinx/coroutines/DisposableHandle {
254254
public abstract fun dispose ()V
255255
}
256256

257+
public final class kotlinx/coroutines/EventLoopKt {
258+
public static final fun processNextEventInCurrentThread ()J
259+
}
260+
257261
public abstract class kotlinx/coroutines/ExecutorCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, java/io/Closeable {
258262
public fun <init> ()V
259263
public abstract fun close ()V

common/kotlinx-coroutines-core-common/src/Dispatched.kt

Lines changed: 58 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -12,70 +12,64 @@ import kotlin.jvm.*
1212
@SharedImmutable
1313
private val UNDEFINED = Symbol("UNDEFINED")
1414

15-
@NativeThreadLocal
16-
internal object UndispatchedEventLoop {
17-
data class EventLoop(
18-
@JvmField var isActive: Boolean = false,
19-
@JvmField val queue: ArrayQueue<Runnable> = ArrayQueue()
20-
)
21-
22-
@JvmField
23-
internal val threadLocalEventLoop = CommonThreadLocal { EventLoop() }
24-
25-
/**
26-
* Executes given [block] as part of current event loop, updating related to block [continuation]
27-
* mode and state if continuation is not resumed immediately.
28-
* [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
29-
* Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
30-
*/
31-
inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int,
32-
doYield: Boolean = false, block: () -> Unit) : Boolean {
33-
val eventLoop = threadLocalEventLoop.get()
34-
if (eventLoop.isActive) {
35-
// If we are yielding and queue is empty, we can bail out as part of fast path
36-
if (doYield && eventLoop.queue.isEmpty) {
37-
return false
38-
}
39-
40-
continuation._state = contState
41-
continuation.resumeMode = mode
42-
eventLoop.queue.addLast(continuation)
43-
return true
44-
}
45-
46-
runEventLoop(eventLoop, block)
47-
return false
15+
/**
16+
* Executes given [block] as part of current event loop, updating current continuation
17+
* mode and state if continuation is not resumed immediately.
18+
* [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
19+
* Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
20+
*/
21+
private inline fun DispatchedContinuation<*>.executeUnconfined(
22+
contState: Any?, mode: Int, doYield: Boolean = false,
23+
block: () -> Unit
24+
) : Boolean {
25+
val eventLoop = ThreadLocalEventLoop.eventLoop
26+
// If we are yielding and unconfined queue is empty, we can bail out as part of fast path
27+
if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
28+
return if (eventLoop.isUnconfinedLoopActive) {
29+
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
30+
_state = contState
31+
resumeMode = mode
32+
eventLoop.dispatchUnconfined(this)
33+
true // queued into the active loop
34+
} else {
35+
// Was not active -- run event loop until all unconfined tasks are executed
36+
runUnconfinedEventLoop(eventLoop, block = block)
37+
false
4838
}
39+
}
4940

50-
fun resumeUndispatched(task: DispatchedTask<*>): Boolean {
51-
val eventLoop = threadLocalEventLoop.get()
52-
if (eventLoop.isActive) {
53-
eventLoop.queue.addLast(task)
54-
return true
41+
private fun DispatchedTask<*>.resumeUnconfined() {
42+
val eventLoop = ThreadLocalEventLoop.eventLoop
43+
if (eventLoop.isUnconfinedLoopActive) {
44+
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
45+
eventLoop.dispatchUnconfined(this)
46+
} else {
47+
// Was not active -- run event loop until all unconfined tasks are executed
48+
runUnconfinedEventLoop(eventLoop) {
49+
resume(delegate, MODE_UNDISPATCHED)
5550
}
56-
57-
runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED) })
58-
return false
5951
}
52+
}
6053

61-
inline fun runEventLoop(eventLoop: EventLoop, block: () -> Unit) {
62-
try {
63-
eventLoop.isActive = true
64-
block()
65-
while (true) {
66-
val nextEvent = eventLoop.queue.removeFirstOrNull() ?: return
67-
nextEvent.run()
68-
}
69-
} catch (e: Throwable) {
70-
/*
71-
* This exception doesn't happen normally, only if user either submitted throwing runnable
72-
* or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
73-
*/
74-
eventLoop.queue.clear()
75-
throw DispatchException("Unexpected exception in undispatched event loop, clearing pending tasks", e)
76-
} finally {
77-
eventLoop.isActive = false
54+
private inline fun runUnconfinedEventLoop(
55+
eventLoop: EventLoop,
56+
block: () -> Unit
57+
) {
58+
eventLoop.incrementUseCount(unconfined = true)
59+
try {
60+
block()
61+
while (true) {
62+
// break when all unconfined continuations where executed
63+
if (!eventLoop.processUnconfinedEvent()) break
7864
}
65+
} catch (e: Throwable) {
66+
/*
67+
* This exception doesn't happen normally, only if user either submitted throwing runnable
68+
* or if we have a bug in implementation. Throw an exception that better explains the problem.
69+
*/
70+
throw DispatchException("Unexpected exception in unconfined event loop", e)
71+
} finally {
72+
eventLoop.decrementUseCount(unconfined = true)
7973
}
8074
}
8175

@@ -109,7 +103,7 @@ internal class DispatchedContinuation<in T>(
109103
resumeMode = MODE_ATOMIC_DEFAULT
110104
dispatcher.dispatch(context, this)
111105
} else {
112-
UndispatchedEventLoop.execute(this, state, MODE_ATOMIC_DEFAULT) {
106+
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
113107
withCoroutineContext(this.context, countOrElement) {
114108
continuation.resumeWith(result)
115109
}
@@ -124,7 +118,7 @@ internal class DispatchedContinuation<in T>(
124118
resumeMode = MODE_CANCELLABLE
125119
dispatcher.dispatch(context, this)
126120
} else {
127-
UndispatchedEventLoop.execute(this, value, MODE_CANCELLABLE) {
121+
executeUnconfined(value, MODE_CANCELLABLE) {
128122
if (!resumeCancelled()) {
129123
resumeUndispatched(value)
130124
}
@@ -141,7 +135,7 @@ internal class DispatchedContinuation<in T>(
141135
resumeMode = MODE_CANCELLABLE
142136
dispatcher.dispatch(context, this)
143137
} else {
144-
UndispatchedEventLoop.execute(this, state, MODE_CANCELLABLE) {
138+
executeUnconfined(state, MODE_CANCELLABLE) {
145139
if (!resumeCancelled()) {
146140
resumeUndispatchedWithException(exception)
147141
}
@@ -207,7 +201,7 @@ internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable)
207201
}
208202

209203
internal abstract class DispatchedTask<in T>(
210-
@JvmField var resumeMode: Int
204+
@JvmField public var resumeMode: Int
211205
) : SchedulerTask() {
212206
public abstract val delegate: Continuation<T>
213207

@@ -248,7 +242,7 @@ internal abstract class DispatchedTask<in T>(
248242
}
249243

250244
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
251-
UndispatchedEventLoop.execute(this, Unit, MODE_CANCELLABLE, doYield = true) {
245+
executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
252246
run()
253247
}
254248

@@ -261,7 +255,7 @@ internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
261255
if (dispatcher.isDispatchNeeded(context)) {
262256
dispatcher.dispatch(context, this)
263257
} else {
264-
UndispatchedEventLoop.resumeUndispatched(this)
258+
resumeUnconfined()
265259
}
266260
} else {
267261
resume(delegate, mode)
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.internal.*
8+
9+
/**
10+
* Extended by [CoroutineDispatcher] implementations that have event loop inside and can
11+
* be asked to process next event from their event queue.
12+
*
13+
* It may optionally implement [Delay] interface and support time-scheduled tasks.
14+
* It is created or pigged back onto (see [ThreadLocalEventLoop])
15+
* by [runBlocking] and by [Dispatchers.Unconfined].
16+
*
17+
* @suppress **This an internal API and should not be used from general code.**
18+
*/
19+
internal abstract class EventLoop : CoroutineDispatcher() {
20+
/**
21+
* Counts the number of nested [runBlocking] and [Dispatchers.Unconfined] that use this event loop.
22+
*/
23+
private var useCount = 0L
24+
25+
/**
26+
* Set to true on any use by [runBlocking], because it potentially leaks this loop to other threads, so
27+
* this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
28+
* by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
29+
*/
30+
private var shared = false
31+
32+
/**
33+
* Queue used by [Dispatchers.Unconfined] tasks.
34+
* These tasks are thread-local for performance and take precedence over the rest of the queue.
35+
*/
36+
private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
37+
38+
/**
39+
* Processes next event in this event loop.
40+
*
41+
* The result of this function is to be interpreted like this:
42+
* * `<= 0` -- there are potentially more events for immediate processing;
43+
* * `> 0` -- a number of nanoseconds to wait for next scheduled event;
44+
* * [Long.MAX_VALUE] -- no more events.
45+
*
46+
* **NOTE**: Must be invoked only from the event loop's thread
47+
* (no check for performance reasons, may be added in the future).
48+
*/
49+
public open fun processNextEvent(): Long {
50+
if (!processUnconfinedEvent()) return Long.MAX_VALUE
51+
return nextTime
52+
}
53+
54+
protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
55+
56+
protected open val nextTime: Long
57+
get() {
58+
val queue = unconfinedQueue ?: return Long.MAX_VALUE
59+
return if (queue.isEmpty) Long.MAX_VALUE else 0L
60+
}
61+
62+
public fun processUnconfinedEvent(): Boolean {
63+
val queue = unconfinedQueue ?: return false
64+
val task = queue.removeFirstOrNull() ?: return false
65+
task.run()
66+
return true
67+
}
68+
/**
69+
* Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
70+
* parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
71+
* By default, event loop implementation is thread-local and should not processed in the context
72+
* (current thread's event loop should be processed instead).
73+
*/
74+
public open fun shouldBeProcessedFromContext(): Boolean = false
75+
76+
/**
77+
* Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
78+
* into the current event loop.
79+
*/
80+
public fun dispatchUnconfined(task: DispatchedTask<*>) {
81+
val queue = unconfinedQueue ?:
82+
ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
83+
queue.addLast(task)
84+
}
85+
86+
public val isActive: Boolean
87+
get() = useCount > 0
88+
89+
public val isUnconfinedLoopActive: Boolean
90+
get() = useCount >= delta(unconfined = true)
91+
92+
// May only be used from the event loop's thread
93+
public val isUnconfinedQueueEmpty: Boolean
94+
get() = unconfinedQueue?.isEmpty ?: true
95+
96+
private fun delta(unconfined: Boolean) =
97+
if (unconfined) (1L shl 32) else 1L
98+
99+
fun incrementUseCount(unconfined: Boolean = false) {
100+
useCount += delta(unconfined)
101+
if (!unconfined) shared = true
102+
}
103+
104+
fun decrementUseCount(unconfined: Boolean = false) {
105+
useCount -= delta(unconfined)
106+
if (useCount > 0) return
107+
check(useCount == 0L) { "Extra decrementUseCount" }
108+
if (shared) {
109+
// shut it down and remove from ThreadLocalEventLoop
110+
shutdown()
111+
}
112+
}
113+
114+
protected open fun shutdown() {}
115+
}
116+
117+
@NativeThreadLocal
118+
internal object ThreadLocalEventLoop {
119+
private val ref = CommonThreadLocal<EventLoop?>()
120+
121+
internal val eventLoop: EventLoop
122+
get() = ref.get() ?: createEventLoop().also { ref.set(it) }
123+
124+
internal fun currentOrNull(): EventLoop? =
125+
ref.get()
126+
127+
internal fun resetEventLoop() {
128+
ref.set(null)
129+
}
130+
131+
internal fun setEventLoop(eventLoop: EventLoop) {
132+
ref.set(eventLoop)
133+
}
134+
}
135+
136+
internal expect fun createEventLoop(): EventLoop
137+

common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44

55
package kotlinx.coroutines.internal
66

7-
internal class ArrayQueue<T : Any> {
7+
internal open class ArrayQueue<T : Any> {
88
private var elements = arrayOfNulls<Any>(16)
99
private var head = 0
1010
private var tail = 0
11+
1112
val isEmpty: Boolean get() = head == tail
1213

1314
public fun addLast(element: T) {

common/kotlinx-coroutines-core-common/src/internal/ThreadLocal.common.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package kotlinx.coroutines.internal
88
@UseExperimental(ExperimentalMultiplatform::class)
99
internal expect annotation class NativeThreadLocal()
1010

11-
internal expect class CommonThreadLocal<T>(supplier: () -> T) {
11+
internal expect class CommonThreadLocal<T>() {
1212
fun get(): T
13+
fun set(value: T)
1314
}

0 commit comments

Comments
 (0)