Skip to content

Commit 52cd2cd

Browse files
Merge pull request #835 from benjchristensen/observeOn
ObserveOn Operator with Backpressure
2 parents d56b1b9 + d5e5df4 commit 52cd2cd

19 files changed

+646
-234
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import rx.operators.OperationDematerialize;
5050
import rx.operators.OperationDistinct;
5151
import rx.operators.OperationDistinctUntilChanged;
52-
import rx.operators.OperatorDoOnEach;
5352
import rx.operators.OperationElementAt;
5453
import rx.operators.OperationFilter;
5554
import rx.operators.OperationFinally;
@@ -63,13 +62,11 @@
6362
import rx.operators.OperationMergeDelayError;
6463
import rx.operators.OperationMinMax;
6564
import rx.operators.OperationMulticast;
66-
import rx.operators.OperationObserveOn;
6765
import rx.operators.OperationOnErrorResumeNextViaFunction;
6866
import rx.operators.OperationOnErrorResumeNextViaObservable;
6967
import rx.operators.OperationOnErrorReturn;
7068
import rx.operators.OperationOnExceptionResumeNextViaObservable;
7169
import rx.operators.OperationParallelMerge;
72-
import rx.operators.OperatorRepeat;
7370
import rx.operators.OperationReplay;
7471
import rx.operators.OperationRetry;
7572
import rx.operators.OperationSample;
@@ -96,18 +93,21 @@
9693
import rx.operators.OperationToObservableFuture;
9794
import rx.operators.OperationUsing;
9895
import rx.operators.OperationWindow;
99-
import rx.operators.OperatorSubscribeOn;
100-
import rx.operators.OperatorZip;
10196
import rx.operators.OperatorCast;
97+
import rx.operators.OperatorDoOnEach;
10298
import rx.operators.OperatorFromIterable;
10399
import rx.operators.OperatorGroupBy;
104100
import rx.operators.OperatorMap;
105101
import rx.operators.OperatorMerge;
102+
import rx.operators.OperatorObserveOn;
106103
import rx.operators.OperatorParallel;
104+
import rx.operators.OperatorRepeat;
105+
import rx.operators.OperatorSubscribeOn;
107106
import rx.operators.OperatorTake;
108107
import rx.operators.OperatorTimestamp;
109108
import rx.operators.OperatorToObservableList;
110109
import rx.operators.OperatorToObservableSortedList;
110+
import rx.operators.OperatorZip;
111111
import rx.operators.OperatorZipIterable;
112112
import rx.plugins.RxJavaObservableExecutionHook;
113113
import rx.plugins.RxJavaPlugins;
@@ -5139,8 +5139,7 @@ public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends
51395139
}
51405140

51415141
/**
5142-
* Modify the source Observable so that it asynchronously notifies {@link Observer}s on the
5143-
* specified {@link Scheduler}.
5142+
* Move notifications to the specified {@link Scheduler} one `onNext` at a time.
51445143
* <p>
51455144
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
51465145
*
@@ -5151,9 +5150,26 @@ public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends
51515150
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-observeon">RxJava Wiki: observeOn()</a>
51525151
*/
51535152
public final Observable<T> observeOn(Scheduler scheduler) {
5154-
return create(OperationObserveOn.observeOn(this, scheduler));
5153+
return lift(new OperatorObserveOn<T>(scheduler));
51555154
}
51565155

5156+
/**
5157+
* Move notifications to the specified {@link Scheduler} asynchronously with a buffer of the given size.
5158+
* <p>
5159+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
5160+
*
5161+
* @param scheduler
5162+
* the {@link Scheduler} to notify {@link Observer}s on
5163+
* @param bufferSize
5164+
* that will be rounded up to the next power of 2
5165+
* @return the source Observable modified so that its {@link Observer}s are notified on the
5166+
* specified {@link Scheduler}
5167+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-observeon">RxJava Wiki: observeOn()</a>
5168+
*/
5169+
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
5170+
return lift(new OperatorObserveOn<T>(scheduler, bufferSize));
5171+
}
5172+
51575173
/**
51585174
* Filters the items emitted by an Observable, only emitting those of the specified type.
51595175
* <p>
@@ -5296,7 +5312,9 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
52965312
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-parallel">RxJava Wiki: parallel()</a>
52975313
*/
52985314
public final <R> Observable<R> parallel(Func1<Observable<T>, Observable<R>> f) {
5299-
return lift(new OperatorParallel<T, R>(f, Schedulers.computation()));
5315+
// TODO move this back to Schedulers.computation() again once that is properly using eventloops
5316+
// see https://github.com/Netflix/RxJava/issues/713 for why this was changed
5317+
return lift(new OperatorParallel<T, R>(f, Schedulers.newThread()));
53005318
}
53015319

53025320
/**

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 0 additions & 129 deletions
This file was deleted.

0 commit comments

Comments
 (0)