Skip to content

Commit e4101d6

Browse files
Merge pull request #1714 from benjchristensen/merge-20
Merge 0.20.5 into 1.x
2 parents 67e463f + 5a381a2 commit e4101d6

File tree

7 files changed

+250
-145
lines changed

7 files changed

+250
-145
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import org.junit.Assert;
19+
import org.junit.Test;
20+
import rx.Observable;
21+
import rx.functions.Action1;
22+
import rx.functions.Func1;
23+
import rx.schedulers.Schedulers;
24+
import rx.subjects.PublishSubject;
25+
26+
import java.util.List;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicLong;
31+
32+
public class BufferUntilSubscriberTest {
33+
34+
@Test
35+
public void testIssue1677() throws InterruptedException {
36+
final AtomicLong counter = new AtomicLong();
37+
final Integer[] numbers = new Integer[5000];
38+
for (int i = 0; i < numbers.length; i++)
39+
numbers[i] = i + 1;
40+
final int NITERS = 250;
41+
final CountDownLatch latch = new CountDownLatch(NITERS);
42+
for (int iters = 0; iters < NITERS; iters++) {
43+
final CountDownLatch innerLatch = new CountDownLatch(1);
44+
final PublishSubject s = PublishSubject.create();
45+
final AtomicBoolean completed = new AtomicBoolean();
46+
Observable.from(numbers)
47+
.takeUntil(s)
48+
.window(50)
49+
.flatMap(new Func1<Observable<Integer>, Observable<Integer>>() {
50+
@Override
51+
public Observable<Integer> call(Observable<Integer> integerObservable) {
52+
return integerObservable
53+
.subscribeOn(Schedulers.computation())
54+
.map(new Func1<Integer, Integer>() {
55+
@Override
56+
public Integer call(Integer integer) {
57+
if (integer >= 5 && completed.compareAndSet(false, true)) {
58+
s.onCompleted();
59+
}
60+
// do some work
61+
Math.pow(Math.random(), Math.random());
62+
return integer * 2;
63+
}
64+
});
65+
}
66+
})
67+
.toList()
68+
.doOnNext(new Action1<List<Integer>>() {
69+
@Override
70+
public void call(List<Integer> integers) {
71+
counter.incrementAndGet();
72+
latch.countDown();
73+
innerLatch.countDown();
74+
}
75+
})
76+
.subscribe();
77+
if (!innerLatch.await(30, TimeUnit.SECONDS))
78+
Assert.fail("Failed inner latch wait, iteration " + iters);
79+
}
80+
if (!latch.await(30, TimeUnit.SECONDS))
81+
Assert.fail("Incomplete! Went through " + latch.getCount() + " iterations");
82+
else
83+
Assert.assertEquals(NITERS, counter.get());
84+
}
85+
}

