Skip to content

Commit 4154c0f

Browse files
OnSubscribeRange
1 parent 867df14 commit 4154c0f

File tree

6 files changed

+152
-143
lines changed

6 files changed

+152
-143
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import rx.observables.ConnectableObservable;
3434
import rx.observables.GroupedObservable;
3535
import rx.observers.SafeSubscriber;
36+
import rx.operators.OnSubscribeRange;
3637
import rx.operators.OperationAll;
3738
import rx.operators.OperationAmb;
3839
import rx.operators.OperationAny;
@@ -120,7 +121,6 @@
120121
import rx.subscriptions.Subscriptions;
121122
import rx.util.Exceptions;
122123
import rx.util.OnErrorNotImplementedException;
123-
import rx.util.Range;
124124
import rx.util.TimeInterval;
125125
import rx.util.Timestamped;
126126
import rx.util.functions.Action0;
@@ -2439,7 +2439,10 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
24392439
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229460.aspx">MSDN: Observable.Range</a>
24402440
*/
24412441
public final static Observable<Integer> range(int start, int count) {
2442-
return from(Range.createWithCount(start, count));
2442+
if ((start + count) > Integer.MAX_VALUE) {
2443+
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
2444+
}
2445+
return Observable.create(new OnSubscribeRange(start, start + count));
24432446
}
24442447

24452448
/**
@@ -2459,7 +2462,10 @@ public final static Observable<Integer> range(int start, int count) {
24592462
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211896.aspx">MSDN: Observable.Range</a>
24602463
*/
24612464
public final static Observable<Integer> range(int start, int count, Scheduler scheduler) {
2462-
return from(Range.createWithCount(start, count), scheduler);
2465+
if ((start + count) > Integer.MAX_VALUE) {
2466+
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
2467+
}
2468+
return Observable.create(new OnSubscribeRange(start, start + count)).subscribeOn(scheduler);
24632469
}
24642470

24652471
/**
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.OnSubscribe;
19+
import rx.Subscriber;
20+
21+
/**
22+
*/
23+
public final class OnSubscribeRange implements OnSubscribe<Integer> {
24+
25+
private final int start;
26+
private final int end;
27+
28+
public OnSubscribeRange(int start, int end) {
29+
this.start = start;
30+
this.end = end;
31+
}
32+
33+
@Override
34+
public void call(Subscriber<? super Integer> o) {
35+
for (int i = start; i < end; i++) {
36+
if (o.isUnsubscribed()) {
37+
return;
38+
}
39+
o.onNext(i);
40+
}
41+
o.onCompleted();
42+
}
43+
44+
}

rxjava-core/src/main/java/rx/util/Range.java

Lines changed: 0 additions & 75 deletions
This file was deleted.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.perf.AbstractPerformanceTester;
5+
import rx.perf.IntegerSumObserver;
6+
import rx.util.functions.Action0;
7+
8+
public class OperatorRangePerformance extends AbstractPerformanceTester {
9+
10+
static int reps = Integer.MAX_VALUE / 8;
11+
12+
OperatorRangePerformance() {
13+
super(reps);
14+
}
15+
16+
public static void main(String args[]) {
17+
18+
final OperatorRangePerformance spt = new OperatorRangePerformance();
19+
try {
20+
spt.runTest(new Action0() {
21+
22+
@Override
23+
public void call() {
24+
spt.timeRange();
25+
}
26+
});
27+
} catch (Exception e) {
28+
e.printStackTrace();
29+
}
30+
31+
}
32+
33+
/**
34+
*
35+
* -- 0.17
36+
*
37+
* Run: 10 - 271,147,198 ops/sec
38+
* Run: 11 - 274,821,481 ops/sec
39+
* Run: 12 - 271,632,295 ops/sec
40+
* Run: 13 - 277,876,014 ops/sec
41+
* Run: 14 - 274,821,763 ops/sec
42+
*
43+
* -- 0.16.1
44+
*
45+
* Run: 10 - 222,104,280 ops/sec
46+
* Run: 11 - 224,311,761 ops/sec
47+
* Run: 12 - 222,999,339 ops/sec
48+
* Run: 13 - 222,344,174 ops/sec
49+
* Run: 14 - 225,247,983 ops/sec
50+
*
51+
* @return
52+
*/
53+
public long timeRange() {
54+
IntegerSumObserver o = new IntegerSumObserver();
55+
Observable.range(1, reps).subscribe(o);
56+
return o.sum;
57+
}
58+
59+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Mockito.*;
19+
20+
import org.junit.Test;
21+
22+
import rx.Observable;
23+
import rx.Observer;
24+
25+
public class OnSubscribeRangeTest {
26+
27+
@Test
28+
public void testRangeStartAt2Count3() {
29+
@SuppressWarnings("unchecked")
30+
Observer<Integer> observer = mock(Observer.class);
31+
Observable.range(2, 3).subscribe(observer);
32+
33+
verify(observer, times(1)).onNext(2);
34+
verify(observer, times(1)).onNext(3);
35+
verify(observer, times(1)).onNext(4);
36+
verify(observer, never()).onNext(5);
37+
verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
38+
verify(observer, times(1)).onCompleted();
39+
}
40+
}

rxjava-core/src/test/java/rx/util/RangeTest.java

Lines changed: 0 additions & 65 deletions
This file was deleted.

0 commit comments

Comments
 (0)