Skip to content

Commit fc94562

Browse files
ObserveOn Cleanup
- make code easier to understand with some refactoring of the pollQueue loop - add a unit test to ensure correct behavior with a hot Observable source
1 parent 84c88bd commit fc94562

File tree

2 files changed

+46
-22
lines changed

2 files changed

+46
-22
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import rx.exceptions.MissingBackpressureException;
2727
import rx.functions.Action0;
2828
import rx.internal.util.RxRingBuffer;
29-
import rx.internal.util.SynchronizedSubscription;
3029
import rx.schedulers.ImmediateScheduler;
3130
import rx.schedulers.TrampolineScheduler;
3231

@@ -100,9 +99,8 @@ public void request(long n) {
10099
add(scheduledUnsubscribe);
101100
child.add(recursiveScheduler);
102101
child.add(this);
103-
104102
}
105-
103+
106104
@Override
107105
public void onStart() {
108106
// signal that this is an async operator capable of receiving this many
@@ -160,9 +158,16 @@ public void call() {
160158
}
161159
}
162160

161+
// only execute this from schedule()
163162
private void pollQueue() {
164163
int emitted = 0;
165-
while (true) {
164+
do {
165+
/*
166+
* Set to 1 otherwise it could have grown very large while in the last poll loop
167+
* and then we can end up looping all those times again here before exiting even once we've drained
168+
*/
169+
COUNTER_UPDATER.set(this, 1);
170+
166171
while (!scheduledUnsubscribe.isUnsubscribed()) {
167172
if (REQUESTED.getAndDecrement(this) != 0) {
168173
Object o = queue.poll();
@@ -174,15 +179,18 @@ private void pollQueue() {
174179
if (failure) {
175180
// completed so we will skip onNext if they exist and only emit terminal events
176181
if (on.isError(o)) {
177-
System.out.println("Error: " + o);
178182
// only emit error
179183
on.accept(child, o);
184+
// TODO this could hit the requested limit again ... and is skipping values
185+
// so the request count is broken ... it needs to purge the queue
186+
// or modify the requested amount so it will loop through everything
180187
}
181188
} else {
182189
if (!on.accept(child, o)) {
183190
// non-terminal event so let's increment count
184191
emitted++;
185192
}
193+
186194
}
187195
}
188196
} else {
@@ -191,22 +199,11 @@ private void pollQueue() {
191199
break;
192200
}
193201
}
194-
long c = COUNTER_UPDATER.decrementAndGet(this);
195-
if (c <= 0) {
196-
// request the number of items that we emitted in this poll loop
197-
if (emitted > 0) {
198-
request(emitted);
199-
}
200-
break;
201-
} else {
202-
/*
203-
* Set down to 1 and then iterate again.
204-
* we lower it to 1 otherwise it could have grown very large while in the last poll loop
205-
* and then we can end up looping all those times again here before existing even once we've drained
206-
*/
207-
COUNTER_UPDATER.set(this, 1);
208-
// we now loop again, and if anything tries scheduling again after this it will increment and cause us to loop again after
209-
}
202+
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);
203+
204+
// request the number of items that we emitted in this poll loop
205+
if (emitted > 0) {
206+
request(emitted);
210207
}
211208
}
212209
}

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() {
391391
inOrder.verify(o, never()).onNext(anyInt());
392392
inOrder.verify(o, never()).onCompleted();
393393
}
394-
394+
395395
@Test
396396
public void testAfterUnsubscribeCalledThenObserverOnNextNeverCalled() {
397397
final TestScheduler testScheduler = new TestScheduler();
@@ -647,6 +647,33 @@ public void onNext(Long t) {
647647
assertTrue(ts.getOnNextEvents().size() == ts.getOnNextEvents().get(ts.getOnNextEvents().size() - 1) + 1);
648648
// we should emit the error without emitting the full buffer size
649649
assertTrue(ts.getOnNextEvents().size() < RxRingBuffer.SIZE);
650+
}
651+
652+
/**
653+
* Make sure we get a MissingBackpressureException propagated through when we have a fast temporal (hot) producer.
654+
*/
655+
@Test
656+
public void testHotOperatorBackpressure() {
657+
TestSubscriber<String> ts = new TestSubscriber<String>();
658+
Observable.timer(0, 1, TimeUnit.MILLISECONDS)
659+
.observeOn(Schedulers.computation())
660+
.map(new Func1<Long, String>() {
650661

662+
@Override
663+
public String call(Long t1) {
664+
System.out.println(t1);
665+
try {
666+
Thread.sleep(100);
667+
} catch (InterruptedException e) {
668+
}
669+
return t1 + " slow value";
670+
}
671+
672+
}).subscribe(ts);
673+
674+
ts.awaitTerminalEvent();
675+
System.out.println("Errors: " + ts.getOnErrorEvents());
676+
assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
651677
}
678+
652679
}

0 commit comments

Comments
 (0)