Skip to content

Commit 6557dac

Browse files
Merge pull request #1631 from benjchristensen/doOnEach-error-swallowing
Handle Fatal Exceptions
2 parents 9c44701 + 63dac7b commit 6557dac

File tree

2 files changed

+38
-3
lines changed

2 files changed

+38
-3
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java

+3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable.Operator;
1919
import rx.Observer;
2020
import rx.Subscriber;
21+
import rx.exceptions.Exceptions;
2122
import rx.exceptions.OnErrorThrowable;
2223

2324
/**
@@ -54,6 +55,8 @@ public void onCompleted() {
5455

5556
@Override
5657
public void onError(Throwable e) {
58+
// need to throwIfFatal since we swallow errors after terminated
59+
Exceptions.throwIfFatal(e);
5760
if (done) {
5861
return;
5962
}

rxjava-core/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java

+35-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertEquals;
18+
import static org.junit.Assert.*;
1919
import static org.mockito.Matchers.any;
2020
import static org.mockito.Mockito.never;
2121
import static org.mockito.Mockito.times;
@@ -28,6 +28,8 @@
2828

2929
import rx.Observable;
3030
import rx.Observer;
31+
import rx.Subscriber;
32+
import rx.exceptions.OnErrorNotImplementedException;
3133
import rx.functions.Action1;
3234
import rx.functions.Func1;
3335

@@ -121,7 +123,7 @@ public void call(String s) {
121123
@Test
122124
public void testIssue1451Case1() {
123125
// https://github.com/Netflix/RxJava/issues/1451
124-
int[] nums = {1, 2, 3};
126+
int[] nums = { 1, 2, 3 };
125127
final AtomicInteger count = new AtomicInteger();
126128
for (final int n : nums) {
127129
Observable
@@ -147,7 +149,7 @@ public void call(List<Boolean> booleans) {
147149
@Test
148150
public void testIssue1451Case2() {
149151
// https://github.com/Netflix/RxJava/issues/1451
150-
int[] nums = {1, 2, 3};
152+
int[] nums = { 1, 2, 3 };
151153
final AtomicInteger count = new AtomicInteger();
152154
for (final int n : nums) {
153155
Observable
@@ -169,4 +171,34 @@ public void call(List<Boolean> booleans) {
169171
}
170172
assertEquals(nums.length, count.get());
171173
}
174+
175+
@Test
176+
public void testFatalError() {
177+
try {
178+
Observable.just(1, 2, 3)
179+
.flatMap(new Func1<Integer, Observable<?>>() {
180+
@Override
181+
public Observable<?> call(Integer integer) {
182+
return Observable.create(new Observable.OnSubscribe<Object>() {
183+
@Override
184+
public void call(Subscriber<Object> o) {
185+
throw new NullPointerException("Test NPE");
186+
}
187+
});
188+
}
189+
})
190+
.doOnNext(new Action1<Object>() {
191+
@Override
192+
public void call(Object o) {
193+
System.out.println("Won't come here");
194+
}
195+
})
196+
.subscribe();
197+
fail("should have thrown an exception");
198+
} catch (OnErrorNotImplementedException e) {
199+
assertTrue(e.getCause() instanceof NullPointerException);
200+
assertEquals(e.getCause().getMessage(), "Test NPE");
201+
System.out.println("Received exception: " + e);
202+
}
203+
}
172204
}

0 commit comments

Comments
 (0)