Skip to content

Commit f7a59af

Browse files
zsxwingbenjchristensen
authored andcommitted
Fix the race condition in BufferUntilSubscriber
1 parent 0794d9f commit f7a59af

File tree

2 files changed

+162
-103
lines changed

2 files changed

+162
-103
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import org.junit.Assert;
19+
import org.junit.Test;
20+
import rx.Observable;
21+
import rx.functions.Action1;
22+
import rx.functions.Func1;
23+
import rx.schedulers.Schedulers;
24+
import rx.subjects.PublishSubject;
25+
26+
import java.util.List;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicLong;
31+
32+
public class BufferUntilSubscriberTest {
33+
34+
@Test
35+
public void testIssue1677() throws InterruptedException {
36+
final AtomicLong counter = new AtomicLong();
37+
final Integer[] numbers = new Integer[5000];
38+
for (int i = 0; i < numbers.length; i++)
39+
numbers[i] = i + 1;
40+
final int NITERS = 250;
41+
final CountDownLatch latch = new CountDownLatch(NITERS);
42+
for (int iters = 0; iters < NITERS; iters++) {
43+
final CountDownLatch innerLatch = new CountDownLatch(1);
44+
final PublishSubject s = PublishSubject.create();
45+
final AtomicBoolean completed = new AtomicBoolean();
46+
Observable.from(numbers)
47+
.takeUntil(s)
48+
.window(50)
49+
.flatMap(new Func1<Observable<Integer>, Observable<Integer>>() {
50+
@Override
51+
public Observable<Integer> call(Observable<Integer> integerObservable) {
52+
return integerObservable
53+
.subscribeOn(Schedulers.computation())
54+
.map(new Func1<Integer, Integer>() {
55+
@Override
56+
public Integer call(Integer integer) {
57+
if (integer >= 5 && completed.compareAndSet(false, true)) {
58+
s.onCompleted();
59+
}
60+
// do some work
61+
Math.pow(Math.random(), Math.random());
62+
return integer * 2;
63+
}
64+
});
65+
}
66+
})
67+
.toList()
68+
.doOnNext(new Action1<List<Integer>>() {
69+
@Override
70+
public void call(List<Integer> integers) {
71+
counter.incrementAndGet();
72+
latch.countDown();
73+
innerLatch.countDown();
74+
}
75+
})
76+
.subscribe();
77+
if (!innerLatch.await(30, TimeUnit.SECONDS))
78+
Assert.fail("Failed inner latch wait, iteration " + iters);
79+
}
80+
if (!latch.await(30, TimeUnit.SECONDS))
81+
Assert.fail("Incomplete! Went through " + latch.getCount() + " iterations");
82+
else
83+
Assert.assertEquals(NITERS, counter.get());
84+
}
85+
}

src/main/java/rx/internal/operators/BufferUntilSubscriber.java

+77-103
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package rx.internal.operators;
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2019
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2120

