Skip to content

Commit c63fa2e

Browse files
committed
Merge pull request #3488 from markrietveld/1.x
1.x Remove all instances of Atomic*FieldUpdater
2 parents 9bc1987 + d01cd06 commit c63fa2e

28 files changed

+223
-543
lines changed

src/main/java/rx/internal/operators/BlockingOperatorLatest.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import java.util.Iterator;
1919
import java.util.NoSuchElementException;
2020
import java.util.concurrent.Semaphore;
21-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
21+
import java.util.concurrent.atomic.AtomicReference;
2222

2323
import rx.Notification;
2424
import rx.Observable;
@@ -59,15 +59,11 @@ public Iterator<T> iterator() {
5959
static final class LatestObserverIterator<T> extends Subscriber<Notification<? extends T>> implements Iterator<T> {
6060
final Semaphore notify = new Semaphore(0);
6161
// observer's notification
62-
volatile Notification<? extends T> value;
63-
/** Updater for the value field. */
64-
@SuppressWarnings("rawtypes")
65-
static final AtomicReferenceFieldUpdater<LatestObserverIterator, Notification> REFERENCE_UPDATER
66-
= AtomicReferenceFieldUpdater.newUpdater(LatestObserverIterator.class, Notification.class, "value");
62+
final AtomicReference<Notification<? extends T>> value = new AtomicReference<Notification<? extends T>>();
6763

6864
@Override
6965
public void onNext(Notification<? extends T> args) {
70-
boolean wasntAvailable = REFERENCE_UPDATER.getAndSet(this, args) == null;
66+
boolean wasntAvailable = value.getAndSet(args) == null;
7167
if (wasntAvailable) {
7268
notify.release();
7369
}
@@ -103,7 +99,7 @@ public boolean hasNext() {
10399
}
104100

105101
@SuppressWarnings("unchecked")
106-
Notification<? extends T> n = REFERENCE_UPDATER.getAndSet(this, null);
102+
Notification<? extends T> n = value.getAndSet(null);
107103
iNotif = n;
108104
if (iNotif.isOnError()) {
109105
throw Exceptions.propagate(iNotif.getThrowable());

src/main/java/rx/internal/operators/BlockingOperatorNext.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.NoSuchElementException;
2020
import java.util.concurrent.ArrayBlockingQueue;
2121
import java.util.concurrent.BlockingQueue;
22-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
22+
import java.util.concurrent.atomic.AtomicInteger;
2323

2424
import rx.Notification;
2525
import rx.Observable;
@@ -147,11 +147,7 @@ public void remove() {
147147

148148
private static class NextObserver<T> extends Subscriber<Notification<? extends T>> {
149149
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
150-
@SuppressWarnings("unused")
151-
volatile int waiting;
152-
@SuppressWarnings("rawtypes")
153-
static final AtomicIntegerFieldUpdater<NextObserver> WAITING_UPDATER
154-
= AtomicIntegerFieldUpdater.newUpdater(NextObserver.class, "waiting");
150+
final AtomicInteger waiting = new AtomicInteger();
155151

156152
@Override
157153
public void onCompleted() {
@@ -166,7 +162,7 @@ public void onError(Throwable e) {
166162
@Override
167163
public void onNext(Notification<? extends T> args) {
168164

169-
if (WAITING_UPDATER.getAndSet(this, 0) == 1 || !args.isOnNext()) {
165+
if (waiting.getAndSet(0) == 1 || !args.isOnNext()) {
170166
Notification<? extends T> toOffer = args;
171167
while (!buf.offer(toOffer)) {
172168
Notification<? extends T> concurrentItem = buf.poll();
@@ -185,7 +181,7 @@ public Notification<? extends T> takeNext() throws InterruptedException {
185181
return buf.take();
186182
}
187183
void setWaiting(int value) {
188-
waiting = value;
184+
waiting.set(value);
189185
}
190186
}
191187
}

src/main/java/rx/internal/operators/BufferUntilSubscriber.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package rx.internal.operators;
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
19+
import java.util.concurrent.atomic.AtomicReference;
2020

2121
import rx.Observer;
2222
import rx.Subscriber;
@@ -59,15 +59,9 @@ public static <T> BufferUntilSubscriber<T> create() {
5959
}
6060

6161
/** The common state. */
62-
static final class State<T> {
63-
volatile Observer<? super T> observerRef = null;
64-
/** Field updater for observerRef. */
65-
@SuppressWarnings("rawtypes")
66-
static final AtomicReferenceFieldUpdater<State, Observer> OBSERVER_UPDATER
67-
= AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef");
68-
62+
static final class State<T> extends AtomicReference<Observer<? super T>> {
6963
boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
70-
return OBSERVER_UPDATER.compareAndSet(this, expected, next);
64+
return compareAndSet(expected, next);
7165
}
7266

7367
final Object guard = new Object();
@@ -92,7 +86,7 @@ public void call(final Subscriber<? super T> s) {
9286
@SuppressWarnings("unchecked")
9387
@Override
9488
public void call() {
95-
state.observerRef = EMPTY_OBSERVER;
89+
state.set(EMPTY_OBSERVER);
9690
}
9791
}));
9892
boolean win = false;
@@ -107,7 +101,7 @@ public void call() {
107101
while(true) {
108102
Object o;
109103
while ((o = state.buffer.poll()) != null) {
110-
nl.accept(state.observerRef, o);
104+
nl.accept(state.get(), o);
111105
}
112106
synchronized (state.guard) {
113107
if (state.buffer.isEmpty()) {
@@ -138,7 +132,7 @@ private BufferUntilSubscriber(State<T> state) {
138132
private void emit(Object v) {
139133
synchronized (state.guard) {
140134
state.buffer.add(v);
141-
if (state.observerRef != null && !state.emitting) {
135+
if (state.get() != null && !state.emitting) {
142136
// Have an observer and nobody is emitting,
143137
// should drain the `buffer`
144138
forward = true;
@@ -148,7 +142,7 @@ private void emit(Object v) {
148142
if (forward) {
149143
Object o;
150144
while ((o = state.buffer.poll()) != null) {
151-
state.nl.accept(state.observerRef, o);
145+
state.nl.accept(state.get(), o);
152146
}
153147
// Because `emit(Object v)` will be called in sequence,
154148
// no event will be put into `buffer` after we drain it.
@@ -158,7 +152,7 @@ private void emit(Object v) {
158152
@Override
159153
public void onCompleted() {
160154
if (forward) {
161-
state.observerRef.onCompleted();
155+
state.get().onCompleted();
162156
}
163157
else {
164158
emit(state.nl.completed());
@@ -168,7 +162,7 @@ public void onCompleted() {
168162
@Override
169163
public void onError(Throwable e) {
170164
if (forward) {
171-
state.observerRef.onError(e);
165+
state.get().onError(e);
172166
}
173167
else {
174168
emit(state.nl.error(e));
@@ -178,7 +172,7 @@ public void onError(Throwable e) {
178172
@Override
179173
public void onNext(T t) {
180174
if (forward) {
181-
state.observerRef.onNext(t);
175+
state.get().onNext(t);
182176
}
183177
else {
184178
emit(state.nl.next(t));
@@ -188,7 +182,7 @@ public void onNext(T t) {
188182
@Override
189183
public boolean hasObservers() {
190184
synchronized (state.guard) {
191-
return state.observerRef != null;
185+
return state.get() != null;
192186
}
193187
}
194188

src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.List;
2020
import java.util.concurrent.atomic.AtomicBoolean;
2121
import java.util.concurrent.atomic.AtomicLong;
22-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2322

2423
import rx.Observable;
2524
import rx.Observable.OnSubscribe;
@@ -90,10 +89,7 @@ final static class MultiSourceProducer<T, R> implements Producer {
9089
private final BitSet completion;
9190
private volatile int completionCount; // does this need to be volatile or is WIP sufficient?
9291

93-
@SuppressWarnings("unused")
94-
private volatile long counter;
95-
@SuppressWarnings("rawtypes")
96-
private static final AtomicLongFieldUpdater<MultiSourceProducer> WIP = AtomicLongFieldUpdater.newUpdater(MultiSourceProducer.class, "counter");
92+
private final AtomicLong counter = new AtomicLong();
9793

9894
@SuppressWarnings("unchecked")
9995
public MultiSourceProducer(final Subscriber<? super R> child, final List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
@@ -139,7 +135,8 @@ public void request(long n) {
139135
* that there is always once who acts on each `tick`. Same concept as used in OperationObserveOn.
140136
*/
141137
void tick() {
142-
if (WIP.getAndIncrement(this) == 0) {
138+
AtomicLong localCounter = this.counter;
139+
if (localCounter.getAndIncrement() == 0) {
143140
int emitted = 0;
144141
do {
145142
// we only emit if requested > 0
@@ -155,7 +152,7 @@ void tick() {
155152
}
156153
}
157154
}
158-
} while (WIP.decrementAndGet(this) > 0);
155+
} while (localCounter.decrementAndGet() > 0);
159156
if (emitted > 0) {
160157
for (MultiSourceRequestableSubscriber<T, R> s : subscribers) {
161158
s.requestUpTo(emitted);

src/main/java/rx/internal/operators/OperatorConcat.java

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
package rx.internal.operators;
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
20-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.concurrent.atomic.AtomicLong;
2121

2222
import rx.Observable;
2323
import rx.Observable.Operator;
@@ -84,14 +84,10 @@ static final class ConcatSubscriber<T> extends Subscriber<Observable<? extends T
8484

8585
volatile ConcatInnerSubscriber<T> currentSubscriber;
8686

87-
volatile int wip;
88-
@SuppressWarnings("rawtypes")
89-
static final AtomicIntegerFieldUpdater<ConcatSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip");
87+
final AtomicInteger wip = new AtomicInteger();
9088

9189
// accessed by REQUESTED
92-
private volatile long requested;
93-
@SuppressWarnings("rawtypes")
94-
private static final AtomicLongFieldUpdater<ConcatSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ConcatSubscriber.class, "requested");
90+
private final AtomicLong requested = new AtomicLong();
9591
private final ProducerArbiter arbiter;
9692

9793
public ConcatSubscriber(Subscriber<T> s, SerialSubscription current) {
@@ -118,10 +114,10 @@ public void onStart() {
118114
private void requestFromChild(long n) {
119115
if (n <=0) return;
120116
// we track 'requested' so we know whether we should subscribe the next or not
121-
long previous = BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
117+
long previous = BackpressureUtils.getAndAddRequest(requested, n);
122118
arbiter.request(n);
123119
if (previous == 0) {
124-
if (currentSubscriber == null && wip > 0) {
120+
if (currentSubscriber == null && wip.get() > 0) {
125121
// this means we may be moving from one subscriber to another after having stopped processing
126122
// so need to kick off the subscribe via this request notification
127123
subscribeNext();
@@ -130,13 +126,13 @@ private void requestFromChild(long n) {
130126
}
131127

132128
private void decrementRequested() {
133-
REQUESTED.decrementAndGet(this);
129+
requested.decrementAndGet();
134130
}
135131

136132
@Override
137133
public void onNext(Observable<? extends T> t) {
138134
queue.add(nl.next(t));
139-
if (WIP.getAndIncrement(this) == 0) {
135+
if (wip.getAndIncrement() == 0) {
140136
subscribeNext();
141137
}
142138
}
@@ -150,22 +146,22 @@ public void onError(Throwable e) {
150146
@Override
151147
public void onCompleted() {
152148
queue.add(nl.completed());
153-
if (WIP.getAndIncrement(this) == 0) {
149+
if (wip.getAndIncrement() == 0) {
154150
subscribeNext();
155151
}
156152
}
157153

158154

159155
void completeInner() {
160156
currentSubscriber = null;
161-
if (WIP.decrementAndGet(this) > 0) {
157+
if (wip.decrementAndGet() > 0) {
162158
subscribeNext();
163159
}
164160
request(1);
165161
}
166162

167163
void subscribeNext() {
168-
if (requested > 0) {
164+
if (requested.get() > 0) {
169165
Object o = queue.poll();
170166
if (nl.isCompleted(o)) {
171167
child.onCompleted();
@@ -189,10 +185,7 @@ static class ConcatInnerSubscriber<T> extends Subscriber<T> {
189185

190186
private final Subscriber<T> child;
191187
private final ConcatSubscriber<T> parent;
192-
@SuppressWarnings("unused")
193-
private volatile int once = 0;
194-
@SuppressWarnings("rawtypes")
195-
private final static AtomicIntegerFieldUpdater<ConcatInnerSubscriber> ONCE = AtomicIntegerFieldUpdater.newUpdater(ConcatInnerSubscriber.class, "once");
188+
private final AtomicInteger once = new AtomicInteger();
196189
private final ProducerArbiter arbiter;
197190

198191
public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, ProducerArbiter arbiter) {
@@ -210,15 +203,15 @@ public void onNext(T t) {
210203

211204
@Override
212205
public void onError(Throwable e) {
213-
if (ONCE.compareAndSet(this, 0, 1)) {
206+
if (once.compareAndSet(0, 1)) {
214207
// terminal error through parent so everything gets cleaned up, including this inner
215208
parent.onError(e);
216209
}
217210
}
218211

219212
@Override
220213
public void onCompleted() {
221-
if (ONCE.compareAndSet(this, 0, 1)) {
214+
if (once.compareAndSet(0, 1)) {
222215
// terminal completion to parent so it continues to the next
223216
parent.completeInner();
224217
}

src/main/java/rx/internal/operators/OperatorMaterialize.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18+
import java.util.concurrent.atomic.AtomicLong;
1919

2020
import rx.Notification;
2121
import rx.Observable.Operator;
@@ -76,10 +76,7 @@ private static class ParentSubscriber<T> extends Subscriber<T> {
7676
// guarded by this
7777
private boolean missed = false;
7878

79-
private volatile long requested;
80-
@SuppressWarnings("rawtypes")
81-
private static final AtomicLongFieldUpdater<ParentSubscriber> REQUESTED = AtomicLongFieldUpdater
82-
.newUpdater(ParentSubscriber.class, "requested");
79+
private final AtomicLong requested = new AtomicLong();
8380

8481
ParentSubscriber(Subscriber<? super Notification<T>> child) {
8582
this.child = child;
@@ -91,7 +88,7 @@ public void onStart() {
9188
}
9289

9390
void requestMore(long n) {
94-
BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
91+
BackpressureUtils.getAndAddRequest(requested, n);
9592
request(n);
9693
drain();
9794
}
@@ -117,12 +114,13 @@ public void onNext(T t) {
117114

118115
private void decrementRequested() {
119116
// atomically decrement requested
117+
AtomicLong localRequested = this.requested;
120118
while (true) {
121-
long r = requested;
119+
long r = localRequested.get();
122120
if (r == Long.MAX_VALUE) {
123121
// don't decrement if unlimited requested
124122
return;
125-
} else if (REQUESTED.compareAndSet(this, r, r - 1)) {
123+
} else if (localRequested.compareAndSet(r, r - 1)) {
126124
return;
127125
}
128126
}
@@ -137,11 +135,12 @@ private void drain() {
137135
}
138136
}
139137
// drain loop
138+
final AtomicLong localRequested = this.requested;
140139
while (!child.isUnsubscribed()) {
141140
Notification<T> tn;
142141
tn = terminalNotification;
143142
if (tn != null) {
144-
if (requested > 0) {
143+
if (localRequested.get() > 0) {
145144
// allow tn to be GC'd after the onNext call
146145
terminalNotification = null;
147146
// emit the terminal notification

0 commit comments

Comments
 (0)