Skip to content

Commit 0e45a7e

Browse files
committed
1.x: add a source OnSubscribe which works from an array directly
1 parent ca7f862 commit 0e45a7e

File tree

4 files changed

+343
-18
lines changed

4 files changed

+343
-18
lines changed

src/main/java/rx/Observable.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1243,7 +1243,14 @@ public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
12431243
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
12441244
*/
12451245
public final static <T> Observable<T> from(T[] array) {
1246-
return from(Arrays.asList(array));
1246+
int n = array.length;
1247+
if (n == 0) {
1248+
return empty();
1249+
} else
1250+
if (n == 1) {
1251+
return just(array[0]);
1252+
}
1253+
return create(new OnSubscribeFromArray<T>(array));
12471254
}
12481255

12491256
/**
@@ -1423,7 +1430,7 @@ public final static <T> Observable<T> just(final T value) {
14231430
// suppress unchecked because we are using varargs inside the method
14241431
@SuppressWarnings("unchecked")
14251432
public final static <T> Observable<T> just(T t1, T t2) {
1426-
return from(Arrays.asList(t1, t2));
1433+
return from((T[])new Object[] { t1, t2 });
14271434
}
14281435

14291436
/**
@@ -1449,7 +1456,7 @@ public final static <T> Observable<T> just(T t1, T t2) {
14491456
// suppress unchecked because we are using varargs inside the method
14501457
@SuppressWarnings("unchecked")
14511458
public final static <T> Observable<T> just(T t1, T t2, T t3) {
1452-
return from(Arrays.asList(t1, t2, t3));
1459+
return from((T[])new Object[] { t1, t2, t3 });
14531460
}
14541461

14551462
/**
@@ -1477,7 +1484,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3) {
14771484
// suppress unchecked because we are using varargs inside the method
14781485
@SuppressWarnings("unchecked")
14791486
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
1480-
return from(Arrays.asList(t1, t2, t3, t4));
1487+
return from((T[])new Object[] { t1, t2, t3, t4 });
14811488
}
14821489

14831490
/**
@@ -1507,7 +1514,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
15071514
// suppress unchecked because we are using varargs inside the method
15081515
@SuppressWarnings("unchecked")
15091516
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5) {
1510-
return from(Arrays.asList(t1, t2, t3, t4, t5));
1517+
return from((T[])new Object[] { t1, t2, t3, t4, t5 });
15111518
}
15121519

15131520
/**
@@ -1539,7 +1546,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5) {
15391546
// suppress unchecked because we are using varargs inside the method
15401547
@SuppressWarnings("unchecked")
15411548
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6) {
1542-
return from(Arrays.asList(t1, t2, t3, t4, t5, t6));
1549+
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6 });
15431550
}
15441551

15451552
/**
@@ -1573,7 +1580,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6) {
15731580
// suppress unchecked because we are using varargs inside the method
15741581
@SuppressWarnings("unchecked")
15751582
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
1576-
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7));
1583+
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7 });
15771584
}
15781585

15791586
/**
@@ -1609,7 +1616,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
16091616
// suppress unchecked because we are using varargs inside the method
16101617
@SuppressWarnings("unchecked")
16111618
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
1612-
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8));
1619+
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8 });
16131620
}
16141621

16151622
/**
@@ -1647,7 +1654,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
16471654
// suppress unchecked because we are using varargs inside the method
16481655
@SuppressWarnings("unchecked")
16491656
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) {
1650-
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9));
1657+
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9 });
16511658
}
16521659

16531660
/**
@@ -1687,7 +1694,7 @@ public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T
16871694
// suppress unchecked because we are using varargs inside the method
16881695
@SuppressWarnings("unchecked")
16891696
public final static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) {
1690-
return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10));
1697+
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9, t10 });
16911698
}
16921699

16931700
/**
@@ -1821,7 +1828,7 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
18211828
*/
18221829
@SuppressWarnings("unchecked")
18231830
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
1824-
return merge(from(Arrays.asList(t1, t2)));
1831+
return merge(new Observable[] { t1, t2 });
18251832
}
18261833

18271834
/**
@@ -1847,7 +1854,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
18471854
*/
18481855
@SuppressWarnings("unchecked")
18491856
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3) {
1850-
return merge(from(Arrays.asList(t1, t2, t3)));
1857+
return merge(new Observable[] { t1, t2, t3 });
18511858
}
18521859

