Skip to content

Commit c36cd66

Browse files
Test Unsubscribe
- also cleaned up some stuff I remembered after merging last commits
1 parent 185a575 commit c36cd66

File tree

3 files changed

+34
-6
lines changed

3 files changed

+34
-6
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2439,10 +2439,13 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
24392439
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229460.aspx">MSDN: Observable.Range</a>
24402440
*/
24412441
public final static Observable<Integer> range(int start, int count) {
2442+
if (count < 1) {
2443+
throw new IllegalArgumentException("Count must be positive");
2444+
}
24422445
if ((start + count) > Integer.MAX_VALUE) {
24432446
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
24442447
}
2445-
return Observable.create(new OnSubscribeRange(start, start + count));
2448+
return Observable.create(new OnSubscribeRange(start, start + (count - 1)));
24462449
}
24472450

24482451
/**
@@ -2462,10 +2465,7 @@ public final static Observable<Integer> range(int start, int count) {
24622465
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211896.aspx">MSDN: Observable.Range</a>
24632466
*/
24642467
public final static Observable<Integer> range(int start, int count, Scheduler scheduler) {
2465-
if ((start + count) > Integer.MAX_VALUE) {
2466-
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
2467-
}
2468-
return Observable.create(new OnSubscribeRange(start, start + count)).subscribeOn(scheduler);
2468+
return range(start, count).subscribeOn(scheduler);
24692469
}
24702470

24712471
/**

rxjava-core/src/main/java/rx/operators/OnSubscribeRange.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.Subscriber;
2020

2121
/**
22+
* Emit ints from start to end inclusive.
2223
*/
2324
public final class OnSubscribeRange implements OnSubscribe<Integer> {
2425

@@ -32,7 +33,7 @@ public OnSubscribeRange(int start, int end) {
3233

3334
@Override
3435
public void call(Subscriber<? super Integer> o) {
35-
for (int i = start; i < end; i++) {
36+
for (int i = start; i <= end; i++) {
3637
if (o.isUnsubscribed()) {
3738
return;
3839
}

rxjava-core/src/test/java/rx/operators/OnSubscribeRangeTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,16 @@
1515
*/
1616
package rx.operators;
1717

18+
import static org.junit.Assert.*;
1819
import static org.mockito.Mockito.*;
1920

21+
import java.util.concurrent.atomic.AtomicInteger;
22+
2023
import org.junit.Test;
2124

2225
import rx.Observable;
2326
import rx.Observer;
27+
import rx.util.functions.Action1;
2428

2529
public class OnSubscribeRangeTest {
2630

@@ -37,4 +41,27 @@ public void testRangeStartAt2Count3() {
3741
verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
3842
verify(observer, times(1)).onCompleted();
3943
}
44+
45+
@Test
46+
public void testRangeUnsubscribe() {
47+
@SuppressWarnings("unchecked")
48+
Observer<Integer> observer = mock(Observer.class);
49+
final AtomicInteger count = new AtomicInteger();
50+
Observable.range(1, 1000).doOnNext(new Action1<Integer>() {
51+
52+
@Override
53+
public void call(Integer t1) {
54+
count.incrementAndGet();
55+
}
56+
57+
}).take(3).subscribe(observer);
58+
59+
verify(observer, times(1)).onNext(1);
60+
verify(observer, times(1)).onNext(2);
61+
verify(observer, times(1)).onNext(3);
62+
verify(observer, never()).onNext(4);
63+
verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
64+
verify(observer, times(1)).onCompleted();
65+
assertEquals(3, count.get());
66+
}
4067
}

0 commit comments

Comments
 (0)