Skip to content

Commit 80f7f30

Browse files
Merge pull request #1728 from benjchristensen/observeOnErrorPropagation
ObserveOn Error Propagation
2 parents 0d74d98 + 50b2d8d commit 80f7f30

File tree

2 files changed

+103
-39
lines changed

2 files changed

+103
-39
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

+34-38
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import rx.exceptions.MissingBackpressureException;
2727
import rx.functions.Action0;
2828
import rx.internal.util.RxRingBuffer;
29-
import rx.internal.util.SynchronizedSubscription;
3029
import rx.schedulers.ImmediateScheduler;
3130
import rx.schedulers.TrampolineScheduler;
3231

@@ -97,12 +96,10 @@ public void request(long n) {
9796
}
9897

9998
});
100-
add(scheduledUnsubscribe);
10199
child.add(recursiveScheduler);
102100
child.add(this);
103-
104101
}
105-
102+
106103
@Override
107104
public void onStart() {
108105
// signal that this is an async operator capable of receiving this many
@@ -160,53 +157,52 @@ public void call() {
160157
}
161158
}
162159

160+
// only execute this from schedule()
163161
private void pollQueue() {
164162
int emitted = 0;
165-
while (true) {
163+
do {
164+
/*
165+
* Set to 1 otherwise it could have grown very large while in the last poll loop
166+
* and then we can end up looping all those times again here before exiting even once we've drained
167+
*/
168+
COUNTER_UPDATER.set(this, 1);
169+
166170
while (!scheduledUnsubscribe.isUnsubscribed()) {
167-
if (REQUESTED.getAndDecrement(this) != 0) {
171+
if (failure) {
172+
// special handling to short-circuit an error propagation
168173
Object o = queue.poll();
169-
if (o == null) {
170-
// nothing in queue
171-
REQUESTED.incrementAndGet(this);
172-
break;
173-
} else {
174-
if (failure) {
175-
// completed so we will skip onNext if they exist and only emit terminal events
176-
if (on.isError(o)) {
177-
System.out.println("Error: " + o);
178-
// only emit error
179-
on.accept(child, o);
180-
}
174+
// completed so we will skip onNext if they exist and only emit terminal events
175+
if (on.isError(o)) {
176+
// only emit error
177+
on.accept(child, o);
178+
// we have emitted a terminal event so return (exit the loop we're in)
179+
return;
180+
}
181+
} else {
182+
if (REQUESTED.getAndDecrement(this) != 0) {
183+
Object o = queue.poll();
184+
if (o == null) {
185+
// nothing in queue
186+
REQUESTED.incrementAndGet(this);
187+
break;
181188
} else {
182189
if (!on.accept(child, o)) {
183190
// non-terminal event so let's increment count
184191
emitted++;
185192
}
186193
}
194+
} else {
195+
// we hit the end ... so increment back to 0 again
196+
REQUESTED.incrementAndGet(this);
197+
break;
187198
}
188-
} else {
189-
// we hit the end ... so increment back to 0 again
190-
REQUESTED.incrementAndGet(this);
191-
break;
192199
}
193200
}
194-
long c = COUNTER_UPDATER.decrementAndGet(this);
195-
if (c <= 0) {
196-
// request the number of items that we emitted in this poll loop
197-
if (emitted > 0) {
198-
request(emitted);
199-
}
200-
break;
201-
} else {
202-
/*
203-
* Set down to 1 and then iterate again.
204-
* we lower it to 1 otherwise it could have grown very large while in the last poll loop
205-
* and then we can end up looping all those times again here before existing even once we've drained
206-
*/
207-
COUNTER_UPDATER.set(this, 1);
208-
// we now loop again, and if anything tries scheduling again after this it will increment and cause us to loop again after
209-
}
201+
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);
202+
203+
// request the number of items that we emitted in this poll loop
204+
if (emitted > 0) {
205+
request(emitted);
210206
}
211207
}
212208
}

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

+69-1
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@
3434
import java.util.concurrent.atomic.AtomicInteger;
3535
import java.util.concurrent.atomic.AtomicLong;
3636

37+
import org.junit.Ignore;
3738
import org.junit.Test;
3839
import org.mockito.InOrder;
3940

41+
import rx.Notification;
4042
import rx.Observable;
4143
import rx.Observable.OnSubscribe;
4244
import rx.Observer;
@@ -48,6 +50,7 @@
4850
import rx.functions.Action0;
4951
import rx.functions.Action1;
5052
import rx.functions.Func1;
53+
import rx.functions.Func2;
5154
import rx.internal.util.RxRingBuffer;
5255
import rx.observers.TestSubscriber;
5356
import rx.schedulers.Schedulers;
@@ -391,7 +394,7 @@ public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() {
391394
inOrder.verify(o, never()).onNext(anyInt());
392395
inOrder.verify(o, never()).onCompleted();
393396
}
394-
397+
395398
@Test
396399
public void testAfterUnsubscribeCalledThenObserverOnNextNeverCalled() {
397400
final TestScheduler testScheduler = new TestScheduler();
@@ -647,6 +650,71 @@ public void onNext(Long t) {
647650
assertTrue(ts.getOnNextEvents().size() == ts.getOnNextEvents().get(ts.getOnNextEvents().size() - 1) + 1);
648651
// we should emit the error without emitting the full buffer size
649652
assertTrue(ts.getOnNextEvents().size() < RxRingBuffer.SIZE);
653+
}
654+
655+
/**
656+
* Make sure we get a MissingBackpressureException propagated through when we have a fast temporal (hot) producer.
657+
*/
658+
@Test
659+
public void testHotOperatorBackpressure() {
660+
TestSubscriber<String> ts = new TestSubscriber<String>();
661+
Observable.timer(0, 1, TimeUnit.MICROSECONDS)
662+
.observeOn(Schedulers.computation())
663+
.map(new Func1<Long, String>() {
664+
665+
@Override
666+
public String call(Long t1) {
667+
System.out.println(t1);
668+
try {
669+
Thread.sleep(100);
670+
} catch (InterruptedException e) {
671+
}
672+
return t1 + " slow value";
673+
}
650674

675+
}).subscribe(ts);
676+
677+
ts.awaitTerminalEvent();
678+
System.out.println("Errors: " + ts.getOnErrorEvents());
679+
assertEquals(1, ts.getOnErrorEvents().size());
680+
assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
651681
}
682+
683+
@Test
684+
public void testErrorPropagatesWhenNoOutstandingRequests() {
685+
Observable<Long> timer = Observable.timer(0, 1, TimeUnit.MICROSECONDS)
686+
.doOnEach(new Action1<Notification<? super Long>>() {
687+
688+
@Override
689+
public void call(Notification<? super Long> n) {
690+
// System.out.println("BEFORE " + n);
691+
}
692+
693+
})
694+
.observeOn(Schedulers.newThread())
695+
.doOnEach(new Action1<Notification<? super Long>>() {
696+
697+
@Override
698+
public void call(Notification<? super Long> n) {
699+
// System.out.println("AFTER " + n);
700+
}
701+
702+
});
703+
704+
TestSubscriber<Long> ts = new TestSubscriber<Long>();
705+
706+
Observable.combineLatest(timer, Observable.<Integer> never(), new Func2<Long, Integer, Long>() {
707+
708+
@Override
709+
public Long call(Long t1, Integer t2) {
710+
return t1;
711+
}
712+
713+
}).take(RxRingBuffer.SIZE * 2).subscribe(ts);
714+
715+
ts.awaitTerminalEvent();
716+
assertEquals(1, ts.getOnErrorEvents().size());
717+
assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
718+
}
719+
652720
}

0 commit comments

Comments
 (0)