Skip to content

Commit cb664ff

Browse files
akarnokdbenjchristensen
authored andcommitted
Repeat with Count
- merging changes from #807
1 parent 85debff commit cb664ff

File tree

3 files changed

+110
-9
lines changed

3 files changed

+110
-9
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5547,6 +5547,39 @@ public final Observable<T> repeat(Scheduler scheduler) {
55475547
return nest().lift(new OperatorRepeat<T>(scheduler));
55485548
}
55495549

5550+
/**
5551+
* Returns an Observable that repeats the sequence of items emitted by the source
5552+
* Observable at most count times.
5553+
*
5554+
* @param count
5555+
* the number of times the source Observable items are repeated,
5556+
* a count of 0 will yield an empty sequence.
5557+
* @return an Observable that repeats the sequence of items emitted by the source
5558+
* Observable at most count times.
5559+
*/
5560+
public final Observable<T> repeat(long count) {
5561+
if (count < 0) {
5562+
throw new IllegalArgumentException("count >= 0 expected");
5563+
}
5564+
return nest().lift(new OperatorRepeat<T>(count));
5565+
}
5566+
5567+
/**
5568+
* Returns an Observable that repeats the sequence of items emitted by the source
5569+
* Observable at most count times on a particular scheduler.
5570+
*
5571+
* @param count
5572+
* the number of times the source Observable items are repeated,
5573+
* a count of 0 will yield an empty sequence.
5574+
* @param scheduler
5575+
* the scheduler to emit the items on
5576+
* @return an Observable that repeats the sequence of items emitted by the source
5577+
* Observable at most count times on a particular scheduler.
5578+
*/
5579+
public final Observable<T> repeat(long count, Scheduler scheduler) {
5580+
return nest().lift(new OperatorRepeat<T>(count, scheduler));
5581+
}
5582+
55505583
/**
55515584
* Returns a {@link ConnectableObservable} that shares a single subscription to the underlying
55525585
* Observable that will replay all of its items and notifications to any future {@link Observer}.

rxjava-core/src/main/java/rx/operators/OperatorRepeat.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,42 @@
2020
import rx.Scheduler;
2121
import rx.Scheduler.Inner;
2222
import rx.Subscriber;
23+
import rx.observers.Subscribers;
2324
import rx.schedulers.Schedulers;
2425
import rx.util.functions.Action1;
2526

2627
public class OperatorRepeat<T> implements Operator<T, Observable<T>> {
2728

2829
private final Scheduler scheduler;
30+
private final long count;
2931

30-
public OperatorRepeat(Scheduler scheduler) {
32+
public OperatorRepeat(long count, Scheduler scheduler) {
3133
this.scheduler = scheduler;
34+
this.count = count;
35+
}
3236

37+
public OperatorRepeat(Scheduler scheduler) {
38+
this(-1, scheduler);
39+
}
40+
41+
public OperatorRepeat(long count) {
42+
this(count, Schedulers.trampoline());
3343
}
3444

3545
public OperatorRepeat() {
36-
this(Schedulers.trampoline());
46+
this(-1, Schedulers.trampoline());
3747
}
3848

3949
@Override
4050
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
51+
if (count == 0) {
52+
child.onCompleted();
53+
return Subscribers.empty();
54+
}
4155
return new Subscriber<Observable<T>>(child) {
4256

57+
int executionCount = 0;
58+
4359
@Override
4460
public void onCompleted() {
4561
// ignore as we will keep repeating
@@ -58,12 +74,16 @@ public void onNext(final Observable<T> t) {
5874

5975
@Override
6076
public void call(final Inner inner) {
61-
77+
executionCount++;
6278
t.subscribe(new Subscriber<T>(child) {
6379

6480
@Override
6581
public void onCompleted() {
66-
inner.schedule(self);
82+
if (count == -1 || executionCount < count) {
83+
inner.schedule(self);
84+
} else {
85+
child.onCompleted();
86+
}
6787
}
6888

6989
@Override

rxjava-core/src/test/java/rx/operators/OperatorRepeatTest.java

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package rx.operators;
1717

1818
import static org.junit.Assert.*;
19+
import static org.mockito.Matchers.*;
20+
import static org.mockito.Mockito.*;
1921

2022
import java.util.concurrent.atomic.AtomicInteger;
2123

@@ -33,7 +35,7 @@
3335

3436
public class OperatorRepeatTest {
3537

36-
@Test
38+
@Test(timeout = 2000)
3739
public void testRepetition() {
3840
int NUM = 10;
3941
final AtomicInteger count = new AtomicInteger();
@@ -50,14 +52,14 @@ public Subscription onSubscribe(Observer<? super Integer> o) {
5052
assertEquals(NUM, value);
5153
}
5254

53-
@Test
55+
@Test(timeout = 2000)
5456
public void testRepeatTake() {
5557
Observable<Integer> xs = Observable.from(1, 2);
5658
Object[] ys = xs.repeat(Schedulers.newThread()).take(4).toList().toBlockingObservable().last().toArray();
5759
assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
5860
}
5961

60-
@Test
62+
@Test(timeout = 20000)
6163
public void testNoStackOverFlow() {
6264
Observable.from(1).repeat(Schedulers.newThread()).take(100000).toBlockingObservable().last();
6365
}
@@ -70,7 +72,6 @@ public void testRepeatTakeWithSubscribeOn() throws InterruptedException {
7072

7173
@Override
7274
public void call(Subscriber<? super Integer> sub) {
73-
System.out.println("invoked!");
7475
counter.incrementAndGet();
7576
sub.onNext(1);
7677
sub.onNext(2);
@@ -82,7 +83,6 @@ public void call(Subscriber<? super Integer> sub) {
8283

8384
@Override
8485
public Integer call(Integer t1) {
85-
System.out.println("t1: " + t1);
8686
try {
8787
Thread.sleep(50);
8888
} catch (InterruptedException e) {
@@ -97,4 +97,52 @@ public Integer call(Integer t1) {
9797
assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
9898
}
9999

100+
@Test(timeout = 2000)
101+
public void testRepeatAndTake() {
102+
@SuppressWarnings("unchecked")
103+
Observer<Object> o = mock(Observer.class);
104+
105+
Observable.from(1).repeat().take(10).subscribe(o);
106+
107+
verify(o, times(10)).onNext(1);
108+
verify(o).onCompleted();
109+
verify(o, never()).onError(any(Throwable.class));
110+
}
111+
@Test(timeout = 2000)
112+
public void testRepeatLimited() {
113+
@SuppressWarnings("unchecked")
114+
Observer<Object> o = mock(Observer.class);
115+
116+
Observable.from(1).repeat(10).subscribe(o);
117+
118+
verify(o, times(10)).onNext(1);
119+
verify(o).onCompleted();
120+
verify(o, never()).onError(any(Throwable.class));
121+
}
122+
@Test(timeout = 2000)
123+
public void testRepeatError() {
124+
@SuppressWarnings("unchecked")
125+
Observer<Object> o = mock(Observer.class);
126+
127+
Observable.error(new CustomException()).repeat(10).subscribe(o);
128+
129+
verify(o).onError(any(CustomException.class));
130+
verify(o, never()).onNext(any());
131+
verify(o, never()).onCompleted();
132+
133+
}
134+
@Test(timeout = 2000)
135+
public void testRepeatZero() {
136+
@SuppressWarnings("unchecked")
137+
Observer<Object> o = mock(Observer.class);
138+
139+
Observable.from(1).repeat(0).subscribe(o);
140+
141+
verify(o).onCompleted();
142+
verify(o, never()).onNext(any());
143+
verify(o, never()).onError(any(Throwable.class));
144+
}
145+
146+
private static class CustomException extends RuntimeException {
147+
}
100148
}

0 commit comments

Comments
 (0)