Skip to content

Commit 50b2d8d

Browse files
ObserveOn Error Propagation
- errors were not being propagated if backpressure was exerted (requested==0) - terminal events do not need to be requested so it now ignores the requested count to emit onError
1 parent fc94562 commit 50b2d8d

File tree

2 files changed

+62
-22
lines changed

2 files changed

+62
-22
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ public void request(long n) {
9696
}
9797

9898
});
99-
add(scheduledUnsubscribe);
10099
child.add(recursiveScheduler);
101100
child.add(this);
102101
}
@@ -169,34 +168,34 @@ private void pollQueue() {
169168
COUNTER_UPDATER.set(this, 1);
170169

171170
while (!scheduledUnsubscribe.isUnsubscribed()) {
172-
if (REQUESTED.getAndDecrement(this) != 0) {
171+
if (failure) {
172+
// special handling to short-circuit an error propagation
173173
Object o = queue.poll();
174-
if (o == null) {
175-
// nothing in queue
176-
REQUESTED.incrementAndGet(this);
177-
break;
178-
} else {
179-
if (failure) {
180-
// completed so we will skip onNext if they exist and only emit terminal events
181-
if (on.isError(o)) {
182-
// only emit error
183-
on.accept(child, o);
184-
// TODO this could hit the requested limit again ... and is skipping values
185-
// so the request count is broken ... it needs to purge the queue
186-
// or modify the requested amount so it will loop through everything
187-
}
174+
// completed so we will skip onNext if they exist and only emit terminal events
175+
if (on.isError(o)) {
176+
// only emit error
177+
on.accept(child, o);
178+
// we have emitted a terminal event so return (exit the loop we're in)
179+
return;
180+
}
181+
} else {
182+
if (REQUESTED.getAndDecrement(this) != 0) {
183+
Object o = queue.poll();
184+
if (o == null) {
185+
// nothing in queue
186+
REQUESTED.incrementAndGet(this);
187+
break;
188188
} else {
189189
if (!on.accept(child, o)) {
190190
// non-terminal event so let's increment count
191191
emitted++;
192192
}
193-
194193
}
194+
} else {
195+
// we hit the end ... so increment back to 0 again
196+
REQUESTED.incrementAndGet(this);
197+
break;
195198
}
196-
} else {
197-
// we hit the end ... so increment back to 0 again
198-
REQUESTED.incrementAndGet(this);
199-
break;
200199
}
201200
}
202201
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@
3434
import java.util.concurrent.atomic.AtomicInteger;
3535
import java.util.concurrent.atomic.AtomicLong;
3636

37+
import org.junit.Ignore;
3738
import org.junit.Test;
3839
import org.mockito.InOrder;
3940

41+
import rx.Notification;
4042
import rx.Observable;
4143
import rx.Observable.OnSubscribe;
4244
import rx.Observer;
@@ -48,6 +50,7 @@
4850
import rx.functions.Action0;
4951
import rx.functions.Action1;
5052
import rx.functions.Func1;
53+
import rx.functions.Func2;
5154
import rx.internal.util.RxRingBuffer;
5255
import rx.observers.TestSubscriber;
5356
import rx.schedulers.Schedulers;
@@ -655,7 +658,7 @@ public void onNext(Long t) {
655658
@Test
656659
public void testHotOperatorBackpressure() {
657660
TestSubscriber<String> ts = new TestSubscriber<String>();
658-
Observable.timer(0, 1, TimeUnit.MILLISECONDS)
661+
Observable.timer(0, 1, TimeUnit.MICROSECONDS)
659662
.observeOn(Schedulers.computation())
660663
.map(new Func1<Long, String>() {
661664

@@ -673,6 +676,44 @@ public String call(Long t1) {
673676

674677
ts.awaitTerminalEvent();
675678
System.out.println("Errors: " + ts.getOnErrorEvents());
679+
assertEquals(1, ts.getOnErrorEvents().size());
680+
assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
681+
}
682+
683+
@Test
684+
public void testErrorPropagatesWhenNoOutstandingRequests() {
685+
Observable<Long> timer = Observable.timer(0, 1, TimeUnit.MICROSECONDS)
686+
.doOnEach(new Action1<Notification<? super Long>>() {
687+
688+
@Override
689+
public void call(Notification<? super Long> n) {
690+
// System.out.println("BEFORE " + n);
691+
}
692+
693+
})
694+
.observeOn(Schedulers.newThread())
695+
.doOnEach(new Action1<Notification<? super Long>>() {
696+
697+
@Override
698+
public void call(Notification<? super Long> n) {
699+
// System.out.println("AFTER " + n);
700+
}
701+
702+
});
703+
704+
TestSubscriber<Long> ts = new TestSubscriber<Long>();
705+
706+
Observable.combineLatest(timer, Observable.<Integer> never(), new Func2<Long, Integer, Long>() {
707+
708+
@Override
709+
public Long call(Long t1, Integer t2) {
710+
return t1;
711+
}
712+
713+
}).take(RxRingBuffer.SIZE * 2).subscribe(ts);
714+
715+
ts.awaitTerminalEvent();
716+
assertEquals(1, ts.getOnErrorEvents().size());
676717
assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
677718
}
678719

0 commit comments

Comments
 (0)