Skip to content

Commit db0e22d

Browse files
committed
Select expression is modularized.
SelectClause(0,1,2) interfaces are introduced, so that synchronization constructs can define their select clauses without having to modify the source of the SelectBuilder.
1 parent e8d7934 commit db0e22d

File tree

15 files changed

+264
-185
lines changed

15 files changed

+264
-185
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CompletableDeferred.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import kotlinx.coroutines.experimental.selects.SelectInstance
19+
import kotlinx.coroutines.experimental.selects.SelectClause1
2020

2121
/**
2222
* A [Deferred] that can be completed via public functions
@@ -86,8 +86,8 @@ public fun <T> CompletableDeferred(value: T): CompletableDeferred<T> = Completab
8686
private class CompletableDeferredImpl<T> : JobSupport(true), CompletableDeferred<T> {
8787
override fun getCompleted(): T = getCompletedInternal() as T
8888
suspend override fun await(): T = awaitInternal() as T
89-
override fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R) =
90-
registerSelectAwaitInternal(select, block as (suspend (Any?) -> R))
89+
override val onAwait: SelectClause1<T>
90+
get() = this as SelectClause1<T>
9191

9292
override fun complete(value: T): Boolean {
9393
loopOnState { state ->

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import kotlinx.coroutines.experimental.selects.SelectBuilder
20-
import kotlinx.coroutines.experimental.selects.SelectInstance
19+
import kotlinx.coroutines.experimental.selects.SelectClause1
2120
import kotlinx.coroutines.experimental.selects.select
2221
import kotlin.coroutines.experimental.CoroutineContext
2322

@@ -91,16 +90,17 @@ public interface Deferred<out T> : Job {
9190
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
9291
* immediately resumes with [CancellationException].
9392
*
94-
* This function can be used in [select] invocation with [onAwait][SelectBuilder.onAwait] clause.
93+
* This function can be used in [select] invocation with [onAwait] clause.
9594
* Use [isCompleted] to check for completion of this deferred value without waiting.
9695
*/
9796
public suspend fun await(): T
9897

9998
/**
100-
* Registers [onAwait][SelectBuilder.onAwait] select clause.
101-
* @suppress **This is unstable API and it is subject to change.**
99+
* Clause for [select] expression of [await] suspending function that selects with the deferred value when it is
100+
* resolved. The [select] invocation fails if the deferred value completes exceptionally (either fails or
101+
* it cancelled).
102102
*/
103-
public fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R)
103+
public val onAwait: SelectClause1<T>
104104

105105
/**
106106
* Returns *completed* result or throws [IllegalStateException] if this deferred value has not
@@ -175,8 +175,8 @@ private open class DeferredCoroutine<T>(
175175
) : AbstractCoroutine<T>(parentContext, active), Deferred<T> {
176176
override fun getCompleted(): T = getCompletedInternal() as T
177177
suspend override fun await(): T = awaitInternal() as T
178-
override fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R) =
179-
registerSelectAwaitInternal(select, block as (suspend (Any?) -> R))
178+
override val onAwait: SelectClause1<T>
179+
get() = this as SelectClause1<T>
180180
}
181181

182182
private class LazyDeferredCoroutine<T>(

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
2222
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
2323
import kotlinx.coroutines.experimental.internal.OpDescriptor
2424
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
25-
import kotlinx.coroutines.experimental.selects.SelectBuilder
25+
import kotlinx.coroutines.experimental.selects.SelectClause0
26+
import kotlinx.coroutines.experimental.selects.SelectClause1
2627
import kotlinx.coroutines.experimental.selects.SelectInstance
2728
import kotlinx.coroutines.experimental.selects.select
2829
import java.util.concurrent.Future
@@ -168,11 +169,17 @@ public interface Job : CoroutineContext.Element {
168169
* This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
169170
* suspending function is suspended, this function immediately resumes with [CancellationException].
170171
*
171-
* This function can be used in [select] invocation with [onJoin][SelectBuilder.onJoin] clause.
172+
* This function can be used in [select] invocation with [onJoin] clause.
172173
* Use [isCompleted] to check for completion of this job without waiting.
173174
*/
174175
public suspend fun join()
175176

177+
/**
178+
* Clause for [select] expression of [join] suspending function that selects when the job is complete.
179+
* This clause never fails, even if the job completes exceptionally.
180+
*/
181+
public val onJoin: SelectClause0
182+
176183
// ------------ low-level state-notification ------------
177184

