Skip to content

Commit b293751

Browse files
committed
Merge pull request #3468 from zsxwing/fix-for-OnErrorFailedException
1.x: Fix other places that may swallow OnErrorFailedException
2 parents eb65ba3 + e2b234a commit b293751

File tree

3 files changed

+117
-19
lines changed

3 files changed

+117
-19
lines changed

src/main/java/rx/Observable.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8176,10 +8176,8 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
81768176
// if an unhandled error occurs executing the onSubscribe we will propagate it
81778177
try {
81788178
subscriber.onError(hook.onSubscribeError(e));
8179-
} catch (OnErrorNotImplementedException e2) {
8180-
// special handling when onError is not implemented ... we just rethrow
8181-
throw e2;
81828179
} catch (Throwable e2) {
8180+
Exceptions.throwIfFatal(e2);
81838181
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
81848182
// so we are unable to propagate the error correctly and will just throw
81858183
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
@@ -8271,10 +8269,8 @@ private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Obse
82718269
// if an unhandled error occurs executing the onSubscribe we will propagate it
82728270
try {
82738271
subscriber.onError(hook.onSubscribeError(e));
8274-
} catch (OnErrorNotImplementedException e2) {
8275-
// special handling when onError is not implemented ... we just rethrow
8276-
throw e2;
82778272
} catch (Throwable e2) {
8273+
Exceptions.throwIfFatal(e2);
82788274
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
82798275
// so we are unable to propagate the error correctly and will just throw
82808276
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);

src/main/java/rx/Single.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -190,18 +190,14 @@ public void call(Subscriber<? super R> o) {
190190
st.onStart();
191191
onSubscribe.call(st);
192192
} catch (Throwable e) {
193-
// localized capture of errors rather than it skipping all operators
193+
Exceptions.throwIfFatal(e);
194+
// localized capture of errors rather than it skipping all operators
194195
// and ending up in the try/catch of the subscribe method which then
195196
// prevents onErrorResumeNext and other similar approaches to error handling
196-
if (e instanceof OnErrorNotImplementedException) {
197-
throw (OnErrorNotImplementedException) e;
198-
}
199197
st.onError(e);
200198
}
201199
} catch (Throwable e) {
202-
if (e instanceof OnErrorNotImplementedException) {
203-
throw (OnErrorNotImplementedException) e;
204-
}
200+
Exceptions.throwIfFatal(e);
205201
// if the lift function failed all we can do is pass the error to the final Subscriber
206202
// as we don't have the operator available to us
207203
o.onError(e);
@@ -1507,10 +1503,8 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
15071503
// if an unhandled error occurs executing the onSubscribe we will propagate it
15081504
try {
15091505
subscriber.onError(hook.onSubscribeError(e));
1510-
} catch (OnErrorNotImplementedException e2) {
1511-
// special handling when onError is not implemented ... we just rethrow
1512-
throw e2;
15131506
} catch (Throwable e2) {
1507+
Exceptions.throwIfFatal(e2);
15141508
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
15151509
// so we are unable to propagate the error correctly and will just throw
15161510
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
@@ -1596,10 +1590,8 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
15961590
// if an unhandled error occurs executing the onSubscribe we will propagate it
15971591
try {
15981592
subscriber.onError(hook.onSubscribeError(e));
1599-
} catch (OnErrorNotImplementedException e2) {
1600-
// special handling when onError is not implemented ... we just rethrow
1601-
throw e2;
16021593
} catch (Throwable e2) {
1594+
Exceptions.throwIfFatal(e2);
16031595
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
16041596
// so we are unable to propagate the error correctly and will just throw
16051597
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);

src/test/java/rx/exceptions/ExceptionsTest.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222

2323
import org.junit.Test;
2424

25+
import rx.Single;
26+
import rx.SingleSubscriber;
27+
import rx.Subscriber;
2528
import rx.Observable;
2629
import rx.Observer;
2730
import rx.functions.Action1;
@@ -226,4 +229,111 @@ public void onNext(Integer integer) {
226229
}
227230
});
228231
}
232+
233+
@Test(expected = OnErrorFailedException.class)
234+
public void testOnErrorExceptionIsThrownFromSubscribe() {
235+
Observable.create(new Observable.OnSubscribe<Integer>() {
236+
@Override
237+
public void call(Subscriber<? super Integer> s1) {
238+
Observable.create(new Observable.OnSubscribe<Integer>() {
239+
@Override
240+
public void call(Subscriber<? super Integer> s2) {
241+
throw new IllegalArgumentException("original exception");
242+
}
243+
}).subscribe(s1);
244+
}
245+
}
246+
).subscribe(new OnErrorFailedSubscriber());
247+
}
248+
249+
@Test(expected = OnErrorFailedException.class)
250+
public void testOnErrorExceptionIsThrownFromUnsafeSubscribe() {
251+
Observable.create(new Observable.OnSubscribe<Integer>() {
252+
@Override
253+
public void call(Subscriber<? super Integer> s1) {
254+
Observable.create(new Observable.OnSubscribe<Integer>() {
255+
@Override
256+
public void call(Subscriber<? super Integer> s2) {
257+
throw new IllegalArgumentException("original exception");
258+
}
259+
}).unsafeSubscribe(s1);
260+
}
261+
}
262+
).subscribe(new OnErrorFailedSubscriber());
263+
}
264+
265+
@Test(expected = OnErrorFailedException.class)
266+
public void testOnErrorExceptionIsThrownFromSingleDoOnSuccess() throws Exception {
267+
Single.just(1)
268+
.doOnSuccess(new Action1<Integer>() {
269+
@Override
270+
public void call(Integer integer) {
271+
throw new RuntimeException();
272+
}
273+
})
274+
.subscribe(new OnErrorFailedSubscriber());
275+
}
276+
277+
@Test(expected = OnErrorFailedException.class)
278+
public void testOnErrorExceptionIsThrownFromSingleSubscribe() {
279+
Single.create(new Single.OnSubscribe<Integer>() {
280+
@Override
281+
public void call(SingleSubscriber<? super Integer> s1) {
282+
Single.create(new Single.OnSubscribe<Integer>() {
283+
@Override
284+
public void call(SingleSubscriber<? super Integer> s2) {
285+
throw new IllegalArgumentException("original exception");
286+
}
287+
}).subscribe(s1);
288+
}
289+
}
290+
).subscribe(new OnErrorFailedSubscriber());
291+
}
292+
293+
@Test(expected = OnErrorFailedException.class)
294+
public void testOnErrorExceptionIsThrownFromSingleUnsafeSubscribe() {
295+
Single.create(new Single.OnSubscribe<Integer>() {
296+
@Override
297+
public void call(final SingleSubscriber<? super Integer> s1) {
298+
Single.create(new Single.OnSubscribe<Integer>() {
299+
@Override
300+
public void call(SingleSubscriber<? super Integer> s2) {
301+
throw new IllegalArgumentException("original exception");
302+
}
303+
}).unsafeSubscribe(new Subscriber<Integer>() {
304+
305+
@Override
306+
public void onCompleted() {
307+
}
308+
309+
@Override
310+
public void onError(Throwable e) {
311+
s1.onError(e);
312+
}
313+
314+
@Override
315+
public void onNext(Integer v) {
316+
s1.onSuccess(v);
317+
}
318+
319+
});
320+
}
321+
}
322+
).subscribe(new OnErrorFailedSubscriber());
323+
}
324+
325+
private class OnErrorFailedSubscriber extends Subscriber<Integer> {
326+
@Override
327+
public void onCompleted() {
328+
}
329+
330+
@Override
331+
public void onError(Throwable e) {
332+
throw new RuntimeException();
333+
}
334+
335+
@Override
336+
public void onNext(Integer value) {
337+
}
338+
}
229339
}

0 commit comments

Comments
 (0)