Skip to content

Commit 67e463f

Browse files
Merge pull request #1683 from benjchristensen/observeOn-error
ObserveOn Error Handling
2 parents e44e7ec + 1d8adce commit 67e463f

File tree

3 files changed

+65
-4
lines changed

3 files changed

+65
-4
lines changed

src/main/java/rx/internal/operators/NotificationLite.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public OnErrorSentinel(Throwable e) {
7676
}
7777

7878
public String toString() {
79-
return "Notification=>Error:" + e.getMessage();
79+
return "Notification=>Error:" + e;
8080
}
8181
}
8282

src/main/java/rx/internal/operators/OperatorObserveOn.java

+17-3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7171

7272
private final RxRingBuffer queue = RxRingBuffer.getSpscInstance();
7373
private boolean completed = false;
74+
private boolean failure = false;
7475

7576
private volatile long requested = 0;
7677
@SuppressWarnings("rawtypes")
@@ -137,7 +138,11 @@ public void onError(final Throwable e) {
137138
if (isUnsubscribed() || completed) {
138139
return;
139140
}
141+
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
142+
unsubscribe();
140143
completed = true;
144+
// mark failure so the polling thread will skip onNext still in the queue
145+
failure = true;
141146
queue.onError(e);
142147
schedule();
143148
}
@@ -166,9 +171,18 @@ private void pollQueue() {
166171
REQUESTED.incrementAndGet(this);
167172
break;
168173
} else {
169-
if (!on.accept(child, o)) {
170-
// non-terminal event so let's increment count
171-
emitted++;
174+
if (failure) {
175+
// completed so we will skip onNext if they exist and only emit terminal events
176+
if (on.isError(o)) {
177+
System.out.println("Error: " + o);
178+
// only emit error
179+
on.accept(child, o);
180+
}
181+
} else {
182+
if (!on.accept(child, o)) {
183+
// non-terminal event so let's increment count
184+
emitted++;
185+
}
172186
}
173187
}
174188
} else {

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

+47
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import rx.observers.TestSubscriber;
5353
import rx.schedulers.Schedulers;
5454
import rx.schedulers.TestScheduler;
55+
import rx.subjects.PublishSubject;
5556

5657
public class OperatorObserveOnTest {
5758

@@ -602,4 +603,50 @@ public void testAsyncChild() {
602603
ts.awaitTerminalEvent();
603604
ts.assertNoErrors();
604605
}
606+
607+
@Test
608+
public void testOnErrorCutsAheadOfOnNext() {
609+
final PublishSubject<Long> subject = PublishSubject.create();
610+
611+
final AtomicLong counter = new AtomicLong();
612+
TestSubscriber<Long> ts = new TestSubscriber<Long>(new Observer<Long>() {
613+
614+
@Override
615+
public void onCompleted() {
616+
617+
}
618+
619+
@Override
620+
public void onError(Throwable e) {
621+
622+
}
623+
624+
@Override
625+
public void onNext(Long t) {
626+
// simulate slow consumer to force backpressure failure
627+
try {
628+
Thread.sleep(1);
629+
} catch (InterruptedException e) {
630+
}
631+
}
632+
633+
});
634+
subject.observeOn(Schedulers.computation()).subscribe(ts);
635+
636+
// this will blow up with backpressure
637+
while (counter.get() < 102400) {
638+
subject.onNext(counter.get());
639+
counter.incrementAndGet();
640+
}
641+
642+
ts.awaitTerminalEvent();
643+
assertEquals(1, ts.getOnErrorEvents().size());
644+
assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);
645+
// assert that the values are sequential, that cutting in didn't allow skipping some but emitting others.
646+
// example [0, 1, 2] not [0, 1, 4]
647+
assertTrue(ts.getOnNextEvents().size() == ts.getOnNextEvents().get(ts.getOnNextEvents().size() - 1) + 1);
648+
// we should emit the error without emitting the full buffer size
649+
assertTrue(ts.getOnNextEvents().size() < RxRingBuffer.SIZE);
650+
651+
}
605652
}

0 commit comments

Comments
 (0)