Skip to content

Commit f51cd52

Browse files
authored
2.x: fix SpscLAQ nepotism, FlowableRefCountTest.testRefCountAsync flaky (#5507)
1 parent 09df08a commit f51cd52

File tree

3 files changed

+45
-8
lines changed

3 files changed

+45
-8
lines changed

src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,11 @@ private void soNext(AtomicReferenceArray<Object> curr, AtomicReferenceArray<Obje
114114
soElement(curr, calcDirectOffset(curr.length() - 1), next);
115115
}
116116
@SuppressWarnings("unchecked")
117-
private AtomicReferenceArray<Object> lvNext(AtomicReferenceArray<Object> curr) {
118-
return (AtomicReferenceArray<Object>)lvElement(curr, calcDirectOffset(curr.length() - 1));
117+
private AtomicReferenceArray<Object> lvNextBufferAndUnlink(AtomicReferenceArray<Object> curr, int nextIndex) {
118+
int nextOffset = calcDirectOffset(nextIndex);
119+
AtomicReferenceArray<Object> nextBuffer = (AtomicReferenceArray<Object>)lvElement(curr, nextOffset);
120+
soElement(curr, nextOffset, null); // Avoid GC nepotism
121+
return nextBuffer;
119122
}
120123
/**
121124
* {@inheritDoc}
@@ -138,7 +141,7 @@ public T poll() {
138141
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
139142
return (T) e;
140143
} else if (isNextBuffer) {
141-
return newBufferPoll(lvNext(buffer), index, mask);
144+
return newBufferPoll(lvNextBufferAndUnlink(buffer, mask + 1), index, mask);
142145
}
143146

144147
return null;
@@ -164,7 +167,7 @@ public T peek() {
164167
final int offset = calcWrappedOffset(index, mask);
165168
final Object e = lvElement(buffer, offset);// LoadLoad
166169
if (e == HAS_NEXT) {
167-
return newBufferPeek(lvNext(buffer), index, mask);
170+
return newBufferPeek(lvNextBufferAndUnlink(buffer, mask + 1), index, mask);
168171
}
169172

170173
return (T) e;

src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class FlowableRefCountTest {
4242
public void testRefCountAsync() {
4343
final AtomicInteger subscribeCount = new AtomicInteger();
4444
final AtomicInteger nextCount = new AtomicInteger();
45-
Flowable<Long> r = Flowable.interval(0, 5, TimeUnit.MILLISECONDS)
45+
Flowable<Long> r = Flowable.interval(0, 20, TimeUnit.MILLISECONDS)
4646
.doOnSubscribe(new Consumer<Subscription>() {
4747
@Override
4848
public void accept(Subscription s) {
@@ -67,12 +67,27 @@ public void accept(Long l) {
6767

6868
Disposable s2 = r.subscribe();
6969

70-
// give time to emit
7170
try {
72-
Thread.sleep(52);
71+
Thread.sleep(10);
7372
} catch (InterruptedException e) {
7473
}
7574

75+
for (;;) {
76+
int a = nextCount.get();
77+
int b = receivedCount.get();
78+
if (a > 10 && a < 20 && a == b) {
79+
break;
80+
}
81+
if (a >= 20) {
82+
break;
83+
}
84+
try {
85+
Thread.sleep(20);
86+
} catch (InterruptedException e) {
87+
}
88+
}
89+
// give time to emit
90+
7691
// now unsubscribe
7792
s2.dispose(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one subscriber getting a value but not the other
7893
s1.dispose();

src/test/java/io/reactivex/internal/queue/SimpleQueueTest.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import static org.junit.Assert.*;
2222

23-
import java.util.concurrent.atomic.AtomicInteger;
23+
import java.util.concurrent.atomic.*;
2424

2525
import org.junit.Test;
2626

@@ -155,4 +155,23 @@ public void run() {
155155
t1.join();
156156
t2.join();
157157
}
158+
159+
@Test
160+
public void spscLinkedArrayQueueNoNepotism() {
161+
SpscLinkedArrayQueue<Integer> q = new SpscLinkedArrayQueue<Integer>(16);
162+
163+
AtomicReferenceArray<Object> ara = q.producerBuffer;
164+
165+
for (int i = 0; i < 20; i++) {
166+
q.offer(i);
167+
}
168+
169+
assertNotNull(ara.get(16));
170+
171+
for (int i = 0; i < 20; i++) {
172+
assertEquals(i, q.poll().intValue());
173+
}
174+
175+
assertNull(ara.get(16));
176+
}
158177
}

0 commit comments

Comments
 (0)