Skip to content

Commit 1d8adce

Browse files
Assert onError cuts ahead of onNext
... and doesn't allow intermittent onNext via a race of draining the queue and emitting (the draining thread must be where the skipping happens, not the producer thread).
1 parent 026f93b commit 1d8adce

File tree

1 file changed

+47
-0
lines changed

1 file changed

+47
-0
lines changed

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

+47
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import rx.observers.TestSubscriber;
5353
import rx.schedulers.Schedulers;
5454
import rx.schedulers.TestScheduler;
55+
import rx.subjects.PublishSubject;
5556

5657
public class OperatorObserveOnTest {
5758

@@ -602,4 +603,50 @@ public void testAsyncChild() {
602603
ts.awaitTerminalEvent();
603604
ts.assertNoErrors();
604605
}
606+
607+
@Test
608+
public void testOnErrorCutsAheadOfOnNext() {
609+
final PublishSubject<Long> subject = PublishSubject.create();
610+
611+
final AtomicLong counter = new AtomicLong();
612+
TestSubscriber<Long> ts = new TestSubscriber<Long>(new Observer<Long>() {
613+
614+
@Override
615+
public void onCompleted() {
616+
617+
}
618+
619+
@Override
620+
public void onError(Throwable e) {
621+
622+
}
623+
624+
@Override
625+
public void onNext(Long t) {
626+
// simulate slow consumer to force backpressure failure
627+
try {
628+
Thread.sleep(1);
629+
} catch (InterruptedException e) {
630+
}
631+
}
632+
633+
});
634+
subject.observeOn(Schedulers.computation()).subscribe(ts);
635+
636+
// this will blow up with backpressure
637+
while (counter.get() < 102400) {
638+
subject.onNext(counter.get());
639+
counter.incrementAndGet();
640+
}
641+
642+
ts.awaitTerminalEvent();
643+
assertEquals(1, ts.getOnErrorEvents().size());
644+
assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);
645+
// assert that the values are sequential, that cutting in didn't allow skipping some but emitting others.
646+
// example [0, 1, 2] not [0, 1, 4]
647+
assertTrue(ts.getOnNextEvents().size() == ts.getOnNextEvents().get(ts.getOnNextEvents().size() - 1) + 1);
648+
// we should emit the error without emitting the full buffer size
649+
assertTrue(ts.getOnNextEvents().size() < RxRingBuffer.SIZE);
650+
651+
}
605652
}

0 commit comments

Comments
 (0)