18531860
/**
@@ -1875,7 +1882,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
18751882
*/
18761883
@SuppressWarnings("unchecked")
18771884
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4) {
1878-
return merge(from(Arrays.asList(t1, t2, t3, t4)));
1885+
return merge(new Observable[] { t1, t2, t3, t4 });
18791886
}
18801887

18811888
/**
@@ -1905,7 +1912,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19051912
*/
19061913
@SuppressWarnings("unchecked")
19071914
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5) {
1908-
return merge(from(Arrays.asList(t1, t2, t3, t4, t5)));
1915+
return merge(new Observable[] { t1, t2, t3, t4, t5 });
19091916
}
19101917

19111918
/**
@@ -1937,7 +1944,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19371944
*/
19381945
@SuppressWarnings("unchecked")
19391946
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6) {
1940-
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6)));
1947+
return merge(new Observable[] { t1, t2, t3, t4, t5, t6 });
19411948
}
19421949

19431950
/**
@@ -1971,7 +1978,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19711978
*/
19721979
@SuppressWarnings("unchecked")
19731980
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7) {
1974-
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7)));
1981+
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7 });
19751982
}
19761983

19771984
/**
@@ -2007,7 +2014,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
20072014
*/
20082015
@SuppressWarnings("unchecked")
20092016
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8) {
2010-
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8)));
2017+
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7, t8 });
20112018
}
20122019

20132020
/**
@@ -2045,7 +2052,7 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
20452052
*/
20462053
@SuppressWarnings("unchecked")
20472054
public final static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8, Observable<? extends T> t9) {
2048-
return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9)));
2055+
return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7, t8, t9 });
20492056
}
20502057

20512058
/**
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.concurrent.atomic.AtomicLong;
20+
21+
import rx.*;
22+
import rx.Observable.OnSubscribe;
23+
24+
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
25+
final T[] array;
26+
public OnSubscribeFromArray(T[] array) {
27+
this.array = array;
28+
}
29+
30+
@Override
31+
public void call(Subscriber<? super T> child) {
32+
child.setProducer(new FromArrayProducer<T>(child, array));
33+
}
34+
35+
static final class FromArrayProducer<T>
36+
extends AtomicLong
37+
implements Producer {
38+
/** */
39+
private static final long serialVersionUID = 3534218984725836979L;
40+
41+
final Subscriber<? super T> child;
42+
final T[] array;
43+
44+
int index;
45+
46+
public FromArrayProducer(Subscriber<? super T> child, T[] array) {
47+
this.child = child;
48+
this.array = array;
49+
}
50+
51+
@Override
52+
public void request(long n) {
53+
if (n < 0) {
54+
throw new IllegalArgumentException("n >= 0 required but it was " + n);
55+
}
56+
if (n == Long.MAX_VALUE) {
57+
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
58+
fastPath();
59+
}
60+
} else
61+
if (n != 0) {
62+
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
63+
slowPath(n);
64+
}
65+
}
66+
}
67+
68+
void fastPath() {
69+
final Subscriber<? super T> child = this.child;
70+
71+
for (T t : array) {
72+
if (child.isUnsubscribed()) {
73+
return;
74+
}
75+
76+
child.onNext(t);
77+
}
78+
79+
if (child.isUnsubscribed()) {
80+
return;
81+
}
82+
child.onCompleted();
83+
}
84+
85+
void slowPath(long r) {
86+
final Subscriber<? super T> child = this.child;
87+
final T[] array = this.array;
88+
final int n = array.length;
89+
90+
long e = 0L;
91+
int i = index;
92+
93+
for (;;) {
94+
95+
while (r != 0L && i != n) {
96+
if (child.isUnsubscribed()) {
97+
return;
98+
}
99+
100+
child.onNext(array[i]);
101+
102+
i++;
103+
104+
if (i == n) {
105+
if (!child.isUnsubscribed()) {
106+
child.onCompleted();
107+
}
108+
return;
109+
}
110+
111+
r--;
112+
e--;
113+
}
114+
115+
r = get() + e;
116+
117+
if (r == 0L) {
118+
index = i;
119+
r = addAndGet(e);
120+
if (r == 0L) {
121+
return;
122+
}
123+
e = 0L;
124+
}
125+
}
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)