Skip to content

Commit 503d369

Browse files
committed
Merge pull request #3614 from akarnokd/JustBackpressure1xV2
1.x: just() now supports backpressure (+ related fixes/changes)
2 parents 5ab00f9 + c925e86 commit 503d369

8 files changed

+641
-197
lines changed

src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8330,7 +8330,7 @@ public final Observable<T> subscribeOn(Scheduler scheduler) {
83308330
if (this instanceof ScalarSynchronousObservable) {
83318331
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
83328332
}
8333-
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
8333+
return create(new OperatorSubscribeOn<T>(this, scheduler));
83348334
}
83358335

83368336
/**

src/main/java/rx/Single.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1736,8 +1736,43 @@ public void onNext(T t) {
17361736
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
17371737
* @see #observeOn
17381738
*/
1739-
public final Single<T> subscribeOn(Scheduler scheduler) {
1740-
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
1739+
public final Single<T> subscribeOn(final Scheduler scheduler) {
1740+
return create(new OnSubscribe<T>() {
1741+
@Override
1742+
public void call(final SingleSubscriber<? super T> t) {
1743+
final Scheduler.Worker w = scheduler.createWorker();
1744+
t.add(w);
1745+
1746+
w.schedule(new Action0() {
1747+
@Override
1748+
public void call() {
1749+
SingleSubscriber<T> ssub = new SingleSubscriber<T>() {
1750+
@Override
1751+
public void onSuccess(T value) {
1752+
try {
1753+
t.onSuccess(value);
1754+
} finally {
1755+
w.unsubscribe();
1756+
}
1757+
}
1758+
1759+
@Override
1760+
public void onError(Throwable error) {
1761+
try {
1762+
t.onError(error);
1763+
} finally {
1764+
w.unsubscribe();
1765+
}
1766+
}
1767+
};
1768+
1769+
t.add(ssub);
1770+
1771+
Single.this.subscribe(ssub);
1772+
}
1773+
});
1774+
}
1775+
});
17411776
}
17421777

17431778
/**

src/main/java/rx/internal/operators/OperatorSubscribeOn.java

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -15,96 +15,84 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import rx.Observable;
19-
import rx.Observable.Operator;
20-
import rx.Producer;
21-
import rx.Scheduler;
18+
import rx.*;
19+
import rx.Observable.OnSubscribe;
2220
import rx.Scheduler.Worker;
23-
import rx.Subscriber;
2421
import rx.functions.Action0;
2522

2623
/**
2724
* Subscribes Observers on the specified {@code Scheduler}.
2825
* <p>
2926
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/subscribeOn.png" alt="">
27+
*
28+
* @param <T> the value type of the actual source
3029
*/
31-
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
30+
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
3231

33-
private final Scheduler scheduler;
32+
final Scheduler scheduler;
33+
final Observable<T> source;
3434

35-
public OperatorSubscribeOn(Scheduler scheduler) {
35+
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
3636
this.scheduler = scheduler;
37+
this.source = source;
3738
}
3839

3940
@Override
40-
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
41+
public void call(final Subscriber<? super T> subscriber) {
4142
final Worker inner = scheduler.createWorker();
4243
subscriber.add(inner);
43-
return new Subscriber<Observable<T>>(subscriber) {
44-
45-
@Override
46-
public void onCompleted() {
47-
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
48-
}
49-
50-
@Override
51-
public void onError(Throwable e) {
52-
subscriber.onError(e);
53-
}
54-
44+
45+
inner.schedule(new Action0() {
5546
@Override
56-
public void onNext(final Observable<T> o) {
57-
inner.schedule(new Action0() {
58-
47+
public void call() {
48+
final Thread t = Thread.currentThread();
49+
50+
Subscriber<T> s = new Subscriber<T>(subscriber) {
5951
@Override
60-
public void call() {
61-
final Thread t = Thread.currentThread();
62-
o.unsafeSubscribe(new Subscriber<T>(subscriber) {
63-
64-
@Override
65-
public void onCompleted() {
66-
subscriber.onCompleted();
67-
}
68-
69-
@Override
70-
public void onError(Throwable e) {
71-
subscriber.onError(e);
72-
}
73-
74-
@Override
75-
public void onNext(T t) {
76-
subscriber.onNext(t);
77-
}
78-
52+
public void onNext(T t) {
53+
subscriber.onNext(t);
54+
}
55+
56+
@Override
57+
public void onError(Throwable e) {
58+
try {
59+
subscriber.onError(e);
60+
} finally {
61+
inner.unsubscribe();
62+
}
63+
}
64+
65+
@Override
66+
public void onCompleted() {
67+
try {
68+
subscriber.onCompleted();
69+
} finally {
70+
inner.unsubscribe();
71+
}
72+
}
73+
74+
@Override
75+
public void setProducer(final Producer p) {
76+
subscriber.setProducer(new Producer() {
7977
@Override
80-
public void setProducer(final Producer producer) {
81-
subscriber.setProducer(new Producer() {
82-
83-
@Override
84-
public void request(final long n) {
85-
if (Thread.currentThread() == t) {
86-
// don't schedule if we're already on the thread (primarily for first setProducer call)
87-
// see unit test 'testSetProducerSynchronousRequest' for more context on this
88-
producer.request(n);
89-
} else {
90-
inner.schedule(new Action0() {
91-
92-
@Override
93-
public void call() {
94-
producer.request(n);
95-
}
96-
});
78+
public void request(final long n) {
79+
if (t == Thread.currentThread()) {
80+
p.request(n);
81+
} else {
82+
inner.schedule(new Action0() {
83+
@Override
84+
public void call() {
85+
p.request(n);
9786
}
98-
}
99-
100-
});
87+
});
88+
}
10189
}
102-
10390
});
10491
}
105-
});
92+
};
93+
94+
source.unsafeSubscribe(s);
10695
}
107-
108-
};
96+
});
10997
}
110-
}
98+
}

0 commit comments

Comments
 (0)