Skip to content

Commit 8d3a0c5

Browse files
committed
Merge pull request #3477 from akarnokd/FromArray1x
1.x: add a source OnSubscribe which works from an array directly
2 parents 5e3540f + 0e45a7e commit 8d3a0c5

File tree

4 files changed

+343
-18
lines changed

4 files changed

+343
-18
lines changed

src/main/java/rx/Observable.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,7 +1268,14 @@ public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
12681268
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
12691269
*/
12701270
public final static <T> Observable<T> from(T[] array) {
1271-
return from(Arrays.asList(array));
1271+
int n = array.length;
1272+
if (n == 0) {
1273+
return empty();
1274+
} else
1275+
if (n == 1) {
1276+
return just(array[0]);
1277+
}
1278+
return create(new OnSubscribeFromArray<T>(array));
12721279
}
12731280

12741281
/**
@@ -1448,7 +1455,7 @@ public final static <T> Observable<T> just(final T value) {
14481455
// suppress unchecked because we are using varargs inside the method
14491456
@SuppressWarnings("unchecked")
14501457
public final static <T> Observable<T> just(T t1, T t2) {
1451-
return from(Arrays.asList(t1, t2));
1458+
return from((T[])new Object[] { t1, t2 });
14521459
}
14531460

14541461
/**
@@ -1474,7 +1481,7 @@ public final static <T> Observable<T> just(T t1, T t2) {
14741481
// suppress unchecked because we are using varargs inside the method
14751482
@SuppressWarnings("unchecked")
14761483
public final static <T> Observable<T> just(T t1, T t2, T t3) {
1477-
return from(Arrays.asList(t1, t2, t3));
1484+
return from((T[])new Object[] { t1, t2, t3 });
14781485
}
14791486

14801487
/**
@@ -1502,7 +1509,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3) {
15021509
// suppress unchecked because we are using varargs inside the method
15031510
@SuppressWarnings("unchecked")
15041511
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
1505-
return from(Arrays.asList(t1, t2, t3, t4));
1512+
return from((T[])new Object[] { t1, t2, t3, t4 });
15061513
}
15071514

15081515
/**
@@ -1532,7 +1539,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
15321539
// suppress unchecked because we are using varargs inside the method
15331540
@SuppressWarnings("unchecked")
15341541
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5) {
1535-
return from(Arrays.asList(t1, t2, t3, t4, t5));
1542+
return from((T[])new Object[] { t1, t2, t3, t4, t5 });
15361543
}
15371544

15381545
/**
@@ -1564,7 +1571,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5) {
15641571
// suppress unchecked because we are using varargs inside the method
15651572
@SuppressWarnings("unchecked")
15661573
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6) {
1567-
return from(Arrays.asList(t1, t2, t3, t4, t5, t6));
1574+
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6 });
15681575
}
15691576

15701577
/**
@@ -1598,7 +1605,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6) {
15981605
// suppress unchecked because we are using varargs inside the method
15991606
@SuppressWarnings("unchecked")
16001607
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
1601-
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7));
1608+
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7 });
16021609
}
16031610

16041611
/**
@@ -1634,7 +1641,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
16341641
// suppress unchecked because we are using varargs inside the method
16351642
@SuppressWarnings("unchecked")
16361643
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
1637-
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8));
1644+
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8 });
16381645
}
16391646

16401647
/**
@@ -1672,7 +1679,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
16721679
// suppress unchecked because we are using varargs inside the method
16731680
@SuppressWarnings("unchecked")
16741681
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) {
1675-
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9));
1682+
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9 });
16761683
}
16771684

16781685
/**
@@ -1712,7 +1719,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
17121719
// suppress unchecked because we are using varargs inside the method
17131720
@SuppressWarnings("unchecked")
17141721
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) {
1715-
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10));
1722+
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9, t10 });
17161723
}
17171724

17181725
/**
@@ -1845,7 +1852,7 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
18451852
*/
18461853
@SuppressWarnings("unchecked")
18471854
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
1848-
return merge(from(Arrays.asList(t1, t2)));
1855+
return merge(new Observable[] { t1, t2 });
18491856
}
18501857

18511858
/**
@@ -1871,7 +1878,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
18711878
*/
18721879
@SuppressWarnings("unchecked")
18731880
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3) {
1874-
return merge(from(Arrays.asList(t1, t2, t3)));
1881+
return merge(new Observable[] { t1, t2, t3 });
18751882
}
18761883

