Skip to content

Commit 4e31a5f

Browse files
committed
Merge pull request #3644 from akarnokd/SyncOnSubscribeErrorMgmt1x
1.x: fix SyncOnSubscribe not signalling onError if the generator crashes
2 parents fed8d7b + 05d8c63 commit 4e31a5f

File tree

2 files changed

+40
-2
lines changed

2 files changed

+40
-2
lines changed

src/main/java/rx/observables/SyncOnSubscribe.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import rx.Subscriber;
2525
import rx.Subscription;
2626
import rx.annotations.Experimental;
27+
import rx.exceptions.Exceptions;
2728
import rx.functions.Action0;
2829
import rx.functions.Action1;
2930
import rx.functions.Action2;
@@ -53,7 +54,16 @@ public abstract class SyncOnSubscribe<S, T> implements OnSubscribe<T> {
5354
*/
5455
@Override
5556
public final void call(final Subscriber<? super T> subscriber) {
56-
S state = generateState();
57+
S state;
58+
59+
try {
60+
state = generateState();
61+
} catch (Throwable e) {
62+
Exceptions.throwIfFatal(e);
63+
subscriber.onError(e);
64+
return;
65+
}
66+
5767
SubscriptionProducer<S, T> p = new SubscriptionProducer<S, T>(subscriber, this, state);
5868
subscriber.add(p);
5969
subscriber.setProducer(p);
@@ -363,7 +373,12 @@ private boolean tryUnsubscribe() {
363373
}
364374

365375
private void doUnsubscribe() {
366-
parent.onUnsubscribe(state);
376+
try {
377+
parent.onUnsubscribe(state);
378+
} catch (Throwable e) {
379+
Exceptions.throwIfFatal(e);
380+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
381+
}
367382
}
368383

369384
@Override

src/test/java/rx/observables/SyncOnSubscribeTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -989,4 +989,27 @@ public Object call() throws Exception {
989989
if (exec != null) exec.shutdownNow();
990990
}
991991
}
992+
993+
@Test
994+
public void testStateThrows() {
995+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
996+
997+
SyncOnSubscribe.<Object, Object>createSingleState(
998+
new Func0<Object>() {
999+
@Override
1000+
public Object call() {
1001+
throw new TestException();
1002+
}
1003+
}
1004+
, new Action2<Object, Observer<Object>>() {
1005+
@Override
1006+
public void call(Object s, Observer<? super Object> o) {
1007+
1008+
}
1009+
}).call(ts);
1010+
1011+
ts.assertNoValues();
1012+
ts.assertError(TestException.class);
1013+
ts.assertNotCompleted();
1014+
}
9921015
}

0 commit comments

Comments
 (0)