Skip to content

Commit 3161910

Browse files
Merge pull request #1091 from benjchristensen/unsafeSubscribe-error-handling
Handle Thrown Errors with UnsafeSubscribe
2 parents 2713bdb + 6b6d214 commit 3161910

File tree

3 files changed

+57
-1
lines changed

3 files changed

+57
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6231,7 +6231,15 @@ public void onNext(T t) {
62316231
* @return Subscription which is the Subscriber passed in
62326232
*/
62336233
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
6234-
onSubscribe.call(subscriber);
6234+
try {
6235+
onSubscribe.call(subscriber);
6236+
} catch (Throwable e) {
6237+
if (e instanceof OnErrorNotImplementedException) {
6238+
throw (OnErrorNotImplementedException) e;
6239+
}
6240+
// handle broken contracts: https://github.com/Netflix/RxJava/issues/1090
6241+
subscriber.onError(e);
6242+
}
62356243
return subscriber;
62366244
}
62376245

rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,24 @@ public void testError2() {
323323
verify(stringObserver, times(0)).onNext("eight");
324324
verify(stringObserver, times(0)).onNext("nine");
325325
}
326+
327+
@Test
328+
public void testThrownErrorHandling() {
329+
TestSubscriber<String> ts = new TestSubscriber<String>();
330+
Observable<String> o1 = Observable.create(new OnSubscribe<String>() {
331+
332+
@Override
333+
public void call(Subscriber<? super String> s) {
334+
throw new RuntimeException("fail");
335+
}
336+
337+
});
338+
339+
Observable.merge(o1, o1).subscribe(ts);
340+
ts.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
341+
ts.assertTerminalEvent();
342+
System.out.println("Error: " + ts.getOnErrorEvents());
343+
}
326344

327345
private static class TestSynchronousObservable implements Observable.OnSubscribeFunc<String> {
328346

rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,36 @@ public void call(
7777
assertEquals(0, observer.getOnErrorEvents().size());
7878
assertEquals(1, observer.getOnCompletedEvents().size());
7979
}
80+
81+
@Test
82+
public void testThrownErrorHandling() {
83+
TestSubscriber<String> ts = new TestSubscriber<String>();
84+
Observable.create(new OnSubscribe<String>() {
85+
86+
@Override
87+
public void call(Subscriber<? super String> s) {
88+
throw new RuntimeException("fail");
89+
}
90+
91+
}).subscribeOn(Schedulers.computation()).subscribe(ts);
92+
ts.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
93+
ts.assertTerminalEvent();
94+
}
95+
96+
@Test
97+
public void testOnError() {
98+
TestSubscriber<String> ts = new TestSubscriber<String>();
99+
Observable.create(new OnSubscribe<String>() {
100+
101+
@Override
102+
public void call(Subscriber<? super String> s) {
103+
s.onError(new RuntimeException("fail"));
104+
}
105+
106+
}).subscribeOn(Schedulers.computation()).subscribe(ts);
107+
ts.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
108+
ts.assertTerminalEvent();
109+
}
80110

81111
public static class SlowScheduler extends Scheduler {
82112
final Scheduler actual;

0 commit comments

Comments
 (0)