18771884
/**
@@ -1899,7 +1906,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
18991906
*/
19001907
@SuppressWarnings("unchecked")
19011908
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4) {
1902-
return merge(from(Arrays.asList(t1, t2, t3, t4)));
1909+
return merge(new Observable[] { t1, t2, t3, t4 });
19031910
}
19041911

19051912
/**
@@ -1929,7 +1936,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19291936
*/
19301937
@SuppressWarnings("unchecked")
19311938
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5) {
1932-
return merge(from(Arrays.asList(t1, t2, t3, t4, t5)));
1939+
return merge(new Observable[] { t1, t2, t3, t4, t5 });
19331940
}
19341941

19351942
/**
@@ -1961,7 +1968,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19611968
*/
19621969
@SuppressWarnings("unchecked")
19631970
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6) {
1964-
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6)));
1971+
return merge(new Observable[] { t1, t2, t3, t4, t5, t6 });
19651972
}
19661973

19671974
/**
@@ -1995,7 +2002,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19952002
*/
19962003
@SuppressWarnings("unchecked")
19972004
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7) {
1998-
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7)));
2005+
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7 });
19992006
}
20002007

20012008
/**
@@ -2031,7 +2038,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
20312038
*/
20322039
@SuppressWarnings("unchecked")
20332040
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8) {
2034-
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8)));
2041+
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7, t8 });
20352042
}
20362043

20372044
/**
@@ -2069,7 +2076,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
20692076
*/
20702077
@SuppressWarnings("unchecked")
20712078
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8, Observable<? extends T> t9) {
2072-
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9)));
2079+
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7, t8, t9 });
20732080
}
20742081

20752082
/**
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.concurrent.atomic.AtomicLong;
20+
21+
import rx.*;
22+
import rx.Observable.OnSubscribe;
23+
24+
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
25+
final T[] array;
26+
public OnSubscribeFromArray(T[] array) {
27+
this.array = array;
28+
}
29+
30+
@Override
31+
public void call(Subscriber<? super T> child) {
32+
child.setProducer(new FromArrayProducer<T>(child, array));
33+
}
34+
35+
static final class FromArrayProducer<T>
36+
extends AtomicLong
37+
implements Producer {
38+
/** */
39+
private static final long serialVersionUID = 3534218984725836979L;
40+
41+
final Subscriber<? super T> child;
42+
final T[] array;
43+
44+
int index;
45+
46+
public FromArrayProducer(Subscriber<? super T> child, T[] array) {
47+
this.child = child;
48+
this.array = array;
49+
}
50+
51+
@Override
52+
public void request(long n) {
53+
if (n < 0) {
54+
throw new IllegalArgumentException("n >= 0 required but it was " + n);
55+
}
56+
if (n == Long.MAX_VALUE) {
57+
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
58+
fastPath();
59+
}
60+
} else
61+
if (n != 0) {
62+
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
63+
slowPath(n);
64+
}
65+
}
66+
}
67+
68+
void fastPath() {
69+
final Subscriber<? super T> child = this.child;
70+
71+
for (T t : array) {
72+
if (child.isUnsubscribed()) {
73+
return;
74+
}
75+
76+
child.onNext(t);
77+
}
78+
79+
if (child.isUnsubscribed()) {
80+
return;
81+
}
82+
child.onCompleted();
83+
}
84+
85+
void slowPath(long r) {
86+
final Subscriber<? super T> child = this.child;
87+
final T[] array = this.array;
88+
final int n = array.length;
89+
90+
long e = 0L;
91+
int i = index;
92+
93+
for (;;) {
94+
95+
while (r != 0L && i != n) {
96+
if (child.isUnsubscribed()) {
97+
return;
98+
}
99+
100+
child.onNext(array[i]);
101+
102+
i++;
103+
104+
if (i == n) {
105+
if (!child.isUnsubscribed()) {
106+
child.onCompleted();
107+
}
108+
return;
109+
}
110+
111+
r--;
112+
e--;
113+
}
114+
115+
r = get() + e;
116+
117+
if (r == 0L) {
118+
index = i;
119+
r = addAndGet(e);
120+
if (r == 0L) {
121+
return;
122+
}
123+
e = 0L;
124+
}
125+
}
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)