Skip to content

Commit 292bcfb

Browse files
Merge pull request #1672 from benjchristensen/merge-0_20_4
Merge 0.20.4 Fixes
2 parents 68a61e2 + b93f8bb commit 292bcfb

10 files changed

+177
-5
lines changed

src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable;
1919
import rx.Observable.Operator;
2020
import rx.Subscriber;
21+
import rx.exceptions.Exceptions;
2122
import rx.functions.Func1;
2223
import rx.plugins.RxJavaPlugins;
2324

@@ -49,17 +50,29 @@ public OperatorOnErrorResumeNextViaFunction(Func1<Throwable, ? extends Observabl
4950

5051
@Override
5152
public Subscriber<? super T> call(final Subscriber<? super T> child) {
52-
return new Subscriber<T>(child) {
53+
Subscriber<T> parent = new Subscriber<T>() {
5354

55+
private boolean done = false;
56+
5457
@Override
5558
public void onCompleted() {
59+
if (done) {
60+
return;
61+
}
62+
done = true;
5663
child.onCompleted();
5764
}
5865

5966
@Override
6067
public void onError(Throwable e) {
68+
if (done) {
69+
Exceptions.throwIfFatal(e);
70+
return;
71+
}
72+
done = true;
6173
try {
6274
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
75+
unsubscribe();
6376
Observable<? extends T> resume = resumeFunction.call(e);
6477
resume.unsafeSubscribe(child);
6578
} catch (Throwable e2) {
@@ -69,10 +82,15 @@ public void onError(Throwable e) {
6982

7083
@Override
7184
public void onNext(T t) {
85+
if (done) {
86+
return;
87+
}
7288
child.onNext(t);
7389
}
7490

7591
};
92+
child.add(parent);
93+
return parent;
7694
}
7795

7896
}

src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java

+16
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable;
1919
import rx.Observable.Operator;
2020
import rx.Subscriber;
21+
import rx.exceptions.Exceptions;
2122
import rx.plugins.RxJavaPlugins;
2223

2324
/**
@@ -51,20 +52,35 @@ public OperatorOnErrorResumeNextViaObservable(Observable<? extends T> resumeSequ
5152
public Subscriber<? super T> call(final Subscriber<? super T> child) {
5253
// shared subscription won't work here
5354
Subscriber<T> s = new Subscriber<T>() {
55+
56+
private boolean done = false;
57+
5458
@Override
5559
public void onNext(T t) {
60+
if (done) {
61+
return;
62+
}
5663
child.onNext(t);
5764
}
5865

5966
@Override
6067
public void onError(Throwable e) {
68+
if (done) {
69+
Exceptions.throwIfFatal(e);
70+
return;
71+
}
72+
done = true;
6173
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
6274
unsubscribe();
6375
resumeSequence.unsafeSubscribe(child);
6476
}
6577

6678
@Override
6779
public void onCompleted() {
80+
if (done) {
81+
return;
82+
}
83+
done = true;
6884
child.onCompleted();
6985
}
7086

src/main/java/rx/internal/operators/OperatorOnErrorReturn.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
package rx.internal.operators;
1717

1818
import java.util.Arrays;
19+
1920
import rx.Observable.Operator;
2021
import rx.Subscriber;
2122
import rx.exceptions.CompositeException;
23+
import rx.exceptions.Exceptions;
2224
import rx.functions.Func1;
2325
import rx.plugins.RxJavaPlugins;
2426

@@ -50,19 +52,29 @@ public OperatorOnErrorReturn(Func1<Throwable, ? extends T> resultFunction) {
5052

5153
@Override
5254
public Subscriber<? super T> call(final Subscriber<? super T> child) {
53-
return new Subscriber<T>(child) {
55+
Subscriber<T> parent = new Subscriber<T>() {
56+
57+
private boolean done = false;
5458

5559
@Override
5660
public void onNext(T t) {
61+
if (done) {
62+
return;
63+
}
5764
child.onNext(t);
5865
}
5966

6067
@Override
6168
public void onError(Throwable e) {
69+
if (done) {
70+
Exceptions.throwIfFatal(e);
71+
return;
72+
}
73+
done = true;
6274
try {
6375
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
76+
unsubscribe();
6477
T result = resultFunction.call(e);
65-
6678
child.onNext(result);
6779
} catch (Throwable x) {
6880
child.onError(new CompositeException(Arrays.asList(e, x)));
@@ -73,9 +85,15 @@ public void onError(Throwable e) {
7385

7486
@Override
7587
public void onCompleted() {
88+
if (done) {
89+
return;
90+
}
91+
done = true;
7692
child.onCompleted();
7793
}
7894

7995
};
96+
child.add(parent);
97+
return parent;
8098
}
8199
}

src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java

+15
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable;
1919
import rx.Observable.Operator;
2020
import rx.Subscriber;
21+
import rx.exceptions.Exceptions;
2122
import rx.plugins.RxJavaPlugins;
2223

2324
/**
@@ -56,13 +57,23 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
5657
// needs to independently unsubscribe so child can continue with the resume
5758
Subscriber<T> s = new Subscriber<T>() {
5859

60+
private boolean done = false;
61+
5962
@Override
6063
public void onNext(T t) {
64+
if (done) {
65+
return;
66+
}
6167
child.onNext(t);
6268
}
6369

6470
@Override
6571
public void onError(Throwable e) {
72+
if (done) {
73+
Exceptions.throwIfFatal(e);
74+
return;
75+
}
76+
done = true;
6677
if (e instanceof Exception) {
6778
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
6879
unsubscribe();
@@ -74,6 +85,10 @@ public void onError(Throwable e) {
7485

7586
@Override
7687
public void onCompleted() {
88+
if (done) {
89+
return;
90+
}
91+
done = true;
7792
child.onCompleted();
7893
}
7994

src/main/java/rx/internal/operators/OperatorScan.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void setProducer(final Producer producer) {
129129
@Override
130130
public void request(long n) {
131131
if (once.compareAndSet(false, true)) {
132-
if (initialValue == NO_INITIAL_VALUE) {
132+
if (initialValue == NO_INITIAL_VALUE || n == Long.MAX_VALUE) {
133133
producer.request(n);
134134
} else {
135135
producer.request(n - 1);

src/main/java/rx/internal/operators/OperatorSwitch.java

+1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ private static final class SwitchSubscriber<T> extends Subscriber<Observable<? e
6262
volatile boolean infinite = false;
6363

6464
public SwitchSubscriber(Subscriber<? super T> child) {
65+
super(child);
6566
s = new SerializedSubscriber<T>(child);
6667
ssub = new SerialSubscription();
6768
child.add(ssub);

src/main/java/rx/internal/operators/OperatorTakeUntil.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable;
1919
import rx.Observable.Operator;
2020
import rx.Subscriber;
21+
import rx.observers.SerializedSubscriber;
2122

2223
/**
2324
* Returns an Observable that emits the items from the source Observable until another Observable
@@ -35,7 +36,7 @@ public OperatorTakeUntil(final Observable<? extends E> other) {
3536

3637
@Override
3738
public Subscriber<? super T> call(final Subscriber<? super T> child) {
38-
final Subscriber<T> parent = new Subscriber<T>(child) {
39+
final Subscriber<T> parent = new SerializedSubscriber<T>(child) {
3940

4041
@Override
4142
public void onCompleted() {

src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunctionTest.java

+44
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import rx.Subscription;
3636
import rx.functions.Func1;
3737
import rx.observers.TestSubscriber;
38+
import rx.schedulers.Schedulers;
3839

3940
public class OperatorOnErrorResumeNextViaFunctionTest {
4041

@@ -47,6 +48,8 @@ public void testResumeNextWithSynchronousExecution() {
4748
public void call(Subscriber<? super String> observer) {
4849
observer.onNext("one");
4950
observer.onError(new Throwable("injected failure"));
51+
observer.onNext("two");
52+
observer.onNext("three");
5053
}
5154
});
5255

@@ -226,6 +229,47 @@ public Observable<String> call(Throwable t1) {
226229
System.out.println(ts.getOnNextEvents());
227230
ts.assertReceivedOnNext(Arrays.asList("success"));
228231
}
232+
233+
@Test
234+
public void testMapResumeAsyncNext() {
235+
// Trigger multiple failures
236+
Observable<String> w = Observable.just("one", "fail", "two", "three", "fail");
237+
238+
// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
239+
// rx.operator incl onErrorResumeNextViaObservable)
240+
w = w.map(new Func1<String, String>() {
241+
@Override
242+
public String call(String s) {
243+
if ("fail".equals(s))
244+
throw new RuntimeException("Forced Failure");
245+
System.out.println("BadMapper:" + s);
246+
return s;
247+
}
248+
});
249+
250+
Observable<String> observable = w.onErrorResumeNext(new Func1<Throwable, Observable<String>>() {
251+
252+
@Override
253+
public Observable<String> call(Throwable t1) {
254+
return Observable.just("twoResume", "threeResume").subscribeOn(Schedulers.computation());
255+
}
256+
257+
});
258+
259+
@SuppressWarnings("unchecked")
260+
Observer<String> observer = mock(Observer.class);
261+
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
262+
observable.subscribe(ts);
263+
ts.awaitTerminalEvent();
264+
265+
verify(observer, Mockito.never()).onError(any(Throwable.class));
266+
verify(observer, times(1)).onCompleted();
267+
verify(observer, times(1)).onNext("one");
268+
verify(observer, Mockito.never()).onNext("two");
269+
verify(observer, Mockito.never()).onNext("three");
270+
verify(observer, times(1)).onNext("twoResume");
271+
verify(observer, times(1)).onNext("threeResume");
272+
}
229273

230274
private static class TestObservable implements Observable.OnSubscribe<String> {
231275

src/test/java/rx/internal/operators/OperatorOnErrorReturnTest.java

+42
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import rx.Observer;
3232
import rx.Subscriber;
3333
import rx.functions.Func1;
34+
import rx.observers.TestSubscriber;
35+
import rx.schedulers.Schedulers;
3436

3537
public class OperatorOnErrorReturnTest {
3638

@@ -104,6 +106,46 @@ public String call(Throwable e) {
104106
verify(observer, times(0)).onCompleted();
105107
assertNotNull(capturedException.get());
106108
}
109+
110+
@Test
111+
public void testMapResumeAsyncNext() {
112+
// Trigger multiple failures
113+
Observable<String> w = Observable.just("one", "fail", "two", "three", "fail");
114+
115+
// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
116+
// rx.operator incl onErrorResumeNextViaObservable)
117+
w = w.map(new Func1<String, String>() {
118+
@Override
119+
public String call(String s) {
120+
if ("fail".equals(s))
121+
throw new RuntimeException("Forced Failure");
122+
System.out.println("BadMapper:" + s);
123+
return s;
124+
}
125+
});
126+
127+
Observable<String> observable = w.onErrorReturn(new Func1<Throwable, String>() {
128+
129+
@Override
130+
public String call(Throwable t1) {
131+
return "resume";
132+
}
133+
134+
});
135+
136+
@SuppressWarnings("unchecked")
137+
Observer<String> observer = mock(Observer.class);
138+
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
139+
observable.subscribe(ts);
140+
ts.awaitTerminalEvent();
141+
142+
verify(observer, Mockito.never()).onError(any(Throwable.class));
143+
verify(observer, times(1)).onCompleted();
144+
verify(observer, times(1)).onNext("one");
145+
verify(observer, Mockito.never()).onNext("two");
146+
verify(observer, Mockito.never()).onNext("three");
147+
verify(observer, times(1)).onNext("resume");
148+
}
107149

108150
private static class TestObservable implements Observable.OnSubscribe<String> {
109151

src/test/java/rx/internal/operators/OperatorSwitchTest.java

+17
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertTrue;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Matchers.anyString;
2021
import static org.mockito.Mockito.inOrder;
@@ -25,6 +26,7 @@
2526

2627
import java.util.Arrays;
2728
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2830

2931
import org.junit.Before;
3032
import org.junit.Test;
@@ -513,4 +515,19 @@ public void onNext(String s) {
513515
testSubscriber.assertNoErrors();
514516
testSubscriber.assertTerminalEvent();
515517
}
518+
519+
@Test
520+
public void testUnsubscribe() {
521+
final AtomicBoolean isUnsubscribed = new AtomicBoolean();
522+
Observable.switchOnNext(
523+
Observable.create(new Observable.OnSubscribe<Observable<Integer>>() {
524+
@Override
525+
public void call(final Subscriber<? super Observable<Integer>> subscriber) {
526+
subscriber.onNext(Observable.just(1));
527+
isUnsubscribed.set(subscriber.isUnsubscribed());
528+
}
529+
})
530+
).take(1).subscribe();
531+
assertTrue("Switch doesn't propagate 'unsubscribe'", isUnsubscribed.get());
532+
}
516533
}

0 commit comments

Comments
 (0)