Skip to content

Commit 2a4e9c6

Browse files
Special Handling of java.lang.Error and OnErrorNotImplemented
- #748 (comment) - #771 - #789 - SynchronizedObserver is for synchronization, not error handling or contract enforcements, that's the job of SafeSubscriber - Removed some unit tests that were asserting unsubscribe behavior that relied on SynchronizedObserver. They were testing something they are not responsible for.
1 parent 5c6db66 commit 2a4e9c6

14 files changed

+250
-639
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
import rx.subjects.ReplaySubject;
119119
import rx.subjects.Subject;
120120
import rx.subscriptions.Subscriptions;
121+
import rx.util.Exceptions;
121122
import rx.util.OnErrorNotImplementedException;
122123
import rx.util.Range;
123124
import rx.util.TimeInterval;
@@ -6964,10 +6965,9 @@ public void call() {
69646965
}
69656966

69666967
});
6967-
} catch (OnErrorNotImplementedException e) {
6968-
// special handling when onError is not implemented ... we just rethrow
6969-
throw e;
69706968
} catch (Throwable e) {
6969+
// special handling for certain Throwable/Error/Exception types
6970+
Exceptions.throwIfFatal(e);
69716971
// if an unhandled error occurs executing the onSubscribe we will propagate it
69726972
try {
69736973
observer.onError(hook.onSubscribeError(this, e));

rxjava-core/src/main/java/rx/observers/SafeSubscriber.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@
1919
import java.util.concurrent.atomic.AtomicBoolean;
2020

2121
import rx.Subscriber;
22-
import rx.Subscription;
23-
import rx.operators.SafeObservableSubscription;
2422
import rx.plugins.RxJavaPlugins;
25-
import rx.subscriptions.Subscriptions;
2623
import rx.util.CompositeException;
24+
import rx.util.Exceptions;
2725
import rx.util.OnErrorNotImplementedException;
2826

2927
/**
@@ -74,6 +72,9 @@ public void onCompleted() {
7472
try {
7573
actual.onCompleted();
7674
} catch (Throwable e) {
75+
// we handle here instead of another method so we don't add stacks to the frame
76+
// which can prevent it from being able to handle StackOverflow
77+
Exceptions.throwIfFatal(e);
7778
// handle errors if the onCompleted implementation fails, not just if the Observable fails
7879
_onError(e);
7980
} finally {
@@ -85,6 +86,9 @@ public void onCompleted() {
8586

8687
@Override
8788
public void onError(Throwable e) {
89+
// we handle here instead of another method so we don't add stacks to the frame
90+
// which can prevent it from being able to handle StackOverflow
91+
Exceptions.throwIfFatal(e);
8892
if (isFinished.compareAndSet(false, true)) {
8993
_onError(e);
9094
}
@@ -97,6 +101,9 @@ public void onNext(T args) {
97101
actual.onNext(args);
98102
}
99103
} catch (Throwable e) {
104+
// we handle here instead of another method so we don't add stacks to the frame
105+
// which can prevent it from being able to handle StackOverflow
106+
Exceptions.throwIfFatal(e);
100107
// handle errors if the onNext implementation fails, not just if the Observable fails
101108
onError(e);
102109
}

rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java

Lines changed: 4 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,11 @@
1616
package rx.observers;
1717

1818
import rx.Observer;
19-
import rx.Subscriber;
20-
import rx.operators.SafeObservableSubscription;
2119

2220
/**
23-
* A thread-safe Observer for transitioning states in operators.
21+
* Synchronize execution to be single-threaded.
2422
* <p>
25-
* Execution rules are:
26-
* <ul>
27-
* <li>Allow only single-threaded, synchronous, ordered execution of onNext, onCompleted, onError</li>
28-
* <li>Once an onComplete or onError are performed, no further calls can be executed</li>
29-
* <li>If unsubscribe is called, this means we call completed() and don't allow any further onNext calls.</li>
30-
* </ul>
23+
* This ONLY does synchronization. It does not involve itself in safety or subscriptions. See SafeSubscriber for that.
3124
*
3225
* @param <T>
3326
*/
@@ -48,76 +41,33 @@ public final class SynchronizedObserver<T> implements Observer<T> {
4841
*/
4942

5043
private final Observer<? super T> observer;
51-
private final SafeObservableSubscription subscription;
52-
private volatile boolean finishRequested = false;
53-
private volatile boolean finished = false;
5444
private volatile Object lock;
5545

56-
public SynchronizedObserver(Observer<? super T> subscriber, SafeObservableSubscription subscription) {
46+
public SynchronizedObserver(Observer<? super T> subscriber) {
5747
this.observer = subscriber;
58-
this.subscription = subscription;
5948
this.lock = this;
6049
}
6150

62-
public SynchronizedObserver(Observer<? super T> subscriber, SafeObservableSubscription subscription, Object lock) {
51+
public SynchronizedObserver(Observer<? super T> subscriber, Object lock) {
6352
this.observer = subscriber;
64-
this.subscription = subscription;
6553
this.lock = lock;
6654
}
6755

68-
/**
69-
* Used when synchronizing an Observer without access to the subscription.
70-
*
71-
* @param Observer
72-
*/
73-
public SynchronizedObserver(Observer<? super T> subscriber) {
74-
this(subscriber, new SafeObservableSubscription());
75-
}
76-
7756
public void onNext(T arg) {
78-
if (finished || finishRequested || subscription.isUnsubscribed()) {
79-
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
80-
return;
81-
}
8257
synchronized (lock) {
83-
// check again since this could have changed while waiting
84-
if (finished || finishRequested || subscription.isUnsubscribed()) {
85-
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
86-
return;
87-
}
8858
observer.onNext(arg);
8959
}
9060
}
9161

9262
public void onError(Throwable e) {
93-
if (finished || subscription.isUnsubscribed()) {
94-
// another thread has already finished us, so we won't proceed
95-
return;
96-
}
97-
finishRequested = true;
9863
synchronized (lock) {
99-
// check again since this could have changed while waiting
100-
if (finished || subscription.isUnsubscribed()) {
101-
return;
102-
}
10364
observer.onError(e);
104-
finished = true;
10565
}
10666
}
10767

10868
public void onCompleted() {
109-
if (finished || subscription.isUnsubscribed()) {
110-
// another thread has already finished us, so we won't proceed
111-
return;
112-
}
113-
finishRequested = true;
11469
synchronized (lock) {
115-
// check again since this could have changed while waiting
116-
if (finished || subscription.isUnsubscribed()) {
117-
return;
118-
}
11970
observer.onCompleted();
120-
finished = true;
12171
}
12272
}
12373
}

rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import rx.Observer;
1919
import rx.Subscriber;
20-
import rx.operators.SafeObservableSubscription;
2120

2221
/**
2322
* A thread-safe Observer for transitioning states in operators.
@@ -37,9 +36,7 @@ public final class SynchronizedSubscriber<T> extends Subscriber<T> {
3736

3837
public SynchronizedSubscriber(Subscriber<? super T> subscriber, Object lock) {
3938
super(subscriber);
40-
SafeObservableSubscription s = new SafeObservableSubscription();
41-
subscriber.add(s);
42-
this.observer = new SynchronizedObserver<T>(subscriber, s, lock);
39+
this.observer = new SynchronizedObserver<T>(subscriber, lock);
4340
}
4441

4542
/**

rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,7 @@ public Subscription onSubscribe(Observer<? super T> actualObserver) {
151151
* <p>
152152
* Bug report: https://github.com/Netflix/RxJava/issues/614
153153
*/
154-
SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription);
155-
completeSubscription.add(subscription);
156-
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);
154+
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver);
157155

158156
/**
159157
* Subscribe to the parent Observable to get to the children Observables

rxjava-core/src/main/java/rx/operators/OperationSynchronize.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,13 @@ public Synchronize(Observable<? extends T> innerObservable, Object lock) {
8686
private Object lock;
8787

8888
public Subscription onSubscribe(Observer<? super T> observer) {
89-
SafeObservableSubscription subscription = new SafeObservableSubscription();
9089
if (lock == null) {
91-
atomicObserver = new SynchronizedObserver<T>(observer, subscription);
90+
atomicObserver = new SynchronizedObserver<T>(observer);
9291
}
9392
else {
94-
atomicObserver = new SynchronizedObserver<T>(observer, subscription, lock);
93+
atomicObserver = new SynchronizedObserver<T>(observer, lock);
9594
}
96-
return subscription.wrap(innerObservable.subscribe(atomicObserver));
95+
return innerObservable.subscribe(atomicObserver);
9796
}
9897

9998
}

0 commit comments

Comments
 (0)