Skip to content

Commit 3722fe0

Browse files
Merge pull request #1817 from benjchristensen/lift-error-handling
Fix Synchronous OnSubscribe Exception Skips Operators
2 parents d8ebda5 + de7b1f5 commit 3722fe0

File tree

2 files changed

+65
-6
lines changed

2 files changed

+65
-6
lines changed

src/main/java/rx/Observable.java

+15-6
Original file line numberDiff line numberDiff line change
@@ -139,16 +139,25 @@ public final <R> Observable<R> lift(final Operator<? extends R, ? super T> lift)
139139
public void call(Subscriber<? super R> o) {
140140
try {
141141
Subscriber<? super T> st = hook.onLift(lift).call(o);
142-
// new Subscriber created and being subscribed with so 'onStart' it
143-
st.onStart();
144-
onSubscribe.call(st);
142+
try {
143+
// new Subscriber created and being subscribed with so 'onStart' it
144+
st.onStart();
145+
onSubscribe.call(st);
146+
} catch (Throwable e) {
147+
// localized capture of errors rather than it skipping all operators
148+
// and ending up in the try/catch of the subscribe method which then
149+
// prevents onErrorResumeNext and other similar approaches to error handling
150+
if (e instanceof OnErrorNotImplementedException) {
151+
throw (OnErrorNotImplementedException) e;
152+
}
153+
st.onError(e);
154+
}
145155
} catch (Throwable e) {
146-
// localized capture of errors rather than it skipping all operators
147-
// and ending up in the try/catch of the subscribe method which then
148-
// prevents onErrorResumeNext and other similar approaches to error handling
149156
if (e instanceof OnErrorNotImplementedException) {
150157
throw (OnErrorNotImplementedException) e;
151158
}
159+
// if the lift function failed all we can do is pass the error to the final Subscriber
160+
// as we don't have the operator available to us
152161
o.onError(e);
153162
}
154163
}

src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservableTest.java

+50
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.mockito.Mockito;
2828

2929
import rx.Observable;
30+
import rx.Observable.OnSubscribe;
3031
import rx.Observer;
3132
import rx.Subscriber;
3233
import rx.Subscription;
@@ -105,6 +106,55 @@ public String call(String s) {
105106
verify(observer, times(1)).onNext("twoResume");
106107
verify(observer, times(1)).onNext("threeResume");
107108
}
109+
110+
@Test
111+
public void testResumeNextWithFailedOnSubscribe() {
112+
Subscription s = mock(Subscription.class);
113+
Observable<String> testObservable = Observable.create(new OnSubscribe<String>() {
114+
115+
@Override
116+
public void call(Subscriber<? super String> t1) {
117+
throw new RuntimeException("force failure");
118+
}
119+
120+
});
121+
Observable<String> resume = Observable.just("resume");
122+
Observable<String> observable = testObservable.onErrorResumeNext(resume);
123+
124+
@SuppressWarnings("unchecked")
125+
Observer<String> observer = mock(Observer.class);
126+
observable.subscribe(observer);
127+
128+
verify(observer, Mockito.never()).onError(any(Throwable.class));
129+
verify(observer, times(1)).onCompleted();
130+
verify(observer, times(1)).onNext("resume");
131+
}
132+
133+
@Test
134+
public void testResumeNextWithFailedOnSubscribeAsync() {
135+
Subscription s = mock(Subscription.class);
136+
Observable<String> testObservable = Observable.create(new OnSubscribe<String>() {
137+
138+
@Override
139+
public void call(Subscriber<? super String> t1) {
140+
throw new RuntimeException("force failure");
141+
}
142+
143+
});
144+
Observable<String> resume = Observable.just("resume");
145+
Observable<String> observable = testObservable.subscribeOn(Schedulers.io()).onErrorResumeNext(resume);
146+
147+
@SuppressWarnings("unchecked")
148+
Observer<String> observer = mock(Observer.class);
149+
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
150+
observable.subscribe(ts);
151+
152+
ts.awaitTerminalEvent();
153+
154+
verify(observer, Mockito.never()).onError(any(Throwable.class));
155+
verify(observer, times(1)).onCompleted();
156+
verify(observer, times(1)).onNext("resume");
157+
}
108158

109159
private static class TestObservable implements Observable.OnSubscribe<String> {
110160

0 commit comments

Comments
 (0)