Skip to content

Commit 3261a49

Browse files
authored
1.x: fix Spsc queues reporting not empty but then poll() returns null (#4005)
1 parent 67b7be0 commit 3261a49

File tree

5 files changed

+43
-16
lines changed

5 files changed

+43
-16
lines changed

src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ else if (null != lvElement(buffer, offset)){
6464
return false;
6565
}
6666
}
67-
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
6867
soElement(buffer, offset, e); // StoreStore
68+
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
6969
return true;
7070
}
7171

@@ -79,8 +79,8 @@ public E poll() {
7979
if (null == e) {
8080
return null;
8181
}
82-
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
8382
soElement(lElementBuffer, offset, null);// StoreStore
83+
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
8484
return e;
8585
}
8686

src/main/java/rx/internal/util/atomic/SpscLinkedArrayQueue.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ public final boolean offer(final T e) {
9090
}
9191

9292
private boolean writeToQueue(final AtomicReferenceArray<Object> buffer, final T e, final long index, final int offset) {
93-
soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms
9493
soElement(buffer, offset, e);// StoreStore
94+
soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms
9595
return true;
9696
}
9797

@@ -101,11 +101,11 @@ private void resize(final AtomicReferenceArray<Object> oldBuffer, final long cur
101101
final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<Object>(capacity);
102102
producerBuffer = newBuffer;
103103
producerLookAhead = currIndex + mask - 1;
104-
soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
105104
soElement(newBuffer, offset, e);// StoreStore
106105
soNext(oldBuffer, newBuffer);
107106
soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is
108107
// inserted
108+
soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
109109
}
110110

111111
private void soNext(AtomicReferenceArray<Object> curr, AtomicReferenceArray<Object> next) {
@@ -131,8 +131,8 @@ public final T poll() {
131131
final Object e = lvElement(buffer, offset);// LoadLoad
132132
boolean isNextBuffer = e == HAS_NEXT;
133133
if (null != e && !isNextBuffer) {
134-
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
135134
soElement(buffer, offset, null);// StoreStore
135+
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
136136
return (T) e;
137137
} else if (isNextBuffer) {
138138
return newBufferPoll(lvNext(buffer), index, mask);
@@ -149,8 +149,8 @@ private T newBufferPoll(AtomicReferenceArray<Object> nextBuffer, final long inde
149149
if (null == n) {
150150
return null;
151151
} else {
152-
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
153152
soElement(nextBuffer, offsetInNew, null);// StoreStore
153+
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
154154
return n;
155155
}
156156
}
@@ -330,8 +330,8 @@ public boolean offer(T first, T second) {
330330
if (null == lvElement(buffer, pi)) {
331331
pi = calcWrappedOffset(p, m);
332332
soElement(buffer, pi + 1, second);
333-
soProducerIndex(p + 2);
334333
soElement(buffer, pi, first);
334+
soProducerIndex(p + 2);
335335
} else {
336336
final int capacity = buffer.length();
337337
final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<Object>(capacity);
@@ -342,9 +342,9 @@ public boolean offer(T first, T second) {
342342
soElement(newBuffer, pi, first);
343343
soNext(buffer, newBuffer);
344344

345-
soProducerIndex(p + 2);// this ensures correctness on 32bit platforms
346-
347345
soElement(buffer, pi, HAS_NEXT); // new buffer is visible after element is
346+
347+
soProducerIndex(p + 2);// this ensures correctness on 32bit platforms
348348
}
349349

350350
return true;

src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ public boolean offer(final E e) {
110110
if (null != lvElement(lElementBuffer, offset)){
111111
return false;
112112
}
113-
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
114113
soElement(lElementBuffer, offset, e); // StoreStore
114+
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
115115
return true;
116116
}
117117

@@ -130,8 +130,8 @@ public E poll() {
130130
if (null == e) {
131131
return null;
132132
}
133-
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
134133
soElement(lElementBuffer, offset, null);// StoreStore
134+
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
135135
return e;
136136
}
137137

src/main/java/rx/internal/util/unsafe/SpscUnboundedArrayQueue.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ public final boolean offer(final E e) {
132132
}
133133

134134
private boolean writeToQueue(final E[] buffer, final E e, final long index, final long offset) {
135-
soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms
136135
soElement(buffer, offset, e);// StoreStore
136+
soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms
137137
return true;
138138
}
139139

@@ -144,11 +144,11 @@ private void resize(final E[] oldBuffer, final long currIndex, final long offset
144144
final E[] newBuffer = (E[]) new Object[capacity];
145145
producerBuffer = newBuffer;
146146
producerLookAhead = currIndex + mask - 1;
147-
soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
148147
soElement(newBuffer, offset, e);// StoreStore
149148
soNext(oldBuffer, newBuffer);
150149
soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is
151150
// inserted
151+
soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
152152
}
153153

154154
private void soNext(E[] curr, E[] next) {
@@ -174,8 +174,8 @@ public final E poll() {
174174
final Object e = lvElement(buffer, offset);// LoadLoad
175175
boolean isNextBuffer = e == HAS_NEXT;
176176
if (null != e && !isNextBuffer) {
177-
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
178177
soElement(buffer, offset, null);// StoreStore
178+
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
179179
return (E) e;
180180
} else if (isNextBuffer) {
181181
return newBufferPoll(lvNext(buffer), index, mask);
@@ -192,8 +192,8 @@ private E newBufferPoll(E[] nextBuffer, final long index, final long mask) {
192192
if (null == n) {
193193
return null;
194194
} else {
195-
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
196195
soElement(nextBuffer, offsetInNew, null);// StoreStore
196+
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
197197
return n;
198198
}
199199
}

src/test/java/rx/internal/operators/OperatorSwitchTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.lang.ref.WeakReference;
2323
import java.util.*;
2424
import java.util.concurrent.*;
25-
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.*;
2626

2727
import org.junit.*;
2828
import org.mockito.InOrder;
@@ -32,6 +32,7 @@
3232
import rx.Observer;
3333
import rx.exceptions.*;
3434
import rx.functions.*;
35+
import rx.internal.util.UtilityFunctions;
3536
import rx.observers.TestSubscriber;
3637
import rx.schedulers.*;
3738
import rx.subjects.PublishSubject;
@@ -880,4 +881,30 @@ public void call(Throwable e) {
880881
}
881882
}
882883
}
884+
885+
@Test
886+
public void asyncInner() throws Throwable {
887+
for (int i = 0; i < 100; i++) {
888+
889+
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
890+
891+
Observable.just(Observable.range(1, 1000 * 1000).subscribeOn(Schedulers.computation()))
892+
.switchMap(UtilityFunctions.<Observable<Integer>>identity())
893+
.observeOn(Schedulers.computation())
894+
.ignoreElements()
895+
.timeout(5, TimeUnit.SECONDS)
896+
.toBlocking()
897+
.subscribe(Actions.empty(), new Action1<Throwable>() {
898+
@Override
899+
public void call(Throwable e) {
900+
error.set(e);
901+
}
902+
});
903+
904+
Throwable ex = error.get();
905+
if (ex != null) {
906+
throw ex;
907+
}
908+
}
909+
}
883910
}

0 commit comments

Comments
 (0)