Skip to content

Commit 0a8b7bc

Browse files
committed
Merge pull request #3689 from akarnokd/OnErrorDoXFix1x
1.x: unified onErrorX and onExceptionResumeNext and fixed backpressure
2 parents 33f2894 + 99d5c60 commit 0a8b7bc

10 files changed

+172
-345
lines changed

src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6230,7 +6230,7 @@ public final Observable<T> onErrorResumeNext(final Func1<Throwable, ? extends Ob
62306230
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
62316231
*/
62326232
public final Observable<T> onErrorResumeNext(final Observable<? extends T> resumeSequence) {
6233-
return lift(new OperatorOnErrorResumeNextViaObservable<T>(resumeSequence));
6233+
return lift(OperatorOnErrorResumeNextViaFunction.withOther(resumeSequence));
62346234
}
62356235

62366236
/**
@@ -6260,7 +6260,7 @@ public final Observable<T> onErrorResumeNext(final Observable<? extends T> resum
62606260
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
62616261
*/
62626262
public final Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunction) {
6263-
return lift(new OperatorOnErrorReturn<T>(resumeFunction));
6263+
return lift(OperatorOnErrorResumeNextViaFunction.withSingle(resumeFunction));
62646264
}
62656265

62666266
/**
@@ -6296,7 +6296,7 @@ public final Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFun
62966296
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
62976297
*/
62986298
public final Observable<T> onExceptionResumeNext(final Observable<? extends T> resumeSequence) {
6299-
return lift(new OperatorOnExceptionResumeNextViaObservable<T>(resumeSequence));
6299+
return lift(OperatorOnErrorResumeNextViaFunction.withException(resumeSequence));
63006300
}
63016301

63026302
/**

src/main/java/rx/Single.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1411,7 +1411,7 @@ public final Single<T> observeOn(Scheduler scheduler) {
14111411
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
14121412
*/
14131413
public final Single<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunction) {
1414-
return lift(new OperatorOnErrorReturn<T>(resumeFunction));
1414+
return lift(OperatorOnErrorResumeNextViaFunction.withSingle(resumeFunction));
14151415
}
14161416

14171417
/**

src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,51 @@ public final class OperatorOnErrorResumeNextViaFunction<T> implements Operator<T
4545

4646
final Func1<Throwable, ? extends Observable<? extends T>> resumeFunction;
4747

48+
public static <T> OperatorOnErrorResumeNextViaFunction<T> withSingle(final Func1<Throwable, ? extends T> resumeFunction) {
49+
return new OperatorOnErrorResumeNextViaFunction<T>(new Func1<Throwable, Observable<? extends T>>() {
50+
@Override
51+
public Observable<? extends T> call(Throwable t) {
52+
return Observable.just(resumeFunction.call(t));
53+
}
54+
});
55+
}
56+
57+
public static <T> OperatorOnErrorResumeNextViaFunction<T> withOther(final Observable<? extends T> other) {
58+
return new OperatorOnErrorResumeNextViaFunction<T>(new Func1<Throwable, Observable<? extends T>>() {
59+
@Override
60+
public Observable<? extends T> call(Throwable t) {
61+
return other;
62+
}
63+
});
64+
}
65+
66+
public static <T> OperatorOnErrorResumeNextViaFunction<T> withException(final Observable<? extends T> other) {
67+
return new OperatorOnErrorResumeNextViaFunction<T>(new Func1<Throwable, Observable<? extends T>>() {
68+
@Override
69+
public Observable<? extends T> call(Throwable t) {
70+
if (t instanceof Exception) {
71+
return other;
72+
}
73+
return Observable.error(t);
74+
}
75+
});
76+
}
77+
4878
public OperatorOnErrorResumeNextViaFunction(Func1<Throwable, ? extends Observable<? extends T>> f) {
4979
this.resumeFunction = f;
5080
}
5181

5282
@Override
5383
public Subscriber<? super T> call(final Subscriber<? super T> child) {
5484
final ProducerArbiter pa = new ProducerArbiter();
85+
5586
final SerialSubscription ssub = new SerialSubscription();
87+
5688
Subscriber<T> parent = new Subscriber<T>() {
5789

58-
private boolean done = false;
90+
private boolean done;
91+
92+
long produced;
5993

6094
@Override
6195
public void onCompleted() {
@@ -70,12 +104,13 @@ public void onCompleted() {
70104
public void onError(Throwable e) {
71105
if (done) {
72106
Exceptions.throwIfFatal(e);
107+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
73108
return;
74109
}
75110
done = true;
76111
try {
77-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
78112
unsubscribe();
113+
79114
Subscriber<T> next = new Subscriber<T>() {
80115
@Override
81116
public void onNext(T t) {
@@ -96,7 +131,13 @@ public void setProducer(Producer producer) {
96131
};
97132
ssub.set(next);
98133

134+
long p = produced;
135+
if (p != 0L) {
136+
pa.produced(p);
137+
}
138+
99139
Observable<? extends T> resume = resumeFunction.call(e);
140+
100141
resume.unsafeSubscribe(next);
101142
} catch (Throwable e2) {
102143
Exceptions.throwOrReport(e2, child);
@@ -108,6 +149,7 @@ public void onNext(T t) {
108149
if (done) {
109150
return;
110151
}
152+
produced++;
111153
child.onNext(t);
112154
}
113155

@@ -117,9 +159,11 @@ public void setProducer(final Producer producer) {
117159
}
118160

119161
};
120-
child.add(ssub);
121162
ssub.set(parent);
163+
164+
child.add(ssub);
122165
child.setProducer(pa);
166+
123167
return parent;
124168
}
125169

src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java

Lines changed: 0 additions & 104 deletions
This file was deleted.

src/main/java/rx/internal/operators/OperatorOnErrorReturn.java

Lines changed: 0 additions & 111 deletions
This file was deleted.

0 commit comments

Comments
 (0)