Skip to content

Commit 331750b

Browse files
committed
Utilities for Reactive Streams, Rx 1.x, and Rx 2.x
1 parent bc296bb commit 331750b

File tree

54 files changed

+4339
-250
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+4339
-250
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ Library support for Kotlin coroutines. This is a companion version for Kotlin 1.
1212
* [kotlinx-coroutines-nio](kotlinx-coroutines-nio) module with extensions for asynchronous IO on JDK7+.
1313
* [kotlinx-coroutines-swing](kotlinx-coroutines-swing) module with `Swing` context for Swing UI applications.
1414
* [kotlinx-coroutines-javafx](kotlinx-coroutines-javafx) module with `JavaFx` context for JavaFX UI applications.
15-
* [kotlinx-coroutines-rx](kotlinx-coroutines-rx) module with utilities for [RxJava](https://github.com/ReactiveX/RxJava).
15+
* [kotlinx-coroutines-reactive](kotlinx-coroutines-reactive) module with utilities for [Reactive Streams](http://www.reactive-streams.org)
16+
* [kotlinx-coroutines-rx1](kotlinx-coroutines-rx1) module with utilities for [RxJava 1.x](https://github.com/ReactiveX/RxJava/tree/1.x)
17+
* [kotlinx-coroutines-rx2](kotlinx-coroutines-rx2) module with utilities for [RxJava 2.x](https://github.com/ReactiveX/RxJava)
1618

1719
## References and documentation
1820

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(
247247
FINAL_C Cancelled : Completed cancelled (final state)
248248
FINAL_F Failed : Completed failed for other reason (final state)
249249
FINAL_R <any> : Completed produced some result
250-
250+
251251
=== Transitions ===
252252
253253
New states Active states Inactive states

kotlinx-coroutines-reactive/README.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Module kotlinx-coroutines-reactive
2+
3+
Utilities for [Reactive Streams](http://www.reactive-streams.org).
4+
5+
Coroutine builders:
6+
7+
| **Name** | **Result** | **Scope** | **Description**
8+
| --------------- | ----------------------------- | ---------------- | ---------------
9+
| [publish] | [Publisher][org.reactivestreams.Publisher] | [ProducerScope] | Cold reactive publisher that starts coroutine on subscribe
10+
11+
Suspending extension functions and suspending iteration:
12+
13+
| **Name** | **Description**
14+
| -------- | ---------------
15+
| [Publisher.awaitFirst][org.reactivestreams.Publisher.awaitFirst] | Returns the first value from the given publisher
16+
| [Publisher.awaitLast][org.reactivestreams.Publisher.awaitFirst] | Returns the last value from the given publisher
17+
| [Publisher.awaitSingle][org.reactivestreams.Publisher.awaitSingle] | Returns the single value from the given publisher
18+
| [Publisher.open][org.reactivestreams.Publisher.open] | Subscribes to publisher and returns [ReceiveChannel]
19+
| [Publisher.iterator][org.reactivestreams.Publisher.iterator] | Subscribes to publisher and returns [ChannelIterator]
20+
21+
Conversion functions:
22+
23+
| **Name** | **Description**
24+
| -------- | ---------------
25+
| [ReceiveChannel.toPublisher][kotlinx.coroutines.experimental.channels.ReceiveChannel.toPublisher] | Converts streaming channel to hot publisher
26+
27+
<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
28+
<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
29+
<!--- INDEX kotlinx.coroutines.experimental -->
30+
<!--- INDEX kotlinx.coroutines.experimental.channels -->
31+
[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html
32+
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
33+
[ChannelIterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel-iterator/index.html
34+
<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive -->
35+
<!--- DOCS_ROOT kotlinx-coroutines-reactive/target/dokka/kotlinx-coroutines-reactive -->
36+
<!--- INDEX kotlinx.coroutines.experimental.reactive -->
37+
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/publish.html
38+
[org.reactivestreams.Publisher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/index.html
39+
[org.reactivestreams.Publisher.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/await-first.html
40+
[org.reactivestreams.Publisher.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/await-single.html
41+
[org.reactivestreams.Publisher.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/open.html
42+
[org.reactivestreams.Publisher.iterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/iterator.html
43+
[kotlinx.coroutines.experimental.channels.ReceiveChannel.toPublisher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/to-publisher.html
44+
<!--- END -->
45+
46+
# Package kotlinx.coroutines.experimental.reactive
47+
48+
Utilities for [Reactive Streams](http://www.reactive-streams.org).

kotlinx-coroutines-reactive/pom.xml

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Copyright 2016-2017 JetBrains s.r.o.
4+
~
5+
~ Licensed under the Apache License, Version 2.0 (the "License");
6+
~ you may not use this file except in compliance with the License.
7+
~ You may obtain a copy of the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
-->
17+
18+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
19+
20+
<modelVersion>4.0.0</modelVersion>
21+
22+
<parent>
23+
<groupId>org.jetbrains.kotlinx</groupId>
24+
<artifactId>kotlinx-coroutines</artifactId>
25+
<version>0.11-rc-SNAPSHOT</version>
26+
</parent>
27+
28+
<artifactId>kotlinx-coroutines-reactive</artifactId>
29+
<packaging>jar</packaging>
30+
31+
<build>
32+
<sourceDirectory>src/main/kotlin</sourceDirectory>
33+
<testSourceDirectory>src/test/kotlin</testSourceDirectory>
34+
</build>
35+
36+
<dependencies>
37+
<dependency>
38+
<groupId>org.reactivestreams</groupId>
39+
<artifactId>reactive-streams</artifactId>
40+
<version>1.0.0</version>
41+
</dependency>
42+
<dependency>
43+
<groupId>org.jetbrains.kotlinx</groupId>
44+
<artifactId>kotlinx-coroutines-core</artifactId>
45+
<version>${project.version}</version>
46+
<scope>compile</scope>
47+
</dependency>
48+
<dependency>
49+
<groupId>org.jetbrains.kotlinx</groupId>
50+
<artifactId>kotlinx-coroutines-core</artifactId>
51+
<version>${project.version}</version>
52+
<classifier>tests</classifier>
53+
<scope>test</scope>
54+
</dependency>
55+
</dependencies>
56+
57+
</project>
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.reactive
18+
19+
import kotlinx.coroutines.experimental.CancellationException
20+
import kotlinx.coroutines.experimental.Job
21+
import kotlinx.coroutines.experimental.suspendCancellableCoroutine
22+
import org.reactivestreams.Publisher
23+
import org.reactivestreams.Subscriber
24+
import org.reactivestreams.Subscription
25+
26+
/**
27+
* Awaits for the first value from the given publisher without blocking a thread and
28+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
29+
*
30+
* This suspending function is cancellable.
31+
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
32+
* immediately resumes with [CancellationException].
33+
*/
34+
public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
35+
36+
/**
37+
* Awaits for the last value from the given publisher without blocking a thread and
38+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
39+
*
40+
* This suspending function is cancellable.
41+
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
42+
* immediately resumes with [CancellationException].
43+
*/
44+
public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
45+
46+
/**
47+
* Awaits for the single value from the given publisher without blocking a thread and
48+
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
49+
*
50+
* This suspending function is cancellable.
51+
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
52+
* immediately resumes with [CancellationException].
53+
*/
54+
public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
55+
56+
// ------------------------ private ------------------------
57+
58+
private enum class Mode(val s: String) {
59+
FIRST("awaitFirst"),
60+
LAST("awaitLast"),
61+
SINGLE("awaitSingle");
62+
override fun toString(): String = s
63+
}
64+
65+
private suspend fun <T> Publisher<T>.awaitOne(mode: Mode): T = suspendCancellableCoroutine { cont ->
66+
subscribe(object : Subscriber<T> {
67+
private lateinit var subscription: Subscription
68+
private var value: T? = null
69+
private var seenValue = false
70+
71+
override fun onSubscribe(sub: Subscription) {
72+
subscription = sub
73+
cont.invokeOnCompletion { sub.cancel() }
74+
sub.request(if (mode == Mode.FIRST) 1 else Long.MAX_VALUE)
75+
}
76+
77+
override fun onNext(t: T) {
78+
when (mode) {
79+
Mode.FIRST -> {
80+
seenValue = true
81+
cont.resume(t)
82+
subscription.cancel()
83+
}
84+
Mode.LAST, Mode.SINGLE -> {
85+
if (mode == Mode.SINGLE && seenValue) {
86+
if (cont.isActive)
87+
cont.resumeWithException(IllegalArgumentException("More that one onNext value for $mode"))
88+
subscription.cancel()
89+
} else {
90+
value = t
91+
seenValue = true
92+
}
93+
}
94+
}
95+
}
96+
97+
override fun onComplete() {
98+
if (!seenValue) {
99+
if (cont.isActive)
100+
cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
101+
return
102+
}
103+
if (!cont.isActive) return // was already resumed
104+
cont.resume(value as T)
105+
}
106+
107+
override fun onError(e: Throwable) {
108+
cont.resumeWithException(e)
109+
}
110+
})
111+
}
112+
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.reactive
18+
19+
import kotlinx.coroutines.experimental.channels.LinkedListChannel
20+
import kotlinx.coroutines.experimental.channels.ReceiveChannel
21+
import org.reactivestreams.Publisher
22+
import org.reactivestreams.Subscriber
23+
import org.reactivestreams.Subscription
24+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
25+
26+
/**
27+
* Return type for [Publisher.open] that can be used to [receive] elements from the
28+
* subscription and to manually [close] it.
29+
*/
30+
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T> {
31+
/**
32+
* Closes this subscription channel.
33+
*/
34+
public fun close()
35+
}
36+
37+
/**
38+
* Subscribes to this [Publisher] and returns a channel to receive elements emitted by it.
39+
*/
40+
public fun <T> Publisher<T>.open(): SubscriptionReceiveChannel<T> {
41+
val channel = SubscriptionChannel<T>()
42+
subscribe(channel)
43+
return channel
44+
}
45+
46+
/**
47+
* Subscribes to this [Publisher] and returns an iterator to receive elements emitted by it.
48+
*
49+
* This is a shortcut for `open().iterator()`. See [open] if you need an ability to manually
50+
* unsubscribe from the observable.
51+
*/
52+
public operator fun <T> Publisher<T>.iterator() = open().iterator()
53+
54+
private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Subscriber<T> {
55+
@Volatile
56+
@JvmField
57+
var subscription: Subscription? = null
58+
59+
@Volatile
60+
@JvmField
61+
// request balance from cancelled receivers, balance is negative if we have receivers, but no subscription yet
62+
var balance = 0
63+
64+
private companion object {
65+
@JvmStatic
66+
val BALANCE: AtomicIntegerFieldUpdater<SubscriptionChannel<*>> =
67+
AtomicIntegerFieldUpdater.newUpdater(SubscriptionChannel::class.java, "balance")
68+
}
69+
70+
// AbstractChannel overrides
71+
override fun onEnqueuedReceive() {
72+
loop@ while (true) { // lock-free loop on balance
73+
val balance = this.balance
74+
val subscription = this.subscription
75+
if (subscription != null) {
76+
if (balance < 0) { // receivers came before we had subscription
77+
// try to fixup by making request
78+
if (!BALANCE.compareAndSet(this, balance, 0)) continue@loop
79+
subscription.request(-balance.toLong())
80+
return
81+
}
82+
if (balance == 0) { // normal story
83+
subscription.request(1)
84+
return
85+
}
86+
}
87+
if (BALANCE.compareAndSet(this, balance, balance - 1)) return
88+
}
89+
}
90+
91+
override fun onCancelledReceive() {
92+
BALANCE.incrementAndGet(this)
93+
}
94+
95+
override fun afterClose(cause: Throwable?) {
96+
subscription?.cancel()
97+
}
98+
99+
// Subscription overrides
100+
override fun close() {
101+
close(cause = null)
102+
}
103+
104+
// Subscriber overrides
105+
override fun onSubscribe(s: Subscription) {
106+
subscription = s
107+
while (true) { // lock-free loop on balance
108+
if (isClosedForSend) {
109+
s.cancel()
110+
return
111+
}
112+
val balance = this.balance
113+
if (balance >= 0) return // ok -- normal story
114+
// otherwise, receivers came before we had subscription
115+
// try to fixup by making request
116+
if (!BALANCE.compareAndSet(this, balance, 0)) continue
117+
s.request(-balance.toLong())
118+
return
119+
}
120+
}
121+
122+
override fun onNext(t: T) {
123+
offer(t)
124+
}
125+
126+
override fun onComplete() {
127+
close(cause = null)
128+
}
129+
130+
override fun onError(e: Throwable) {
131+
close(cause = e)
132+
}
133+
}
134+

0 commit comments

Comments
 (0)