178185
/**
@@ -225,12 +232,6 @@ public interface Job : CoroutineContext.Element {
225232

226233
// ------------ unstable internal API ------------
227234

228-
/**
229-
* Registers [onJoin][SelectBuilder.onJoin] select clause.
230-
* @suppress **This is unstable API and it is subject to change.**
231-
*/
232-
public fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R)
233-
234235
/**
235236
* @suppress **Error**: Operator '+' on two Job objects is meaningless.
236237
* Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts.
@@ -375,7 +376,7 @@ public object NonDisposableHandle : DisposableHandle {
375376
* @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
376377
* @suppress **This is unstable API and it is subject to change.**
377378
*/
378-
public open class JobSupport(active: Boolean) : Job {
379+
public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause1<Any?> {
379380
override val key: CoroutineContext.Key<*> get() = Job
380381

381382
/*
@@ -728,7 +729,11 @@ public open class JobSupport(active: Boolean) : Job {
728729
cont.disposeOnCompletion(invokeOnCompletion(ResumeOnCompletion(this, cont)))
729730
}
730731

731-
override fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R) {
732+
final override val onJoin: SelectClause0
733+
get() = this
734+
735+
// registerSelectJoin
736+
final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
732737
// fast-path -- check state and select/return if needed
733738
loopOnState { state ->
734739
if (select.isSelected) return
@@ -956,7 +961,8 @@ public open class JobSupport(active: Boolean) : Job {
956961
})
957962
}
958963

959-
protected fun <R> registerSelectAwaitInternal(select: SelectInstance<R>, block: suspend (Any?) -> R) {
964+
// registerSelectAwaitInternal
965+
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (Any?) -> R) {
960966
// fast-path -- check state and select/return if needed
961967
loopOnState { state ->
962968
if (select.isSelected) return

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package kotlinx.coroutines.experimental
1818

1919
import kotlinx.coroutines.experimental.NonCancellable.isActive
20-
import kotlinx.coroutines.experimental.selects.SelectInstance
20+
import kotlinx.coroutines.experimental.selects.SelectClause0
2121
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
2222

2323
/**
@@ -49,13 +49,8 @@ object NonCancellable : AbstractCoroutineContextElement(Job), Job {
4949
throw UnsupportedOperationException("This job is always active")
5050
}
5151

52-
/**
53-
* Always throws [UnsupportedOperationException].
54-
* @suppress **This is unstable API and it is subject to change.**
55-
*/
56-
override fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R) {
57-
throw UnsupportedOperationException("This job is always active")
58-
}
52+
override val onJoin: SelectClause0
53+
get() = throw UnsupportedOperationException("This job is always active")
5954

6055
/** Always throws [IllegalStateException]. */
6156
override fun getCompletionException(): CancellationException = throw IllegalStateException("This job is always active")

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import kotlinx.coroutines.experimental.internal.*
2222
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
2323
import kotlinx.coroutines.experimental.removeOnCancel
2424
import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
25+
import kotlinx.coroutines.experimental.selects.SelectClause1
26+
import kotlinx.coroutines.experimental.selects.SelectClause2
2527
import kotlinx.coroutines.experimental.selects.SelectInstance
2628
import kotlinx.coroutines.experimental.suspendAtomicCancellableCoroutine
2729
import kotlin.coroutines.experimental.startCoroutine
@@ -134,8 +136,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
134136
*/
135137
protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
136138
val prev = node.prev
137-
if (prev is SendBuffered<*>)
138-
prev.remove()
139+
(prev as? SendBuffered<*>)?.remove()
139140
}
140141

