Skip to content

Commit 026f93b

Browse files
Unsubscribe eagerly and drop queued onNext to emit onError
The queue will be drained to find and emit the onError.
1 parent f44341a commit 026f93b

File tree

1 file changed

+17
-3
lines changed

1 file changed

+17
-3
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7171

7272
private final RxRingBuffer queue = RxRingBuffer.getSpscInstance();
7373
private boolean completed = false;
74+
private boolean failure = false;
7475

7576
private volatile long requested = 0;
7677
@SuppressWarnings("rawtypes")
@@ -137,7 +138,11 @@ public void onError(final Throwable e) {
137138
if (isUnsubscribed() || completed) {
138139
return;
139140
}
141+
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
142+
unsubscribe();
140143
completed = true;
144+
// mark failure so the polling thread will skip onNext still in the queue
145+
failure = true;
141146
queue.onError(e);
142147
schedule();
143148
}
@@ -166,9 +171,18 @@ private void pollQueue() {
166171
REQUESTED.incrementAndGet(this);
167172
break;
168173
} else {
169-
if (!on.accept(child, o)) {
170-
// non-terminal event so let's increment count
171-
emitted++;
174+
if (failure) {
175+
// completed so we will skip onNext if they exist and only emit terminal events
176+
if (on.isError(o)) {
177+
System.out.println("Error: " + o);
178+
// only emit error
179+
on.accept(child, o);
180+
}
181+
} else {
182+
if (!on.accept(child, o)) {
183+
// non-terminal event so let's increment count
184+
emitted++;
185+
}
172186
}
173187
}
174188
} else {

0 commit comments

Comments
 (0)