Skip to content

Commit 6dc04f1

Browse files
committed
Merge pull request #3642 from Chaoba/optimizate_single_just
1.x: Optimizate single just
2 parents 90e0423 + 4373f75 commit 6dc04f1

File tree

3 files changed

+458
-10
lines changed

3 files changed

+458
-10
lines changed

src/main/java/rx/Single.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import rx.annotations.Beta;
3838
import rx.internal.operators.*;
3939
import rx.internal.producers.SingleDelayedProducer;
40+
import rx.internal.util.ScalarSynchronousSingle;
41+
import rx.internal.util.UtilityFunctions;
4042
import rx.singles.BlockingSingle;
4143
import rx.observers.SafeSubscriber;
4244
import rx.plugins.*;
@@ -654,15 +656,7 @@ public void call(SingleSubscriber<? super T> singleSubscriber) {
654656
* @see <a href="http://reactivex.io/documentation/operators/just.html">ReactiveX operators documentation: Just</a>
655657
*/
656658
public static <T> Single<T> just(final T value) {
657-
// TODO add similar optimization as ScalarSynchronousObservable
658-
return Single.create(new OnSubscribe<T>() {
659-
660-
@Override
661-
public void call(SingleSubscriber<? super T> te) {
662-
te.onSuccess(value);
663-
}
664-
665-
});
659+
return ScalarSynchronousSingle.create(value);
666660
}
667661

668662
/**
@@ -683,6 +677,9 @@ public void call(SingleSubscriber<? super T> te) {
683677
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
684678
*/
685679
public static <T> Single<T> merge(final Single<? extends Single<? extends T>> source) {
680+
if (source instanceof ScalarSynchronousSingle) {
681+
return ((ScalarSynchronousSingle<T>) source).scalarFlatMap((Func1) UtilityFunctions.identity());
682+
}
686683
return Single.create(new OnSubscribe<T>() {
687684

688685
@Override
@@ -1296,6 +1293,9 @@ public final Observable<T> concatWith(Single<? extends T> t1) {
12961293
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
12971294
*/
12981295
public final <R> Single<R> flatMap(final Func1<? super T, ? extends Single<? extends R>> func) {
1296+
if (this instanceof ScalarSynchronousSingle) {
1297+
return ((ScalarSynchronousSingle<T>) this).scalarFlatMap(func);
1298+
}
12991299
return merge(map(func));
13001300
}
13011301

@@ -1378,6 +1378,9 @@ public final Observable<T> mergeWith(Single<? extends T> t1) {
13781378
* @see #subscribeOn
13791379
*/
13801380
public final Single<T> observeOn(Scheduler scheduler) {
1381+
if (this instanceof ScalarSynchronousSingle) {
1382+
return ((ScalarSynchronousSingle<T>)this).scalarScheduleOn(scheduler);
1383+
}
13811384
return lift(new OperatorObserveOn<T>(scheduler));
13821385
}
13831386

@@ -1768,6 +1771,9 @@ public void onNext(T t) {
17681771
* @see #observeOn
17691772
*/
17701773
public final Single<T> subscribeOn(final Scheduler scheduler) {
1774+
if (this instanceof ScalarSynchronousSingle) {
1775+
return ((ScalarSynchronousSingle<T>)this).scalarScheduleOn(scheduler);
1776+
}
17711777
return create(new OnSubscribe<T>() {
17721778
@Override
17731779
public void call(final SingleSubscriber<? super T> t) {
@@ -1803,7 +1809,7 @@ public void onError(Throwable error) {
18031809
}
18041810
});
18051811
}
1806-
});
1812+
});
18071813
}
18081814

18091815
/**
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
* <p/>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p/>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p/>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import rx.Scheduler;
19+
import rx.Scheduler.Worker;
20+
import rx.Single;
21+
import rx.SingleSubscriber;
22+
import rx.Subscriber;
23+
import rx.functions.Action0;
24+
import rx.functions.Func1;
25+
import rx.internal.schedulers.EventLoopsScheduler;
26+
27+
public final class ScalarSynchronousSingle<T> extends Single<T> {
28+
29+
public static final <T> ScalarSynchronousSingle<T> create(T t) {
30+
return new ScalarSynchronousSingle<T>(t);
31+
}
32+
33+
final T value;
34+
35+
protected ScalarSynchronousSingle(final T t) {
36+
super(new OnSubscribe<T>() {
37+
38+
@Override
39+
public void call(SingleSubscriber<? super T> te) {
40+
te.onSuccess(t);
41+
}
42+
43+
});
44+
this.value = t;
45+
}
46+
47+
public T get() {
48+
return value;
49+
}
50+
51+
/**
52+
* Customized observeOn/subscribeOn implementation which emits the scalar
53+
* value directly or with less overhead on the specified scheduler.
54+
*
55+
* @param scheduler the target scheduler
56+
* @return the new observable
57+
*/
58+
public Single<T> scalarScheduleOn(Scheduler scheduler) {
59+
if (scheduler instanceof EventLoopsScheduler) {
60+
EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
61+
return create(new DirectScheduledEmission<T>(es, value));
62+
}
63+
return create(new NormalScheduledEmission<T>(scheduler, value));
64+
}
65+
66+
/**
67+
* Optimized observeOn for scalar value observed on the EventLoopsScheduler.
68+
*/
69+
static final class DirectScheduledEmission<T> implements OnSubscribe<T> {
70+
private final EventLoopsScheduler es;
71+
private final T value;
72+
73+
DirectScheduledEmission(EventLoopsScheduler es, T value) {
74+
this.es = es;
75+
this.value = value;
76+
}
77+
78+
@Override
79+
public void call(SingleSubscriber<? super T> singleSubscriber) {
80+
singleSubscriber.add(es.scheduleDirect(new ScalarSynchronousSingleAction<T>(singleSubscriber, value)));
81+
}
82+
}
83+
84+
/**
85+
* Emits a scalar value on a general scheduler.
86+
*/
87+
static final class NormalScheduledEmission<T> implements OnSubscribe<T> {
88+
private final Scheduler scheduler;
89+
private final T value;
90+
91+
NormalScheduledEmission(Scheduler scheduler, T value) {
92+
this.scheduler = scheduler;
93+
this.value = value;
94+
}
95+
96+
@Override
97+
public void call(SingleSubscriber<? super T> singleSubscriber) {
98+
Worker worker = scheduler.createWorker();
99+
singleSubscriber.add(worker);
100+
worker.schedule(new ScalarSynchronousSingleAction<T>(singleSubscriber, value));
101+
}
102+
}
103+
104+
/**
105+
* Action that emits a single value when called.
106+
*/
107+
static final class ScalarSynchronousSingleAction<T> implements Action0 {
108+
private final SingleSubscriber<? super T> subscriber;
109+
private final T value;
110+
111+
ScalarSynchronousSingleAction(SingleSubscriber<? super T> subscriber,
112+
T value) {
113+
this.subscriber = subscriber;
114+
this.value = value;
115+
}
116+
117+
@Override
118+
public void call() {
119+
try {
120+
subscriber.onSuccess(value);
121+
} catch (Throwable t) {
122+
subscriber.onError(t);
123+
}
124+
}
125+
}
126+
127+
public <R> Single<R> scalarFlatMap(final Func1<? super T, ? extends Single<? extends R>> func) {
128+
return create(new OnSubscribe<R>() {
129+
@Override
130+
public void call(final SingleSubscriber<? super R> child) {
131+
132+
Single<? extends R> o = func.call(value);
133+
if (o instanceof ScalarSynchronousSingle) {
134+
child.onSuccess(((ScalarSynchronousSingle<? extends R>) o).value);
135+
} else {
136+
Subscriber<R> subscriber = new Subscriber<R>() {
137+
@Override
138+
public void onCompleted() {
139+
}
140+
141+
@Override
142+
public void onError(Throwable e) {
143+
child.onError(e);
144+
}
145+
146+
@Override
147+
public void onNext(R r) {
148+
child.onSuccess(r);
149+
}
150+
};
151+
child.add(subscriber);
152+
o.unsafeSubscribe(subscriber);
153+
}
154+
}
155+
});
156+
}
157+
}

0 commit comments

Comments
 (0)