Skip to content

Commit 4834d85

Browse files
Merge pull request #962 from benjchristensen/serialize-synchronize
Migrate from SynchronizedObserver to SerializedObserver
2 parents fc2e45f + 6926fa6 commit 4834d85

22 files changed

+2896
-150
lines changed

rxjava-core/src/main/java/rx/Observable.java

+13-6
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import rx.operators.OnSubscribeFromIterable;
5353
import rx.operators.OnSubscribeRange;
5454
import rx.operators.OperationAll;
55-
import rx.operators.OperatorAmb;
5655
import rx.operators.OperationAny;
5756
import rx.operators.OperationAsObservable;
5857
import rx.operators.OperationAverage;
@@ -91,10 +90,8 @@
9190
import rx.operators.OperationSkip;
9291
import rx.operators.OperationSkipLast;
9392
import rx.operators.OperationSkipUntil;
94-
import rx.operators.OperatorSkipWhile;
9593
import rx.operators.OperationSum;
9694
import rx.operators.OperationSwitch;
97-
import rx.operators.OperationSynchronize;
9895
import rx.operators.OperationTakeLast;
9996
import rx.operators.OperationTakeTimed;
10097
import rx.operators.OperationTakeUntil;
@@ -107,6 +104,7 @@
107104
import rx.operators.OperationToObservableFuture;
108105
import rx.operators.OperationUsing;
109106
import rx.operators.OperationWindow;
107+
import rx.operators.OperatorAmb;
110108
import rx.operators.OperatorCast;
111109
import rx.operators.OperatorDoOnEach;
112110
import rx.operators.OperatorFilter;
@@ -120,8 +118,11 @@
120118
import rx.operators.OperatorRepeat;
121119
import rx.operators.OperatorRetry;
122120
import rx.operators.OperatorScan;
121+
import rx.operators.OperatorSerialize;
123122
import rx.operators.OperatorSkip;
123+
import rx.operators.OperatorSkipWhile;
124124
import rx.operators.OperatorSubscribeOn;
125+
import rx.operators.OperatorSynchronize;
125126
import rx.operators.OperatorTake;
126127
import rx.operators.OperatorTimeout;
127128
import rx.operators.OperatorTimeoutWithSelector;
@@ -2712,7 +2713,7 @@ public final static <T> Observable<T> switchOnNext(Observable<? extends Observab
27122713
*/
27132714
@Deprecated
27142715
public final static <T> Observable<T> synchronize(Observable<T> source) {
2715-
return create(OperationSynchronize.synchronize(source));
2716+
return source.synchronize();
27162717
}
27172718

27182719
/**
@@ -6197,6 +6198,10 @@ public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accum
61976198
return lift(new OperatorScan<R, T>(initialValue, accumulator));
61986199
}
61996200

6201+
public final Observable<T> serialize() {
6202+
return lift(new OperatorSerialize<T>());
6203+
}
6204+
62006205
/**
62016206
* If the source Observable completes after emitting a single item, return an Observable that emits that
62026207
* item. If the source Observable emits more than one item or no items, throw an
@@ -7259,9 +7264,10 @@ public final <R> Observable<R> switchMap(Func1<? super T, ? extends Observable<?
72597264
* @return an Observable that is a chronologically well-behaved version of the source Observable, and that
72607265
* synchronously notifies its {@link Observer}s
72617266
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-synchronize">RxJava Wiki: synchronize()</a>
7267+
* @deprecated Use {@link #serialize()} instead as it doesn't block threads while emitting notification.
72627268
*/
72637269
public final Observable<T> synchronize() {
7264-
return create(OperationSynchronize.synchronize(this));
7270+
return lift(new OperatorSynchronize<T>());
72657271
}
72667272

72677273
/**
@@ -7283,9 +7289,10 @@ public final Observable<T> synchronize() {
72837289
* @return an Observable that is a chronologically well-behaved version of the source Observable, and that
72847290
* synchronously notifies its {@link Observer}s
72857291
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-synchronize">RxJava Wiki: synchronize()</a>
7292+
* @deprecated Use {@link #serialize()} instead as it doesn't block threads while emitting notification.
72867293
*/
72877294
public final Observable<T> synchronize(Object lock) {
7288-
return create(OperationSynchronize.synchronize(this, lock));
7295+
return lift(new OperatorSynchronize<T>(lock));
72897296
}
72907297

72917298
/**

rxjava-core/src/main/java/rx/observers/SafeSubscriber.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
* <li>When onError or onComplete occur it will unsubscribe from the Observable (if executing asynchronously).</li>
5353
* </ul>
5454
* <p>
55-
* It will not synchronize onNext execution. Use the {@link SynchronizedObserver} to do that.
55+
* It will not synchronize onNext execution. Use the {@link SerializedSubscriber} to do that.
5656
*
5757
* @param <T>
5858
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package rx.observers;
2+
3+
import java.util.ArrayList;
4+
5+
import rx.Observer;
6+
7+
/**
8+
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
9+
* <p>
10+
* When multiple threads are notifying they will be serialized by:
11+
* <p>
12+
* <li>Allowing only one thread at a time to emit</li>
13+
* <li>Adding notifications to a queue if another thread is already emitting</li>
14+
* <li>Not holding any locks or blocking any threads while emitting</li>
15+
* <p>
16+
*
17+
* @param <T>
18+
*/
19+
public class SerializedObserver<T> implements Observer<T> {
20+
private final Observer<? super T> actual;
21+
22+
private boolean emitting = false;
23+
private boolean terminated = false;
24+
private ArrayList<Object> queue = new ArrayList<Object>();
25+
26+
private static Sentinel NULL_SENTINEL = new Sentinel();
27+
private static Sentinel COMPLETE_SENTINEL = new Sentinel();
28+
29+
private static class Sentinel {
30+
31+
}
32+
33+
private static class ErrorSentinel extends Sentinel {
34+
final Throwable e;
35+
36+
ErrorSentinel(Throwable e) {
37+
this.e = e;
38+
}
39+
}
40+
41+
public SerializedObserver(Observer<? super T> s) {
42+
this.actual = s;
43+
}
44+
45+
@Override
46+
public void onCompleted() {
47+
boolean canEmit = false;
48+
ArrayList<Object> list = null;
49+
synchronized (this) {
50+
if (terminated) {
51+
return;
52+
}
53+
terminated = true;
54+
if (!emitting) {
55+
// emit immediately
56+
emitting = true;
57+
canEmit = true;
58+
if (queue.size() > 0) {
59+
list = queue; // copy reference
60+
queue = new ArrayList<Object>(); // new version;
61+
}
62+
} else {
63+
// someone else is already emitting so just queue it
64+
queue.add(COMPLETE_SENTINEL);
65+
}
66+
}
67+
if (canEmit) {
68+
// we won the right to emit
69+
try {
70+
drainQueue(list);
71+
actual.onCompleted();
72+
} finally {
73+
synchronized (this) {
74+
emitting = false;
75+
}
76+
}
77+
}
78+
}
79+
80+
@Override
81+
public void onError(final Throwable e) {
82+
boolean canEmit = false;
83+
ArrayList<Object> list = null;
84+
synchronized (this) {
85+
if (terminated) {
86+
return;
87+
}
88+
terminated = true;
89+
if (!emitting) {
90+
// emit immediately
91+
emitting = true;
92+
canEmit = true;
93+
if (queue.size() > 0) {
94+
list = queue; // copy reference
95+
queue = new ArrayList<Object>(); // new version;
96+
}
97+
} else {
98+
// someone else is already emitting so just queue it ... after eliminating the queue to shortcut
99+
queue.clear();
100+
queue.add(new ErrorSentinel(e));
101+
}
102+
}
103+
if (canEmit) {
104+
// we won the right to emit
105+
try {
106+
drainQueue(list);
107+
actual.onError(e);
108+
} finally {
109+
synchronized (this) {
110+
emitting = false;
111+
}
112+
}
113+
}
114+
}
115+
116+
@Override
117+
public void onNext(T t) {
118+
boolean canEmit = false;
119+
ArrayList<Object> list = null;
120+
synchronized (this) {
121+
if (terminated) {
122+
return;
123+
}
124+
if (!emitting) {
125+
// emit immediately
126+
emitting = true;
127+
canEmit = true;
128+
if (queue.size() > 0) {
129+
list = queue; // copy reference
130+
queue = new ArrayList<Object>(); // new version;
131+
}
132+
} else {
133+
// someone else is already emitting so just queue it
134+
if (t == null) {
135+
queue.add(NULL_SENTINEL);
136+
} else {
137+
queue.add(t);
138+
}
139+
}
140+
}
141+
if (canEmit) {
142+
// we won the right to emit
143+
try {
144+
drainQueue(list);
145+
actual.onNext(t);
146+
} finally {
147+
synchronized (this) {
148+
if (terminated) {
149+
list = queue; // copy reference
150+
queue = new ArrayList<Object>(); // new version;
151+
} else {
152+
// release this thread
153+
emitting = false;
154+
canEmit = false;
155+
}
156+
}
157+
}
158+
}
159+
160+
// if terminated this will still be true so let's drain the rest of the queue
161+
if (canEmit) {
162+
drainQueue(list);
163+
}
164+
}
165+
166+
public void drainQueue(ArrayList<Object> list) {
167+
if (list == null || list.size() == 0) {
168+
return;
169+
}
170+
for (Object v : list) {
171+
if (v != null) {
172+
if (v instanceof Sentinel) {
173+
if (v == NULL_SENTINEL) {
174+
actual.onNext(null);
175+
} else if (v == COMPLETE_SENTINEL) {
176+
actual.onCompleted();
177+
} else if (v instanceof ErrorSentinel) {
178+
actual.onError(((ErrorSentinel) v).e);
179+
}
180+
} else {
181+
actual.onNext((T) v);
182+
}
183+
}
184+
}
185+
}
186+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package rx.observers;
2+
3+
import rx.Observer;
4+
import rx.Subscriber;
5+
6+
/**
7+
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
8+
* <p>
9+
* When multiple threads are notifying they will be serialized by:
10+
* <p>
11+
* <li>Allowing only one thread at a time to emit</li>
12+
* <li>Adding notifications to a queue if another thread is already emitting</li>
13+
* <li>Not holding any locks or blocking any threads while emitting</li>
14+
* <p>
15+
*
16+
* @param <T>
17+
*/
18+
public class SerializedSubscriber<T> extends Subscriber<T> {
19+
20+
private final Observer<T> s;
21+
22+
public SerializedSubscriber(Subscriber<? super T> s) {
23+
this.s = new SerializedObserver<T>(s);
24+
}
25+
26+
@Override
27+
public void onCompleted() {
28+
s.onCompleted();
29+
}
30+
31+
@Override
32+
public void onError(Throwable e) {
33+
s.onError(e);
34+
}
35+
36+
@Override
37+
public void onNext(T t) {
38+
s.onNext(t);
39+
}
40+
}

rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java

+2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
* This ONLY does synchronization. It does not involve itself in safety or subscriptions. See SafeSubscriber for that.
2424
*
2525
* @param <T>
26+
* @deprecated Use SerializedObserver instead as it doesn't block threads during event notification.
2627
*/
28+
@Deprecated
2729
public final class SynchronizedObserver<T> implements Observer<T> {
2830

2931
/**

rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java

+2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
* </ul>
3030
*
3131
* @param <T>
32+
* @deprecated Use SerializedSubscriber instead as it doesn't block threads during event notification.
3233
*/
34+
@Deprecated
3335
public final class SynchronizedSubscriber<T> extends Subscriber<T> {
3436

3537
private final Observer<? super T> observer;

rxjava-core/src/main/java/rx/operators/OperationDebounce.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import rx.Subscription;
2727
import rx.functions.Action1;
2828
import rx.functions.Func1;
29+
import rx.observers.SerializedObserver;
2930
import rx.observers.SynchronizedObserver;
3031
import rx.schedulers.Schedulers;
3132
import rx.subscriptions.CompositeSubscription;
@@ -111,7 +112,7 @@ private static class DebounceObserver<T> implements Observer<T> {
111112
public DebounceObserver(Observer<? super T> observer, long timeout, TimeUnit unit, Scheduler scheduler) {
112113
// we need to synchronize the observer since the on* events can be coming from different
113114
// threads and are thus non-deterministic and could be interleaved
114-
this.observer = new SynchronizedObserver<T>(observer);
115+
this.observer = new SerializedObserver<T>(observer);
115116
this.timeout = timeout;
116117
this.unit = unit;
117118
this.scheduler = scheduler;

rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java

+2-10
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import rx.Observer;
2626
import rx.Subscription;
2727
import rx.exceptions.CompositeException;
28-
import rx.observers.SynchronizedObserver;
28+
import rx.observers.SerializedObserver;
2929
import rx.subscriptions.BooleanSubscription;
3030
import rx.subscriptions.CompositeSubscription;
3131

@@ -141,15 +141,7 @@ private MergeDelayErrorObservable(Observable<? extends Observable<? extends T>>
141141

142142
public Subscription onSubscribe(Observer<? super T> actualObserver) {
143143
CompositeSubscription completeSubscription = new CompositeSubscription();
144-
145-
/**
146-
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
147-
* <p>
148-
* The calls from each sequence must be serialized.
149-
* <p>
150-
* Bug report: https://github.com/Netflix/RxJava/issues/614
151-
*/
152-
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver);
144+
SerializedObserver<T> synchronizedObserver = new SerializedObserver<T>(actualObserver);
153145

154146
/**
155147
* Subscribe to the parent Observable to get to the children Observables

0 commit comments

Comments
 (0)