Skip to content

Commit ad6c70b

Browse files
Merge pull request #890 from benjchristensen/subscribeOn-subscriptions
Split SubscribeOn into SubscribeOn/UnsubscribeOn
2 parents 18f545a + 873fa75 commit ad6c70b

8 files changed

+814
-336
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import rx.operators.OperatorTimestamp;
109109
import rx.operators.OperatorToObservableList;
110110
import rx.operators.OperatorToObservableSortedList;
111+
import rx.operators.OperatorUnsubscribeOn;
111112
import rx.operators.OperatorZip;
112113
import rx.operators.OperatorZipIterable;
113114
import rx.plugins.RxJavaObservableExecutionHook;
@@ -7074,14 +7075,14 @@ public final Subscription subscribe(Subscriber<? super T> observer, Scheduler sc
70747075
}
70757076

70767077
/**
7077-
* Asynchronously subscribes and unsubscribes Observers to this Observable on the specified
7078+
* Asynchronously subscribes Observers to this Observable on the specified
70787079
* {@link Scheduler}.
70797080
* <p>
70807081
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/subscribeOn.png">
70817082
*
70827083
* @param scheduler
7083-
* the {@link Scheduler} to perform subscription and unsubscription actions on
7084-
* @return the source Observable modified so that its subscriptions and unsubscriptions happen on the
7084+
* the {@link Scheduler} to perform subscription actions on
7085+
* @return the source Observable modified so that its subscriptions happen on the
70857086
* specified {@link Scheduler}
70867087
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
70877088
* @see #subscribeOn(rx.Scheduler, int)
@@ -8204,6 +8205,17 @@ public final Observable<List<T>> toSortedList(Func2<? super T, ? super T, Intege
82048205
return lift(new OperatorToObservableSortedList<T>(sortFunction));
82058206
}
82068207

8208+
/**
8209+
* Asynchronously unsubscribes on the specified {@link Scheduler}.
8210+
*
8211+
* @param scheduler
8212+
* the {@link Scheduler} to perform subscription and unsubscription actions on
8213+
* @return the source Observable modified so that its unsubscriptions happen on the specified {@link Scheduler}
8214+
*/
8215+
public final Observable<T> unsubscribeOn(Scheduler scheduler) {
8216+
return lift(new OperatorUnsubscribeOn<T>(scheduler));
8217+
}
8218+
82078219
/**
82088220
* Returns an Observable that represents a filtered version of the source Observable.
82098221
* <p>

rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java

Lines changed: 8 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -23,46 +23,16 @@
2323
import rx.util.functions.Action1;
2424

2525
/**
26-
* Subscribes and unsubscribes Observers on the specified Scheduler.
26+
* Subscribes Observers on the specified Scheduler.
2727
* <p>
28-
* Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables
29-
* in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred.
30-
* <p>
31-
* See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous
32-
* subscribe is solving.
33-
*
3428
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/subscribeOn.png">
3529
*/
3630
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
3731

3832
private final Scheduler scheduler;
39-
/**
40-
* Indicate that events fired between the original subscription time and
41-
* the actual subscription time should not get lost.
42-
*/
43-
private final boolean dontLoseEvents;
44-
/** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
45-
private final int bufferSize;
4633

4734
public OperatorSubscribeOn(Scheduler scheduler) {
4835
this.scheduler = scheduler;
49-
this.dontLoseEvents = false;
50-
this.bufferSize = -1;
51-
}
52-
53-
/**
54-
* Construct a SubscribeOn operator.
55-
*
56-
* @param scheduler
57-
* the target scheduler
58-
* @param bufferSize
59-
* if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
60-
* block the source. -1 indicates an unbounded buffer
61-
*/
62-
public OperatorSubscribeOn(Scheduler scheduler, int bufferSize) {
63-
this.scheduler = scheduler;
64-
this.dontLoseEvents = true;
65-
this.bufferSize = bufferSize;
6636
}
6737

6838
@Override
@@ -71,41 +41,23 @@ public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscr
7141

7242
@Override
7343
public void onCompleted() {
74-
// ignore
44+
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
7545
}
7646

7747
@Override
7848
public void onError(Throwable e) {
7949
subscriber.onError(e);
8050
}
8151

82-
boolean checkNeedBuffer(Observable<?> o) {
83-
return dontLoseEvents;
84-
}
85-
8652
@Override
8753
public void onNext(final Observable<T> o) {
88-
if (checkNeedBuffer(o)) {
89-
// use buffering (possibly blocking) for a possibly synchronous subscribe
90-
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber);
91-
o.subscribe(bus);
92-
subscriber.add(scheduler.schedule(new Action1<Inner>() {
93-
@Override
94-
public void call(final Inner inner) {
95-
bus.enterPassthroughMode();
96-
}
97-
}));
98-
return;
99-
} else {
100-
// no buffering (async subscribe)
101-
subscriber.add(scheduler.schedule(new Action1<Inner>() {
54+
subscriber.add(scheduler.schedule(new Action1<Inner>() {
10255

103-
@Override
104-
public void call(final Inner inner) {
105-
o.subscribe(subscriber);
106-
}
107-
}));
108-
}
56+
@Override
57+
public void call(final Inner inner) {
58+
o.subscribe(subscriber);
59+
}
60+
}));
10961
}
11062