src/main/java/rx/Observable.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,17 @@ public void call(Subscriber<? super R> o) {
174174
* @return the source Observable, transformed by the transformer function
175175
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
176176
*/
177-
public <R> Observable<R> compose(Transformer<? super T, R> transformer) {
178-
return transformer.call(this);
177+
@SuppressWarnings("unchecked")
178+
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
179+
// Casting to Observable<R> is type-safe because we know Observable is covariant.
180+
return (Observable<R>) transformer.call(this);
179181
}
180182

181183
/**
182184
* Transformer function used by {@link #compose}.
183185
* @warn more complete description needed
184186
*/
185-
public static interface Transformer<T, R> extends Func1<Observable<? extends T>, Observable<R>> {
187+
public static interface Transformer<T, R> extends Func1<Observable<? extends T>, Observable<? extends R>> {
186188
// cover for generics insanity
187189
}
188190

src/main/java/rx/internal/operators/BufferUntilSubscriber.java

+81-103
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616
package rx.internal.operators;
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2019
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2120

2221
import rx.Observer;
2322
import rx.Subscriber;
2423
import rx.functions.Action0;
24+
import rx.observers.EmptyObserver;
2525
import rx.observers.Subscribers;
2626
import rx.subjects.Subject;
2727
import rx.subscriptions.Subscriptions;
@@ -51,6 +51,9 @@
5151
*/
5252
public class BufferUntilSubscriber<T> extends Subject<T, T> {
5353

54+
@SuppressWarnings("rawtypes")
55+
private final static Observer EMPTY_OBSERVER = new EmptyObserver();
56+
5457
/**
5558
* @warn create() undescribed
5659
* @return
@@ -62,25 +65,22 @@ public static <T> BufferUntilSubscriber<T> create() {
6265

6366
/** The common state. */
6467
static final class State<T> {
65-
/** The first observer or the one which buffers until the first arrives. */
66-
volatile Observer<? super T> observerRef = new BufferedObserver<T>();
67-
/** Allow a single subscriber only. */
68-
volatile int first;
68+
volatile Observer<? super T> observerRef = null;
6969
/** Field updater for observerRef. */
7070
@SuppressWarnings("rawtypes")
7171
static final AtomicReferenceFieldUpdater<State, Observer> OBSERVER_UPDATER
7272
= AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef");
73-
/** Field updater for first. */
74-
@SuppressWarnings("rawtypes")
75-
static final AtomicIntegerFieldUpdater<State> FIRST_UPDATER
76-
= AtomicIntegerFieldUpdater.newUpdater(State.class, "first");
77-
78-
boolean casFirst(int expected, int next) {
79-
return FIRST_UPDATER.compareAndSet(this, expected, next);
80-
}
81-
void setObserverRef(Observer<? super T> o) {
82-
observerRef = o;
73+
74+
boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
75+
return OBSERVER_UPDATER.compareAndSet(this, expected, next);
8376
}
77+
78+
Object guard = new Object();
79+
/* protected by guard */
80+
boolean emitting = false;
81+
82+
final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
83+
final NotificationLite<T> nl = NotificationLite.instance();
8484
}
8585

8686
static final class OnSubscribeAction<T> implements OnSubscribe<T> {
@@ -92,122 +92,100 @@ public OnSubscribeAction(State<T> state) {
9292

9393
@Override
9494
public void call(final Subscriber<? super T> s) {
95-
if (state.casFirst(0, 1)) {
96-
final NotificationLite<T> nl = NotificationLite.instance();
97-
// drain queued notifications before subscription
98-
// we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
99-
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef;
100-
Object o;
101-
while ((o = buffered.buffer.poll()) != null) {
102-
nl.accept(s, o);
103-
}
104-
// register real observer for pass-thru ... and drain any further events received on first notification
105-
state.setObserverRef(new PassThruObserver<T>(s, buffered.buffer, state));
95+
if (state.casObserverRef(null, s)) {
10696
s.add(Subscriptions.create(new Action0() {
10797
@Override
10898
public void call() {
109-
state.setObserverRef(Subscribers.empty());
99+
state.observerRef = EMPTY_OBSERVER;
110100
}
111101
}));
102+
boolean win = false;
103+
synchronized (state.guard) {
104+
if (!state.emitting) {
105+
state.emitting = true;
106+
win = true;
107+
}
108+
}
109+
if (win) {
110+
final NotificationLite<T> nl = NotificationLite.instance();
111+
while(true) {
112+
Object o;
113+
while ((o = state.buffer.poll()) != null) {
114+
nl.accept(state.observerRef, o);
115+
}
116+
synchronized (state.guard) {
117+
if (state.buffer.isEmpty()) {
118+
// Although the buffer is empty, there is still a chance
119+
// that further events may be put into the `buffer`.
120+
// `emit(Object v)` should handle it.
121+
state.emitting = false;
122+
break;
123+
}
124+
}
125+
}
126+
}
112127
} else {
113128
s.onError(new IllegalStateException("Only one subscriber allowed!"));
114129
}
115130
}
116131

117132
}
118133
final State<T> state;
119-
134+
135+
private boolean forward = false;
136+
120137
private BufferUntilSubscriber(State<T> state) {
121138
super(new OnSubscribeAction<T>(state));
122139
this.state = state;
123140
}
124141

125-
@Override
126-
public void onCompleted() {
127-
state.observerRef.onCompleted();
128-
}
129-
130-
@Override
131-
public void onError(Throwable e) {
132-
state.observerRef.onError(e);
142+
private void emit(Object v) {
143+
synchronized (state.guard) {
144+
state.buffer.add(v);
145+
if (state.observerRef != null && !state.emitting) {
146+
// Have an observer and nobody is emitting,
147+
// should drain the `buffer`
148+
forward = true;
149+
state.emitting = true;
150+
}
151+
}
152+
if (forward) {
153+
Object o;
154+
while ((o = state.buffer.poll()) != null) {
155+
state.nl.accept(state.observerRef, o);
156+
}
157+
// Because `emit(Object v)` will be called in sequence,
158+
// no event will be put into `buffer` after we drain it.
159+
}
133160
}
134161

135162
@Override
136-
public void onNext(T t) {
137-
state.observerRef.onNext(t);
138-
}
139-
140-
/**
141-
* This is a temporary observer between buffering and the actual that gets into the line of notifications
142-
* from the producer and will drain the queue of any items received during the race of the initial drain and
143-
* switching this.
144-
*
145-
* It will then immediately swap itself out for the actual (after a single notification), but since this is
146-
* now being done on the same producer thread no further buffering will occur.
147-
*/
148-
private static final class PassThruObserver<T> extends Subscriber<T> {
149-
150-
private final Observer<? super T> actual;
151-
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
152-
private final ConcurrentLinkedQueue<Object> buffer;
153-
private final State<T> state;
154-
155-
PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer,
156-
State<T> state) {
157-
this.actual = actual;
158-
this.buffer = buffer;
159-
this.state = state;
163+
public void onCompleted() {
164+
if (forward) {
165+
state.observerRef.onCompleted();
160166
}
161-
162-
@Override
163-
public void onCompleted() {
164-
drainIfNeededAndSwitchToActual();
165-
actual.onCompleted();
167+
else {
168+
emit(state.nl.completed());
166169
}
170+
}
167171

168-
@Override
169-
public void onError(Throwable e) {
170-
drainIfNeededAndSwitchToActual();
171-
actual.onError(e);
172+
@Override
173+
public void onError(Throwable e) {
174+
if (forward) {
175+
state.observerRef.onError(e);
172176
}
173-
174-
@Override
175-
public void onNext(T t) {
176-
drainIfNeededAndSwitchToActual();
177-
actual.onNext(t);
177+
else {
178+
emit(state.nl.error(e));
178179
}
179-
180-
private void drainIfNeededAndSwitchToActual() {
181-
final NotificationLite<T> nl = NotificationLite.instance();
182-
Object o;
183-
while ((o = buffer.poll()) != null) {
184-
nl.accept(this, o);
185-
}
186-
// now we can safely change over to the actual and get rid of the pass-thru
187-
// but only if not unsubscribed
188-
state.setObserverRef(actual);
189-
}
190-
191180
}
192181

193-
private static final class BufferedObserver<T> extends Subscriber<T> {
194-
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
195-
private static final NotificationLite<Object> nl = NotificationLite.instance();
196-
197-
@Override
198-
public void onCompleted() {
199-
buffer.add(nl.completed());
200-
}
201-
202-
@Override
203-
public void onError(Throwable e) {
204-
buffer.add(nl.error(e));
182+
@Override
183+
public void onNext(T t) {
184+
if (forward) {
185+
state.observerRef.onNext(t);
205186
}
206-
207-
@Override
208-
public void onNext(T t) {
209-
buffer.add(nl.next(t));
187+
else {
188+
emit(state.nl.next(t));
210189
}
211-
212190
}
213191
}

src/main/java/rx/internal/operators/OperatorMerge.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ public void onError(Throwable e) {
409409
boolean sendOnComplete = false;
410410
synchronized (this) {
411411
wip--;
412-
if (wip == 0 && completed) {
412+
if ((wip == 0 && completed) || (wip < 0)) {
413413
sendOnComplete = true;
414414
}
415415
}

src/main/java/rx/internal/operators/OperatorTakeUntil.java

+1-18
Original file line numberDiff line numberDiff line change
@@ -36,24 +36,7 @@ public OperatorTakeUntil(final Observable<? extends E> other) {
3636

3737
@Override
3838
public Subscriber<? super T> call(final Subscriber<? super T> child) {
39-
final Subscriber<T> parent = new SerializedSubscriber<T>(child) {
40-
41-
@Override
42-
public void onCompleted() {
43-
child.onCompleted();
44-
}
45-
46-
@Override
47-
public void onError(Throwable e) {
48-
child.onError(e);
49-
}
50-
51-
@Override
52-
public void onNext(T t) {
53-
child.onNext(t);
54-
}
55-
56-
};
39+
final Subscriber<T> parent = new SerializedSubscriber<T>(child);
5740

5841
other.unsafeSubscribe(new Subscriber<E>(child) {
5942

0 commit comments

Comments
 (0)