Skip to content

Commit 8bb05b5

Browse files
Merge pull request #828 from benjchristensen/repeat-operator
Repeat Operator
2 parents e0f970c + fae7a9b commit 8bb05b5

10 files changed

+347
-196
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
import rx.operators.OperationOnErrorReturn;
7070
import rx.operators.OperationOnExceptionResumeNextViaObservable;
7171
import rx.operators.OperationParallelMerge;
72-
import rx.operators.OperationRepeat;
72+
import rx.operators.OperatorRepeat;
7373
import rx.operators.OperationReplay;
7474
import rx.operators.OperationRetry;
7575
import rx.operators.OperationSample;
@@ -1616,7 +1616,7 @@ public final static Observable<Long> interval(long interval, TimeUnit unit, Sche
16161616
public final static <T> Observable<T> just(T value) {
16171617
return from(Arrays.asList(value));
16181618
}
1619-
1619+
16201620
/**
16211621
* Returns an Observable that emits a single item and then completes, on a specified scheduler.
16221622
* <p>
@@ -2355,6 +2355,15 @@ public final static <T extends Comparable<? super T>> Observable<T> min(Observab
23552355
return OperationMinMax.min(source);
23562356
}
23572357

2358+
/**
2359+
* Convert the current Observable<T> into an Observable<Observable<T>>.
2360+
*
2361+
* @return
2362+
*/
2363+
private final Observable<Observable<T>> nest() {
2364+
return from(this);
2365+
}
2366+
23582367
/**
23592368
* Returns an Observable that never sends any items or notifications to an {@link Observer}.
23602369
* <p>
@@ -5518,7 +5527,7 @@ public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> acc
55185527
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
55195528
*/
55205529
public final Observable<T> repeat() {
5521-
return this.repeat(Schedulers.currentThread());
5530+
return nest().lift(new OperatorRepeat<T>());
55225531
}
55235532

55245533
/**
@@ -5535,7 +5544,40 @@ public final Observable<T> repeat() {
55355544
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
55365545
*/
55375546
public final Observable<T> repeat(Scheduler scheduler) {
5538-
return create(OperationRepeat.repeat(this, scheduler));
5547+
return nest().lift(new OperatorRepeat<T>(scheduler));
5548+
}
5549+
5550+
/**
5551+
* Returns an Observable that repeats the sequence of items emitted by the source
5552+
* Observable at most count times.
5553+
*
5554+
* @param count
5555+
* the number of times the source Observable items are repeated,
5556+
* a count of 0 will yield an empty sequence.
5557+
* @return an Observable that repeats the sequence of items emitted by the source
5558+
* Observable at most count times.
5559+
*/
5560+
public final Observable<T> repeat(long count) {
5561+
if (count < 0) {
5562+
throw new IllegalArgumentException("count >= 0 expected");
5563+
}
5564+
return nest().lift(new OperatorRepeat<T>(count));
5565+
}
5566+
5567+
/**
5568+
* Returns an Observable that repeats the sequence of items emitted by the source
5569+
* Observable at most count times on a particular scheduler.
5570+
*
5571+
* @param count
5572+
* the number of times the source Observable items are repeated,
5573+
* a count of 0 will yield an empty sequence.
5574+
* @param scheduler
5575+
* the scheduler to emit the items on
5576+
* @return an Observable that repeats the sequence of items emitted by the source
5577+
* Observable at most count times on a particular scheduler.
5578+
*/
5579+
public final Observable<T> repeat(long count, Scheduler scheduler) {
5580+
return nest().lift(new OperatorRepeat<T>(count, scheduler));
55395581
}
55405582

55415583
/**

rxjava-core/src/main/java/rx/observers/Observers.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,35 @@
11
package rx.observers;
22

33
import rx.Observer;
4+
import rx.Subscriber;
45
import rx.util.OnErrorNotImplementedException;
56
import rx.util.functions.Action0;
67
import rx.util.functions.Action1;
78

89
public class Observers {
910

10-
/**
11-
* Create an empty Observer that ignores all events.
12-
*/
13-
public static final <T> Observer<T> create() {
14-
return new Observer<T>() {
11+
private static final Observer<Object> EMPTY = new Observer<Object>() {
1512

16-
@Override
17-
public final void onCompleted() {
18-
// do nothing
19-
}
13+
@Override
14+
public final void onCompleted() {
15+
// do nothing
16+
}
2017

21-
@Override
22-
public final void onError(Throwable e) {
23-
throw new OnErrorNotImplementedException(e);
24-
}
18+
@Override
19+
public final void onError(Throwable e) {
20+
throw new OnErrorNotImplementedException(e);
21+
}
2522

26-
@Override
27-
public final void onNext(T args) {
28-
// do nothing
29-
}
23+
@Override
24+
public final void onNext(Object args) {
25+
// do nothing
26+
}
3027

31-
};
28+
};
29+
30+
@SuppressWarnings("unchecked")
31+
public static <T> Observer<T> empty() {
32+
return (Observer<T>) EMPTY;
3233
}
3334

3435
/**

rxjava-core/src/main/java/rx/observers/Subscribers.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,30 @@
88

99
public class Subscribers {
1010

11+
private static final Subscriber<Object> EMPTY = new Subscriber<Object>() {
12+
13+
@Override
14+
public final void onCompleted() {
15+
// do nothing
16+
}
17+
18+
@Override
19+
public final void onError(Throwable e) {
20+
throw new OnErrorNotImplementedException(e);
21+
}
22+
23+
@Override
24+
public final void onNext(Object args) {
25+
// do nothing
26+
}
27+
28+
};
29+
30+
@SuppressWarnings("unchecked")
31+
public static <T> Subscriber<T> empty() {
32+
return (Subscriber<T>) EMPTY;
33+
}
34+
1135
public static <T> Subscriber<T> from(final Observer<? super T> o) {
1236
return new Subscriber<T>() {
1337

@@ -29,30 +53,6 @@ public void onNext(T t) {
2953
};
3054
}
3155

32-
/**
33-
* Create an empty Subscriber that ignores all events.
34-
*/
35-
public static final <T> Subscriber<T> create() {
36-
return new Subscriber<T>() {
37-
38-
@Override
39-
public final void onCompleted() {
40-
// do nothing
41-
}
42-
43-
@Override
44-
public final void onError(Throwable e) {
45-
throw new OnErrorNotImplementedException(e);
46-
}
47-
48-
@Override
49-
public final void onNext(T args) {
50-
// do nothing
51-
}
52-
53-
};
54-
}
55-
5656
/**
5757
* Create an Subscriber that receives `onNext` and ignores `onError` and `onCompleted`.
5858
*/

rxjava-core/src/main/java/rx/observers/TestObserver.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
*/
2929
public class TestObserver<T> implements Observer<T> {
3030

31-
private final Observer<Object> EMPTY = new EmptyObserver<Object>();
3231

3332
private final Observer<T> delegate;
3433
private final ArrayList<T> onNextEvents = new ArrayList<T>();
@@ -39,9 +38,8 @@ public TestObserver(Observer<T> delegate) {
3938
this.delegate = delegate;
4039
}
4140

42-
@SuppressWarnings("unchecked")
4341
public TestObserver() {
44-
this.delegate = (Observer<T>) EMPTY;
42+
this.delegate = Observers.empty();
4543
}
4644

4745
@Override

rxjava-core/src/main/java/rx/observers/TestSubscriber.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
*/
2727
public class TestSubscriber<T> extends Subscriber<T> {
2828

29-
private final Subscriber<Object> EMPTY = Subscribers.create();
30-
3129
private final TestObserver<T> testObserver;
3230

3331
public TestSubscriber(Subscriber<T> delegate) {
@@ -38,9 +36,8 @@ public TestSubscriber(Observer<T> delegate) {
3836
this.testObserver = new TestObserver<T>(delegate);
3937
}
4038

41-
@SuppressWarnings("unchecked")
4239
public TestSubscriber() {
43-
this.testObserver = new TestObserver<T>((Subscriber<T>) EMPTY);
40+
this.testObserver = new TestObserver<T>(Subscribers.<T>empty());
4441
}
4542

4643
@Override

rxjava-core/src/main/java/rx/operators/OperationRepeat.java

Lines changed: 0 additions & 80 deletions
This file was deleted.

0 commit comments

Comments
 (0)