Skip to content

Commit bd87ceb

Browse files
Error Handling Unsubscribe and Terminal State
1 parent 63e2b58 commit bd87ceb

6 files changed

+143
-3
lines changed

src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable;
1919
import rx.Observable.Operator;
2020
import rx.Subscriber;
21+
import rx.exceptions.Exceptions;
2122
import rx.functions.Func1;
2223
import rx.plugins.RxJavaPlugins;
2324

@@ -49,17 +50,29 @@ public OperatorOnErrorResumeNextViaFunction(Func1<Throwable, ? extends Observabl
4950

5051
@Override
5152
public Subscriber<? super T> call(final Subscriber<? super T> child) {
52-
return new Subscriber<T>(child) {
53+
Subscriber<T> parent = new Subscriber<T>() {
5354

55+
private boolean done = false;
56+
5457
@Override
5558
public void onCompleted() {
59+
if (done) {
60+
return;
61+
}
62+
done = true;
5663
child.onCompleted();
5764
}
5865

5966
@Override
6067
public void onError(Throwable e) {
68+
if (done) {
69+
Exceptions.throwIfFatal(e);
70+
return;
71+
}
72+
done = true;
6173
try {
6274
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
75+
unsubscribe();
6376
Observable<? extends T> resume = resumeFunction.call(e);
6477
resume.unsafeSubscribe(child);
6578
} catch (Throwable e2) {
@@ -69,10 +82,15 @@ public void onError(Throwable e) {
6982

7083
@Override
7184
public void onNext(T t) {
85+
if (done) {
86+
return;
87+
}
7288
child.onNext(t);
7389
}
7490

7591
};
92+
child.add(parent);
93+
return parent;
7694
}
7795

7896
}

src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java

+16
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable;
1919
import rx.Observable.Operator;
2020
import rx.Subscriber;
21+
import rx.exceptions.Exceptions;
2122
import rx.plugins.RxJavaPlugins;
2223

2324
/**
@@ -51,20 +52,35 @@ public OperatorOnErrorResumeNextViaObservable(Observable<? extends T> resumeSequ
5152
public Subscriber<? super T> call(final Subscriber<? super T> child) {
5253
// shared subscription won't work here
5354
Subscriber<T> s = new Subscriber<T>() {
55+
56+
private boolean done = false;
57+
5458
@Override
5559
public void onNext(T t) {
60+
if (done) {
61+
return;
62+
}
5663
child.onNext(t);
5764
}
5865

5966
@Override
6067
public void onError(Throwable e) {
68+
if (done) {
69+
Exceptions.throwIfFatal(e);
70+
return;
71+
}
72+
done = true;
6173
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
6274
unsubscribe();
6375
resumeSequence.unsafeSubscribe(child);
6476
}
6577

6678
@Override
6779
public void onCompleted() {
80+
if (done) {
81+
return;
82+
}
83+
done = true;
6884
child.onCompleted();
6985
}
7086

src/main/java/rx/internal/operators/OperatorOnErrorReturn.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
package rx.internal.operators;
1717

1818
import java.util.Arrays;
19+
1920
import rx.Observable.Operator;
2021
import rx.Subscriber;
2122
import rx.exceptions.CompositeException;
23+
import rx.exceptions.Exceptions;
2224
import rx.functions.Func1;
2325
import rx.plugins.RxJavaPlugins;
2426

@@ -50,7 +52,7 @@ public OperatorOnErrorReturn(Func1<Throwable, ? extends T> resultFunction) {
5052

5153
@Override
5254
public Subscriber<? super T> call(final Subscriber<? super T> child) {
53-
return new Subscriber<T>(child) {
55+
Subscriber<T> parent = new Subscriber<T>() {
5456

5557
private boolean done = false;
5658

@@ -65,13 +67,14 @@ public void onNext(T t) {
6567
@Override
6668
public void onError(Throwable e) {
6769
if (done) {
70+
Exceptions.throwIfFatal(e);
6871
return;
6972
}
7073
done = true;
7174
try {
7275
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
76+
unsubscribe();
7377
T result = resultFunction.call(e);
74-
7578
child.onNext(result);
7679
} catch (Throwable x) {
7780
child.onError(new CompositeException(Arrays.asList(e, x)));
@@ -90,5 +93,7 @@ public void onCompleted() {
9093
}
9194

9295
};
96+
child.add(parent);
97+
return parent;
9398
}
9499
}

src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java

+15
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable;
1919
import rx.Observable.Operator;
2020
import rx.Subscriber;
21+
import rx.exceptions.Exceptions;
2122
import rx.plugins.RxJavaPlugins;
2223

2324
/**
@@ -56,13 +57,23 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
5657
// needs to independently unsubscribe so child can continue with the resume
5758
Subscriber<T> s = new Subscriber<T>() {
5859

60+
private boolean done = false;
61+
5962
@Override
6063
public void onNext(T t) {
64+
if (done) {
65+
return;
66+
}
6167
child.onNext(t);
6268
}
6369

6470
@Override
6571
public void onError(Throwable e) {
72+
if (done) {
73+
Exceptions.throwIfFatal(e);
74+
return;
75+
}
76+
done = true;
6677
if (e instanceof Exception) {
6778
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
6879
unsubscribe();
@@ -74,6 +85,10 @@ public void onError(Throwable e) {
7485

7586
@Override
7687
public void onCompleted() {
88+
if (done) {
89+
return;
90+
}
91+
done = true;
7792
child.onCompleted();
7893
}
7994

src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunctionTest.java

+44
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import rx.Subscription;
3636
import rx.functions.Func1;
3737
import rx.observers.TestSubscriber;
38+
import rx.schedulers.Schedulers;
3839

3940
public class OperatorOnErrorResumeNextViaFunctionTest {
4041

@@ -47,6 +48,8 @@ public void testResumeNextWithSynchronousExecution() {
4748
public void call(Subscriber<? super String> observer) {
4849
observer.onNext("one");
4950
observer.onError(new Throwable("injected failure"));
51+
observer.onNext("two");
52+
observer.onNext("three");
5053
}
5154
});
5255

@@ -226,6 +229,47 @@ public Observable<String> call(Throwable t1) {
226229
System.out.println(ts.getOnNextEvents());
227230
ts.assertReceivedOnNext(Arrays.asList("success"));
228231
}
232+
233+
@Test
234+
public void testMapResumeAsyncNext() {
235+
// Trigger multiple failures
236+
Observable<String> w = Observable.just("one", "fail", "two", "three", "fail");
237+
238+
// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
239+
// rx.operator incl onErrorResumeNextViaObservable)
240+
w = w.map(new Func1<String, String>() {
241+
@Override
242+
public String call(String s) {
243+
if ("fail".equals(s))
244+
throw new RuntimeException("Forced Failure");
245+
System.out.println("BadMapper:" + s);
246+
return s;
247+
}
248+
});
249+
250+
Observable<String> observable = w.onErrorResumeNext(new Func1<Throwable, Observable<String>>() {
251+
252+
@Override
253+
public Observable<String> call(Throwable t1) {
254+
return Observable.just("twoResume", "threeResume").subscribeOn(Schedulers.computation());
255+
}
256+
257+
});
258+
259+
@SuppressWarnings("unchecked")
260+
Observer<String> observer = mock(Observer.class);
261+
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
262+
observable.subscribe(ts);
263+
ts.awaitTerminalEvent();
264+
265+
verify(observer, Mockito.never()).onError(any(Throwable.class));
266+
verify(observer, times(1)).onCompleted();
267+
verify(observer, times(1)).onNext("one");
268+
verify(observer, Mockito.never()).onNext("two");
269+
verify(observer, Mockito.never()).onNext("three");
270+
verify(observer, times(1)).onNext("twoResume");
271+
verify(observer, times(1)).onNext("threeResume");
272+
}
229273

230274
private static class TestObservable implements Observable.OnSubscribe<String> {
231275

src/test/java/rx/internal/operators/OperatorOnErrorReturnTest.java

+42
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import rx.Observer;
3232
import rx.Subscriber;
3333
import rx.functions.Func1;
34+
import rx.observers.TestSubscriber;
35+
import rx.schedulers.Schedulers;
3436

3537
public class OperatorOnErrorReturnTest {
3638

@@ -104,6 +106,46 @@ public String call(Throwable e) {
104106
verify(observer, times(0)).onCompleted();
105107
assertNotNull(capturedException.get());
106108
}
109+
110+
@Test
111+
public void testMapResumeAsyncNext() {
112+
// Trigger multiple failures
113+
Observable<String> w = Observable.just("one", "fail", "two", "three", "fail");
114+
115+
// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
116+
// rx.operator incl onErrorResumeNextViaObservable)
117+
w = w.map(new Func1<String, String>() {
118+
@Override
119+
public String call(String s) {
120+
if ("fail".equals(s))
121+
throw new RuntimeException("Forced Failure");
122+
System.out.println("BadMapper:" + s);
123+
return s;
124+
}
125+
});
126+
127+
Observable<String> observable = w.onErrorReturn(new Func1<Throwable, String>() {
128+
129+
@Override
130+
public String call(Throwable t1) {
131+
return "resume";
132+
}
133+
134+
});
135+
136+
@SuppressWarnings("unchecked")
137+
Observer<String> observer = mock(Observer.class);
138+
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
139+
observable.subscribe(ts);
140+
ts.awaitTerminalEvent();
141+
142+
verify(observer, Mockito.never()).onError(any(Throwable.class));
143+
verify(observer, times(1)).onCompleted();
144+
verify(observer, times(1)).onNext("one");
145+
verify(observer, Mockito.never()).onNext("two");
146+
verify(observer, Mockito.never()).onNext("three");
147+
verify(observer, times(1)).onNext("resume");
148+
}
107149

108150
private static class TestObservable implements Observable.OnSubscribe<String> {
109151

0 commit comments

Comments
 (0)