11163
};
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.Operator;
20+
import rx.Scheduler;
21+
import rx.Scheduler.Inner;
22+
import rx.Subscriber;
23+
import rx.util.functions.Action1;
24+
25+
/**
26+
* Subscribes and unsubscribes Observers on the specified Scheduler.
27+
* <p>
28+
* Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables
29+
* in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred.
30+
* <p>
31+
* See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous
32+
* subscribe is solving.
33+
*
34+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/subscribeOn.png">
35+
*/
36+
public class OperatorSubscribeOnBounded<T> implements Operator<T, Observable<T>> {
37+
38+
private final Scheduler scheduler;
39+
/**
40+
* Indicate that events fired between the original subscription time and
41+
* the actual subscription time should not get lost.
42+
*/
43+
private final boolean dontLoseEvents;
44+
/** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
45+
private final int bufferSize;
46+
47+
public OperatorSubscribeOnBounded(Scheduler scheduler) {
48+
this.scheduler = scheduler;
49+
this.dontLoseEvents = false;
50+
this.bufferSize = -1;
51+
}
52+
53+
/**
54+
* Construct a SubscribeOn operator.
55+
*
56+
* @param scheduler
57+
* the target scheduler
58+
* @param bufferSize
59+
* if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
60+
* block the source. -1 indicates an unbounded buffer
61+
*/
62+
public OperatorSubscribeOnBounded(Scheduler scheduler, int bufferSize) {
63+
this.scheduler = scheduler;
64+
this.dontLoseEvents = true;
65+
this.bufferSize = bufferSize;
66+
}
67+
68+
@Override
69+
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
70+
return new Subscriber<Observable<T>>(subscriber) {
71+
72+
@Override
73+
public void onCompleted() {
74+
// ignore
75+
}
76+
77+
@Override
78+
public void onError(Throwable e) {
79+
subscriber.onError(e);
80+
}
81+
82+
boolean checkNeedBuffer(Observable<?> o) {
83+
return dontLoseEvents;
84+
}
85+
86+
@Override
87+
public void onNext(final Observable<T> o) {
88+
if (checkNeedBuffer(o)) {
89+
// use buffering (possibly blocking) for a possibly synchronous subscribe
90+
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber);
91+
o.subscribe(bus);
92+
subscriber.add(scheduler.schedule(new Action1<Inner>() {
93+
@Override
94+
public void call(final Inner inner) {
95+
bus.enterPassthroughMode();
96+
}
97+
}));
98+
return;
99+
} else {
100+
// no buffering (async subscribe)
101+
subscriber.add(scheduler.schedule(new Action1<Inner>() {
102+
103+
@Override
104+
public void call(final Inner inner) {
105+
o.subscribe(subscriber);
106+
}
107+
}));
108+
}
109+
}
110+
111+
};
112+
}
113+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Scheduler;
20+
import rx.Scheduler.Inner;
21+
import rx.Subscriber;
22+
import rx.subscriptions.CompositeSubscription;
23+
import rx.subscriptions.MultipleAssignmentSubscription;
24+
import rx.subscriptions.Subscriptions;
25+
import rx.util.functions.Action0;
26+
import rx.util.functions.Action1;
27+
28+
/**
29+
* Unsubscribes on the specified Scheduler.
30+
* <p>
31+
*/
32+
public class OperatorUnsubscribeOn<T> implements Operator<T, T> {
33+
34+
private final Scheduler scheduler;
35+
36+
public OperatorUnsubscribeOn(Scheduler scheduler) {
37+
this.scheduler = scheduler;
38+
}
39+
40+
@Override
41+
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
42+
final CompositeSubscription parentSubscription = new CompositeSubscription();
43+
subscriber.add(Subscriptions.create(new Action0() {
44+
45+
@Override
46+
public void call() {
47+
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
48+
mas.set(scheduler.schedule(new Action1<Inner>() {
49+
50+
@Override
51+
public void call(final Inner inner) {
52+
parentSubscription.unsubscribe();
53+
mas.unsubscribe();
54+
}
55+
}));
56+
}
57+
58+
}));
59+
60+
return new Subscriber<T>(parentSubscription) {
61+
62+
@Override
63+
public void onCompleted() {
64+
subscriber.onCompleted();
65+
}
66+
67+
@Override
68+
public void onError(Throwable e) {
69+
subscriber.onError(e);
70+
}
71+
72+
@Override
73+
public void onNext(T t) {
74+
subscriber.onNext(t);
75+
}
76+
77+
};
78+
}
79+
}

rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ public void call() {
700700

701701
});
702702
} else {
703-
return group.nest().lift(new OperatorSubscribeOn<Integer>(Schedulers.newThread(), 1)).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
703+
return group.nest().lift(new OperatorSubscribeOnBounded<Integer>(Schedulers.newThread(), 1)).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
704704

705705
@Override
706706
public String call(Integer t1) {
@@ -826,7 +826,7 @@ public Integer call(Integer t) {
826826

827827
@Override
828828
public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
829-
return group.nest().lift(new OperatorSubscribeOn<Integer>(Schedulers.newThread(), 0)).map(new Func1<Integer, String>() {
829+
return group.nest().lift(new OperatorSubscribeOnBounded<Integer>(Schedulers.newThread(), 0)).map(new Func1<Integer, String>() {
830830

831831
@Override
832832
public String call(Integer t1) {

0 commit comments

Comments
 (0)