Skip to content

Commit 52022dd

Browse files
Merge pull request #787 from MarioAriasC/master
Fix problem with Observable.create()
2 parents 8ed1b14 + 4f69ce0 commit 52022dd

File tree

5 files changed

+113
-90
lines changed

5 files changed

+113
-90
lines changed

language-adaptors/rxjava-kotlin/README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
Kotlin has support for SAM (Single Abstract Method) Interfaces as Functions (i.e. Java 8 Lambdas). So you could use Kotlin in RxJava whitout this adaptor
44

55
```kotlin
6-
Observable.create<String>{ observer ->
7-
observer!!.onNext("Hello")
8-
observer.onCompleted()
6+
Observable.create(OnSubscribeFunc<String> {
7+
it!!.onNext("Hello")
8+
it.onCompleted()
99
Subscriptions.empty()
10-
}!!.subscribe { result ->
10+
})!!.subscribe { result ->
1111
a!!.received(result)
1212
}
1313
```
@@ -21,7 +21,7 @@ import rx.lang.kotlin.*
2121
observer.onNext("Hello")
2222
observer.onCompleted()
2323
Subscriptions.empty()!!
24-
}.asObservable().subscribe { result ->
24+
}.asObservableFunc().subscribe { result ->
2525
a!!.received(result)
2626
}
2727
```

language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/namespace.kt

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,23 @@
1616

1717
package rx.lang.kotlin
1818

19-
import rx.Subscription
2019
import rx.Observer
2120
import rx.Observable
21+
import rx.Observable.OnSubscribe
22+
import rx.Subscription
23+
import rx.Observable.OnSubscribeFunc
24+
25+
26+
public fun<T> Function1<Observer<in T>, Unit>.asObservable(): Observable<T> {
27+
return Observable.create(OnSubscribe<T>{ t1 ->
28+
this(t1!!)
29+
})!!
30+
}
2231

23-
public fun<T> Function1<Observer<in T>, Subscription>.asObservable(): Observable<T> {
24-
return Observable.create { this(it!!) }!!
32+
public fun<T> Function1<Observer<in T>, Subscription>.asObservableFunc(): Observable<T> {
33+
return Observable.create(OnSubscribeFunc<T>{ op ->
34+
this(op!!)
35+
})!!
2536
}
2637

2738
public fun<T> Function0<Observable<out T>>.defer(): Observable<T> {
@@ -41,11 +52,11 @@ public fun<T> Throwable.asObservable(): Observable<T> {
4152
}
4253

4354
public fun<T> Pair<T, T>.asObservable(): Observable<T> {
44-
return Observable.from(this.component1(), this.component2())!!
55+
return Observable.from(listOf(this.component1(), this.component2()))!!
4556
}
4657

4758
public fun<T> Triple<T, T, T>.asObservable(): Observable<T> {
48-
return Observable.from(this.component1(), this.component2(), this.component3())!!
59+
return Observable.from(listOf(this.component1(), this.component2(), this.component3()))!!
4960
}
5061

5162
public fun<T> Pair<Observable<T>, Observable<T>>.merge(): Observable<T> {

language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt

Lines changed: 42 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -35,27 +35,18 @@ import rx.lang.kotlin.BasicKotlinTests.AsyncObservable
3535
/**
3636
* This class use plain Kotlin without extensions from the language adaptor
3737
*/
38-
public class BasicKotlinTests {
38+
public class BasicKotlinTests:KotlinTests() {
3939

40-
[Mock] var a: ScriptAssertion? = null
41-
[Mock] var w: Observable<Int>? = null
4240

43-
[Before]
44-
public fun before() {
45-
MockitoAnnotations.initMocks(this)
46-
}
47-
48-
fun received<T>(): (T?) -> Unit {
49-
return {(result: T?) -> a!!.received(result) }
50-
}
5141

5242
[Test]
5343
public fun testCreate() {
54-
Observable.create<String>{
44+
45+
Observable.create(OnSubscribeFunc<String> {
5546
it!!.onNext("Hello")
5647
it.onCompleted()
5748
Subscriptions.empty()
58-
}!!.subscribe { result ->
49+
})!!.subscribe { result ->
5950
a!!.received(result)
6051
}
6152

@@ -64,20 +55,20 @@ public class BasicKotlinTests {
6455

6556
[Test]
6657
public fun testFilter() {
67-
Observable.from(1, 2, 3)!!.filter { it!! >= 2 }!!.subscribe(received())
58+
Observable.from(listOf(1, 2, 3))!!.filter { it!! >= 2 }!!.subscribe(received())
6859
verify(a, times(0))!!.received(1);
6960
verify(a, times(1))!!.received(2);
7061
verify(a, times(1))!!.received(3);
7162
}
7263

7364
[Test]
7465
public fun testLast() {
75-
assertEquals("three", Observable.from("one", "two", "three")!!.toBlockingObservable()!!.last())
66+
assertEquals("three", Observable.from(listOf("one", "two", "three"))!!.toBlockingObservable()!!.last())
7667
}
7768

7869
[Test]
7970
public fun testLastWithPredicate() {
80-
assertEquals("two", Observable.from("one", "two", "three")!!.toBlockingObservable()!!.last { x -> x!!.length == 3 })
71+
assertEquals("two", Observable.from(listOf("one", "two", "three"))!!.toBlockingObservable()!!.last { x -> x!!.length == 3 })
8172
}
8273

8374
[Test]
@@ -88,29 +79,29 @@ public class BasicKotlinTests {
8879

8980
[Test]
9081
public fun testMap2() {
91-
Observable.from(1, 2, 3)!!.map { v -> "hello_$v" }!!.subscribe((received()))
82+
Observable.from(listOf(1, 2, 3))!!.map { v -> "hello_$v" }!!.subscribe((received()))
9283
verify(a, times(1))!!.received("hello_1")
9384
verify(a, times(1))!!.received("hello_2")
9485
verify(a, times(1))!!.received("hello_3")
9586
}
9687

9788
[Test]
9889
public fun testMaterialize() {
99-
Observable.from(1, 2, 3)!!.materialize()!!.subscribe((received()))
90+
Observable.from(listOf(1, 2, 3))!!.materialize()!!.subscribe((received()))
10091
verify(a, times(4))!!.received(any(javaClass<Notification<Int>>()))
10192
verify(a, times(0))!!.error(any(javaClass<Exception>()))
10293
}
10394

10495
[Test]
10596
public fun testMergeDelayError() {
10697
Observable.mergeDelayError(
107-
Observable.from(1, 2, 3),
98+
Observable.from(listOf(1, 2, 3)),
10899
Observable.merge(
109100
Observable.from(6),
110101
Observable.error(NullPointerException()),
111102
Observable.from(7)
112103
),
113-
Observable.from(4, 5)
104+
Observable.from(listOf(4, 5))
114105
)!!.subscribe(received(), { e -> a!!.error(e) })
115106
verify(a, times(1))!!.received(1)
116107
verify(a, times(1))!!.received(2)
@@ -125,13 +116,13 @@ public class BasicKotlinTests {
125116
[Test]
126117
public fun testMerge() {
127118
Observable.merge(
128-
Observable.from(1, 2, 3),
119+
Observable.from(listOf(1, 2, 3)),
129120
Observable.merge(
130121
Observable.from(6),
131122
Observable.error(NullPointerException()),
132123
Observable.from(7)
133124
),
134-
Observable.from(4, 5)
125+
Observable.from(listOf(4, 5))
135126
)!!.subscribe(received(), { e -> a!!.error(e) })
136127
verify(a, times(1))!!.received(1)
137128
verify(a, times(1))!!.received(2)
@@ -166,7 +157,7 @@ public class BasicKotlinTests {
166157
[Test]
167158
public fun testFromWithObjects() {
168159
val list = listOf(1, 2, 3, 4, 5)
169-
assertEquals(2, Observable.from(list, 6)!!.count()!!.toBlockingObservable()!!.single())
160+
assertEquals(2, Observable.from(listOf(list, 6))!!.count()!!.toBlockingObservable()!!.single())
170161
}
171162

172163
[Test]
@@ -185,23 +176,23 @@ public class BasicKotlinTests {
185176

186177
[Test]
187178
public fun testSkipTake() {
188-
Observable.from(1, 2, 3)!!.skip(1)!!.take(1)!!.subscribe(received())
179+
Observable.from(listOf(1, 2, 3))!!.skip(1)!!.take(1)!!.subscribe(received())
189180
verify(a, times(0))!!.received(1)
190181
verify(a, times(1))!!.received(2)
191182
verify(a, times(0))!!.received(3)
192183
}
193184

194185
[Test]
195186
public fun testSkip() {
196-
Observable.from(1, 2, 3)!!.skip(2)!!.subscribe(received())
187+
Observable.from(listOf(1, 2, 3))!!.skip(2)!!.subscribe(received())
197188
verify(a, times(0))!!.received(1)
198189
verify(a, times(0))!!.received(2)
199190
verify(a, times(1))!!.received(3)
200191
}
201192

202193
[Test]
203194
public fun testTake() {
204-
Observable.from(1, 2, 3)!!.take(2)!!.subscribe(received())
195+
Observable.from(listOf(1, 2, 3))!!.take(2)!!.subscribe(received())
205196
verify(a, times(1))!!.received(1)
206197
verify(a, times(1))!!.received(2)
207198
verify(a, times(0))!!.received(3)
@@ -215,15 +206,15 @@ public class BasicKotlinTests {
215206

216207
[Test]
217208
public fun testTakeWhile() {
218-
Observable.from(1, 2, 3)!!.takeWhile { x -> x!! < 3 }!!.subscribe(received())
209+
Observable.from(listOf(1, 2, 3))!!.takeWhile { x -> x!! < 3 }!!.subscribe(received())
219210
verify(a, times(1))!!.received(1)
220211
verify(a, times(1))!!.received(2)
221212
verify(a, times(0))!!.received(3)
222213
}
223214

224215
[Test]
225216
public fun testTakeWhileWithIndex() {
226-
Observable.from(1, 2, 3)!!.takeWhileWithIndex { x, i -> i!! < 2 }!!.subscribe(received())
217+
Observable.from(listOf(1, 2, 3))!!.takeWhileWithIndex { x, i -> i!! < 2 }!!.subscribe(received())
227218
verify(a, times(1))!!.received(1)
228219
verify(a, times(1))!!.received(2)
229220
verify(a, times(0))!!.received(3)
@@ -251,35 +242,35 @@ public class BasicKotlinTests {
251242

252243
[Test]
253244
public fun testLastOrDefault() {
254-
assertEquals("two", Observable.from("one", "two")!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length == 3 })
255-
assertEquals("default", Observable.from("one", "two")!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length > 3 })
245+
assertEquals("two", Observable.from(listOf("one", "two"))!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length == 3 })
246+
assertEquals("default", Observable.from(listOf("one", "two"))!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length > 3 })
256247
}
257248

258249
[Test(expected = javaClass<IllegalArgumentException>())]
259250
public fun testSingle() {
260251
assertEquals("one", Observable.from("one")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 })
261-
Observable.from("one", "two")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 }
252+
Observable.from(listOf("one", "two"))!!.toBlockingObservable()!!.single { x -> x!!.length == 3 }
262253
fail()
263254
}
264255

265256
[Test]
266257
public fun testDefer() {
267-
Observable.defer { Observable.from(1, 2) }!!.subscribe(received())
258+
Observable.defer { Observable.from(listOf(1, 2)) }!!.subscribe(received())
268259
verify(a, times(1))!!.received(1)
269260
verify(a, times(1))!!.received(2)
270261
}
271262

272263
[Test]
273264
public fun testAll() {
274-
Observable.from(1, 2, 3)!!.all { x -> x!! > 0 }!!.subscribe(received())
265+
Observable.from(listOf(1, 2, 3))!!.all { x -> x!! > 0 }!!.subscribe(received())
275266
verify(a, times(1))!!.received(true)
276267
}
277268

278269
[Test]
279270
public fun testZip() {
280-
val o1 = Observable.from(1, 2, 3)!!
281-
val o2 = Observable.from(4, 5, 6)!!
282-
val o3 = Observable.from(7, 8, 9)!!
271+
val o1 = Observable.from(listOf(1, 2, 3))!!
272+
val o2 = Observable.from(listOf(4, 5, 6))!!
273+
val o3 = Observable.from(listOf(7, 8, 9))!!
283274

284275
val values = Observable.zip(o1, o2, o3) { a, b, c -> listOf(a, b, c) }!!.toList()!!.toBlockingObservable()!!.single()!!
285276
assertEquals(listOf(1, 4, 7), values[0])
@@ -289,9 +280,9 @@ public class BasicKotlinTests {
289280

290281
[Test]
291282
public fun testZipWithIterable() {
292-
val o1 = Observable.from(1, 2, 3)!!
293-
val o2 = Observable.from(4, 5, 6)!!
294-
val o3 = Observable.from(7, 8, 9)!!
283+
val o1 = Observable.from(listOf(1, 2, 3))!!
284+
val o2 = Observable.from(listOf(4, 5, 6))!!
285+
val o3 = Observable.from(listOf(7, 8, 9))!!
295286

296287
val values = Observable.zip(listOf(o1, o2, o3)) { args -> listOf(*args) }!!.toList()!!.toBlockingObservable()!!.single()!!
297288
assertEquals(listOf(1, 4, 7), values[0])
@@ -303,33 +294,28 @@ public class BasicKotlinTests {
303294
public fun testGroupBy() {
304295
var count = 0
305296

306-
Observable.from("one", "two", "three", "four", "five", "six")!!
297+
Observable.from(listOf("one", "two", "three", "four", "five", "six"))!!
307298
.groupBy { s -> s!!.length }!!
308-
.mapMany { groupObervable ->
299+
.flatMap { groupObervable ->
309300
groupObervable!!.map { s ->
310301
"Value: $s Group ${groupObervable.getKey()}"
311302
}
312-
}!!
313-
.toBlockingObservable()!!.forEach { s ->
303+
}!!.toBlockingObservable()!!.forEach { s ->
314304
println(s)
315305
count++
316306
}
317307

318308
assertEquals(6, count)
319309
}
320310

321-
public trait ScriptAssertion{
322-
fun error(e: Throwable?)
323311

324-
fun received(e: Any?)
325-
}
326312

327313
public class TestFactory(){
328314
var counter = 1
329315

330316
val numbers: Observable<Int>
331317
get(){
332-
return Observable.from(1, 3, 2, 5, 4)!!
318+
return Observable.from(listOf(1, 3, 2, 5, 4))!!
333319
}
334320

335321
val onSubscribe: TestOnSubscribe
@@ -345,22 +331,22 @@ public class BasicKotlinTests {
345331
}
346332

347333
class AsyncObservable : OnSubscribeFunc<Int>{
348-
override fun onSubscribe(t1: Observer<in Int>?): Subscription? {
334+
override fun onSubscribe(op: Observer<in Int>?): Subscription? {
349335
thread {
350336
Thread.sleep(50)
351-
t1!!.onNext(1)
352-
t1.onNext(2)
353-
t1.onNext(3)
354-
t1.onCompleted()
337+
op!!.onNext(1)
338+
op.onNext(2)
339+
op.onNext(3)
340+
op.onCompleted()
355341
}
356342
return Subscriptions.empty()
357343
}
358344
}
359345

360346
class TestOnSubscribe(val count: Int) : OnSubscribeFunc<String>{
361-
override fun onSubscribe(t1: Observer<in String>?): Subscription? {
362-
t1!!.onNext("hello_$count")
363-
t1.onCompleted()
347+
override fun onSubscribe(op: Observer<in String>?): Subscription? {
348+
op!!.onNext("hello_$count")
349+
op.onCompleted()
364350
return Subscription { }
365351
}
366352

0 commit comments

Comments
 (0)