141142
/**
@@ -165,8 +166,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
165166
override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
166167
super.finishOnSuccess(affected, next)
167168
// remove previous SendBuffered
168-
if (affected is SendBuffered<*>)
169-
affected.remove()
169+
(affected as? SendBuffered<*>)?.remove()
170170
}
171171
}
172172

@@ -315,11 +315,11 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
315315
}
316316
}
317317

318-
private inner class TryEnqueueSendDesc<E, R>(
318+
private inner class TryEnqueueSendDesc<R>(
319319
element: E,
320320
select: SelectInstance<R>,
321-
block: suspend () -> R
322-
) : AddLastDesc<SendSelect<R>>(queue, SendSelect(element, select, block)) {
321+
block: suspend (SendChannel<E>) -> R
322+
) : AddLastDesc<SendSelect<E, R>>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
323323
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
324324
if (affected is ReceiveOrClosed<*>) {
325325
return affected as? Closed<*> ?: ENQUEUE_FAILED
@@ -339,7 +339,14 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
339339
}
340340
}
341341

342-
override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
342+
final override val onSend: SelectClause2<E, SendChannel<E>>
343+
get() = object : SelectClause2<E, SendChannel<E>> {
344+
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
345+
registerSelectSend(select, param, block)
346+
}
347+
}
348+
349+
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
343350
while (true) {
344351
if (select.isSelected) return
345352
if (isFull) {
@@ -357,7 +364,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
357364
offerResult === ALREADY_SELECTED -> return
358365
offerResult === OFFER_FAILED -> {} // retry
359366
offerResult === OFFER_SUCCESS -> {
360-
block.startCoroutineUndispatched(select.completion)
367+
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
361368
return
362369
}
363370
offerResult is Closed<*> -> throw offerResult.sendException
@@ -369,17 +376,18 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
369376

370377
// ------ private ------
371378

372-
private class SendSelect<R>(
379+
private class SendSelect<E, R>(
373380
override val pollResult: Any?,
381+
@JvmField val channel: SendChannel<E>,
374382
@JvmField val select: SelectInstance<R>,
375-
@JvmField val block: suspend () -> R
383+
@JvmField val block: suspend (SendChannel<E>) -> R
376384
) : LockFreeLinkedListNode(), Send, DisposableHandle {
377385
override fun tryResumeSend(idempotent: Any?): Any? =
378386
if (select.trySelect(idempotent)) SELECT_STARTED else null
379387

380388
override fun completeResumeSend(token: Any) {
381389
check(token === SELECT_STARTED)
382-
block.startCoroutine(select.completion)
390+
block.startCoroutine(receiver = channel, completion = select.completion)
383391
}
384392

385393
fun disposeOnSelect() {
@@ -390,7 +398,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
390398
remove()
391399
}
392400

393-
override fun toString(): String = "SendSelect($pollResult)[$select]"
401+
override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
394402
}
395403

396404
private class SendBuffered<out E>(
@@ -614,8 +622,15 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
614622
}
615623
}
616624

625+
final override val onReceive: SelectClause1<E>
626+
get() = object : SelectClause1<E> {
627+
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
628+
registerSelectReceive(select, block)
629+
}
630+
}
631+
617632
@Suppress("UNCHECKED_CAST")
618-
override fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
633+
private fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
619634
while (true) {
620635
if (select.isSelected) return
621636
if (isEmpty) {
@@ -641,8 +656,15 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
641656
}
642657
}
643658

659+
final override val onReceiveOrNull: SelectClause1<E?>
660+
get() = object : SelectClause1<E?> {
661+
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
662+
registerSelectReceiveOrNull(select, block)
663+
}
664+
}
665+
644666
@Suppress("UNCHECKED_CAST")
645-
override fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
667+
private fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
646668
while (true) {
647669
if (select.isSelected) return
648670
if (isEmpty) {

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package kotlinx.coroutines.experimental.channels
1818

1919
import kotlinx.coroutines.experimental.*
20+
import kotlinx.coroutines.experimental.selects.SelectClause2
2021
import kotlinx.coroutines.experimental.selects.SelectInstance
2122
import kotlin.coroutines.experimental.CoroutineContext
2223

@@ -105,7 +106,7 @@ private class LazyActorCoroutine<E>(
105106
parentContext: CoroutineContext,
106107
channel: Channel<E>,
107108
private val block: suspend ActorScope<E>.() -> Unit
108-
) : ActorCoroutine<E>(parentContext, channel, active = false) {
109+
) : ActorCoroutine<E>(parentContext, channel, active = false), SelectClause2<E, SendChannel<E>> {
109110
override val channel: Channel<E> get() = this
110111

111112
override fun onStart() {
@@ -122,9 +123,13 @@ private class LazyActorCoroutine<E>(
122123
return super.offer(element)
123124
}
124125

125-
override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
126+
override val onSend: SelectClause2<E, SendChannel<E>>
127+
get() = this
128+
129+
// registerSelectSend
130+
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
126131
start()
127-
return super.registerSelectSend(select, element, block)
132+
super.onSend.registerSelectClause2(select, param, block)
128133
}
129134
}
130135

0 commit comments

Comments
 (0)