2221
import rx.Observer;
@@ -62,25 +61,22 @@ public static <T> BufferUntilSubscriber<T> create() {
6261

6362
/** The common state. */
6463
static final class State<T> {
65-
/** The first observer or the one which buffers until the first arrives. */
66-
volatile Observer<? super T> observerRef = new BufferedObserver<T>();
67-
/** Allow a single subscriber only. */
68-
volatile int first;
64+
volatile Observer<? super T> observerRef = null;
6965
/** Field updater for observerRef. */
7066
@SuppressWarnings("rawtypes")
7167
static final AtomicReferenceFieldUpdater<State, Observer> OBSERVER_UPDATER
7268
= AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef");
73-
/** Field updater for first. */
74-
@SuppressWarnings("rawtypes")
75-
static final AtomicIntegerFieldUpdater<State> FIRST_UPDATER
76-
= AtomicIntegerFieldUpdater.newUpdater(State.class, "first");
77-
78-
boolean casFirst(int expected, int next) {
79-
return FIRST_UPDATER.compareAndSet(this, expected, next);
80-
}
81-
void setObserverRef(Observer<? super T> o) {
82-
observerRef = o;
69+
70+
boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
71+
return OBSERVER_UPDATER.compareAndSet(this, expected, next);
8372
}
73+
74+
Object guard = new Object();
75+
/* protected by guard */
76+
boolean emitting = false;
77+
78+
final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
79+
final NotificationLite<T> nl = NotificationLite.instance();
8480
}
8581

8682
static final class OnSubscribeAction<T> implements OnSubscribe<T> {
@@ -92,122 +88,100 @@ public OnSubscribeAction(State<T> state) {
9288

9389
@Override
9490
public void call(final Subscriber<? super T> s) {
95-
if (state.casFirst(0, 1)) {
96-
final NotificationLite<T> nl = NotificationLite.instance();
97-
// drain queued notifications before subscription
98-
// we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
99-
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef;
100-
Object o;
101-
while ((o = buffered.buffer.poll()) != null) {
102-
nl.accept(s, o);
103-
}
104-
// register real observer for pass-thru ... and drain any further events received on first notification
105-
state.setObserverRef(new PassThruObserver<T>(s, buffered.buffer, state));
91+
if (state.casObserverRef(null, s)) {
10692
s.add(Subscriptions.create(new Action0() {
10793
@Override
10894
public void call() {
109-
state.setObserverRef(Subscribers.empty());
95+
state.observerRef = Subscribers.empty();
11096
}
11197
}));
98+
boolean win = false;
99+
synchronized (state.guard) {
100+
if (!state.emitting) {
101+
state.emitting = true;
102+
win = true;
103+
}
104+
}
105+
if (win) {
106+
final NotificationLite<T> nl = NotificationLite.instance();
107+
while(true) {
108+
Object o;
109+
while ((o = state.buffer.poll()) != null) {
110+
nl.accept(state.observerRef, o);
111+
}
112+
synchronized (state.guard) {
113+
if (state.buffer.isEmpty()) {
114+
// Although the buffer is empty, there is still a chance
115+
// that further events may be put into the `buffer`.
116+
// `emit(Object v)` should handle it.
117+
state.emitting = false;
118+
break;
119+
}
120+
}
121+
}
122+
}
112123
} else {
113124
s.onError(new IllegalStateException("Only one subscriber allowed!"));
114125
}
115126
}
116127

117128
}
118129
final State<T> state;
119-
130+
131+
private boolean forward = false;
132+
120133
private BufferUntilSubscriber(State<T> state) {
121134
super(new OnSubscribeAction<T>(state));
122135
this.state = state;
123136
}
124137

125-
@Override
126-
public void onCompleted() {
127-
state.observerRef.onCompleted();
128-
}
129-
130-
@Override
131-
public void onError(Throwable e) {
132-
state.observerRef.onError(e);
138+
private void emit(Object v) {
139+
synchronized (state.guard) {
140+
state.buffer.add(v);
141+
if (state.observerRef != null && !state.emitting) {
142+
// Have an observer and nobody is emitting,
143+
// should drain the `buffer`
144+
forward = true;
145+
state.emitting = true;
146+
}
147+
}
148+
if (forward) {
149+
Object o;
150+
while ((o = state.buffer.poll()) != null) {
151+
state.nl.accept(state.observerRef, o);
152+
}
153+
// Because `emit(Object v)` will be called in sequence,
154+
// no event will be put into `buffer` after we drain it.
155+
}
133156
}
134157

135158
@Override
136-
public void onNext(T t) {
137-
state.observerRef.onNext(t);
138-
}
139-
140-
/**
141-
* This is a temporary observer between buffering and the actual that gets into the line of notifications
142-
* from the producer and will drain the queue of any items received during the race of the initial drain and
143-
* switching this.
144-
*
145-
* It will then immediately swap itself out for the actual (after a single notification), but since this is
146-
* now being done on the same producer thread no further buffering will occur.
147-
*/
148-
private static final class PassThruObserver<T> extends Subscriber<T> {
149-
150-
private final Observer<? super T> actual;
151-
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
152-
private final ConcurrentLinkedQueue<Object> buffer;
153-
private final State<T> state;
154-
155-
PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer,
156-
State<T> state) {
157-
this.actual = actual;
158-
this.buffer = buffer;
159-
this.state = state;
160-
}
161-
162-
@Override
163-
public void onCompleted() {
164-
drainIfNeededAndSwitchToActual();
165-
actual.onCompleted();
159+
public void onCompleted() {
160+
if (forward) {
161+
state.observerRef.onCompleted();
166162
}
167-
168-
@Override
169-
public void onError(Throwable e) {
170-
drainIfNeededAndSwitchToActual();
171-
actual.onError(e);
163+
else {
164+
emit(state.nl.completed());
172165
}
166+
}
173167

174-
@Override
175-
public void onNext(T t) {
176-
drainIfNeededAndSwitchToActual();
177-
actual.onNext(t);
168+
@Override
169+
public void onError(Throwable e) {
170+
if (forward) {
171+
state.observerRef.onError(e);
178172
}
179-
180-
private void drainIfNeededAndSwitchToActual() {
181-
final NotificationLite<T> nl = NotificationLite.instance();
182-
Object o;
183-
while ((o = buffer.poll()) != null) {
184-
nl.accept(this, o);
185-
}
186-
// now we can safely change over to the actual and get rid of the pass-thru
187-
// but only if not unsubscribed
188-
state.setObserverRef(actual);
173+
else {
174+
emit(state.nl.error(e));
189175
}
190-
191176
}
192177

193-
private static final class BufferedObserver<T> extends Subscriber<T> {
194-
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
195-
private static final NotificationLite<Object> nl = NotificationLite.instance();
196-
197-
@Override
198-
public void onCompleted() {
199-
buffer.add(nl.completed());
200-
}
201-
202-
@Override
203-
public void onError(Throwable e) {
204-
buffer.add(nl.error(e));
178+
@Override
179+
public void onNext(T t) {
180+
if (forward) {
181+
state.observerRef.onNext(t);
205182
}
206-
207-
@Override
208-
public void onNext(T t) {
209-
buffer.add(nl.next(t));
183+
else {
184+
emit(state.nl.next(t));
210185
}
211-
212186
}
213187
}

0 commit comments

Comments
 (0)