Skip to content

Commit 9bc1987

Browse files
committed
Merge pull request #3169 from akarnokd/MergeHorizontalUnbounded
Merge can now operate in horizontally unbounded mode.
2 parents 38cd5f1 + e8beca7 commit 9bc1987

File tree

6 files changed

+724
-24
lines changed

6 files changed

+724
-24
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
import java.util.*;
1919
import java.util.concurrent.ConcurrentLinkedQueue;
20-
import java.util.concurrent.atomic.*;
20+
import java.util.concurrent.atomic.AtomicLong;
2121

2222
import rx.*;
23-
import rx.Observable.Operator;
2423
import rx.Observable;
24+
import rx.Observable.Operator;
2525
import rx.exceptions.*;
2626
import rx.internal.util.*;
27+
import rx.internal.util.atomic.*;
28+
import rx.internal.util.unsafe.*;
2729
import rx.subscriptions.CompositeSubscription;
2830

2931
/**
@@ -144,7 +146,7 @@ static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>
144146

145147
MergeProducer<T> producer;
146148

147-
volatile RxRingBuffer queue;
149+
volatile Queue<Object> queue;
148150

149151
/** Tracks the active subscriptions to sources. */
150152
volatile CompositeSubscription subscriptions;
@@ -182,8 +184,7 @@ public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int max
182184
this.nl = NotificationLite.instance();
183185
this.innerGuard = new Object();
184186
this.innerSubscribers = EMPTY;
185-
long r = Math.min(maxConcurrent, RxRingBuffer.SIZE);
186-
request(r);
187+
request(maxConcurrent == Integer.MAX_VALUE ? Long.MAX_VALUE : maxConcurrent);
187188
}
188189

189190
Queue<Throwable> getOrCreateErrorQueue() {
@@ -443,23 +444,27 @@ protected void queueScalar(T value) {
443444
* due to lack of requests or an ongoing emission,
444445
* enqueue the value and try the slow emission path.
445446
*/
446-
RxRingBuffer q = this.queue;
447+
Queue<Object> q = this.queue;
447448
if (q == null) {
448-
q = RxRingBuffer.getSpscInstance();
449-
this.add(q);
449+
int mc = maxConcurrent;
450+
if (mc == Integer.MAX_VALUE) {
451+
q = new SpscUnboundedAtomicArrayQueue<Object>(RxRingBuffer.SIZE);
452+
} else {
453+
if (Pow2.isPowerOfTwo(mc)) {
454+
if (UnsafeAccess.isUnsafeAvailable()) {
455+
q = new SpscArrayQueue<Object>(mc);
456+
} else {
457+
q = new SpscAtomicArrayQueue<Object>(mc);
458+
}
459+
} else {
460+
q = new SpscExactAtomicArrayQueue<Object>(mc);
461+
}
462+
}
450463
this.queue = q;
451464
}
452-
try {
453-
q.onNext(nl.next(value));
454-
} catch (MissingBackpressureException ex) {
455-
this.unsubscribe();
456-
this.onError(ex);
457-
return;
458-
} catch (IllegalStateException ex) {
459-
if (!this.isUnsubscribed()) {
460-
this.unsubscribe();
461-
this.onError(ex);
462-
}
465+
if (!q.offer(value)) {
466+
unsubscribe();
467+
onError(OnErrorThrowable.addValueAsLastCause(new MissingBackpressureException(), value));
463468
return;
464469
}
465470
emit();
@@ -533,7 +538,7 @@ void emitLoop() {
533538
skipFinal = true;
534539
return;
535540
}
536-
RxRingBuffer svq = queue;
541+
Queue<Object> svq = queue;
537542

538543
long r = producer.get();
539544
boolean unbounded = r == Long.MAX_VALUE;
@@ -610,9 +615,6 @@ void emitLoop() {
610615
} else {
611616
reportError();
612617
}
613-
if (svq != null) {
614-
svq.release();
615-
}
616618
skipFinal = true;
617619
return;
618620
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*
14+
* Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
15+
* Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/SpscAtomicArrayQueue.java
16+
*/
17+
18+
package rx.internal.util.atomic;
19+
20+
import java.util.*;
21+
import java.util.concurrent.atomic.*;
22+
23+
import rx.internal.util.unsafe.Pow2;
24+
25+
/**
26+
* A single-producer single-consumer bounded queue with exact capacity tracking.
27+
* <p>This means that a queue of 10 will allow exactly 10 offers, however, the underlying storage is still power-of-2.
28+
* <p>The implementation uses field updaters and thus should be platform-safe.
29+
*/
30+
public final class SpscExactAtomicArrayQueue<T> extends AtomicReferenceArray<T> implements Queue<T> {
31+
/** */
32+
private static final long serialVersionUID = 6210984603741293445L;
33+
final int mask;
34+
final int capacitySkip;
35+
volatile long producerIndex;
36+
volatile long consumerIndex;
37+
38+
@SuppressWarnings("rawtypes")
39+
static final AtomicLongFieldUpdater<SpscExactAtomicArrayQueue> PRODUCER_INDEX =
40+
AtomicLongFieldUpdater.newUpdater(SpscExactAtomicArrayQueue.class, "producerIndex");
41+
@SuppressWarnings("rawtypes")
42+
static final AtomicLongFieldUpdater<SpscExactAtomicArrayQueue> CONSUMER_INDEX =
43+
AtomicLongFieldUpdater.newUpdater(SpscExactAtomicArrayQueue.class, "consumerIndex");
44+
45+
public SpscExactAtomicArrayQueue(int capacity) {
46+
super(Pow2.roundToPowerOfTwo(capacity));
47+
int len = length();
48+
this.mask = len - 1;
49+
this.capacitySkip = len - capacity;
50+
}
51+
52+
53+
@Override
54+
public boolean offer(T value) {
55+
if (value == null) {
56+
throw new NullPointerException();
57+
}
58+
59+
long pi = producerIndex;
60+
int m = mask;
61+
62+
int fullCheck = (int)(pi + capacitySkip) & m;
63+
if (get(fullCheck) != null) {
64+
return false;
65+
}
66+
int offset = (int)pi & m;
67+
PRODUCER_INDEX.lazySet(this, pi + 1);
68+
lazySet(offset, value);
69+
return true;
70+
}
71+
@Override
72+
public T poll() {
73+
long ci = consumerIndex;
74+
int offset = (int)ci & mask;
75+
T value = get(offset);
76+
if (value == null) {
77+
return null;
78+
}
79+
CONSUMER_INDEX.lazySet(this, ci + 1);
80+
lazySet(offset, null);
81+
return value;
82+
}
83+
@Override
84+
public T peek() {
85+
return get((int)consumerIndex & mask);
86+
}
87+
@Override
88+
public void clear() {
89+
while (poll() != null || !isEmpty());
90+
}
91+
@Override
92+
public boolean isEmpty() {
93+
return producerIndex == consumerIndex;
94+
}
95+
96+
@Override
97+
public int size() {
98+
long ci = consumerIndex;
99+
for (;;) {
100+
long pi = producerIndex;
101+
long ci2 = consumerIndex;
102+
if (ci == ci2) {
103+
return (int)(pi - ci2);
104+
}
105+
ci = ci2;
106+
}
107+
}
108+
109+
@Override
110+
public boolean contains(Object o) {
111+
throw new UnsupportedOperationException();
112+
}
113+
114+
@Override
115+
public Iterator<T> iterator() {
116+
throw new UnsupportedOperationException();
117+
}
118+
119+
@Override
120+
public Object[] toArray() {
121+
throw new UnsupportedOperationException();
122+
}
123+
124+
@Override
125+
public <E> E[] toArray(E[] a) {
126+
throw new UnsupportedOperationException();
127+
}
128+
129+
@Override
130+
public boolean remove(Object o) {
131+
throw new UnsupportedOperationException();
132+
}
133+
134+
@Override
135+
public boolean containsAll(Collection<?> c) {
136+
throw new UnsupportedOperationException();
137+
}
138+
139+
@Override
140+
public boolean addAll(Collection<? extends T> c) {
141+
throw new UnsupportedOperationException();
142+
}
143+
144+
@Override
145+
public boolean removeAll(Collection<?> c) {
146+
throw new UnsupportedOperationException();
147+
}
148+
149+
@Override
150+
public boolean retainAll(Collection<?> c) {
151+
throw new UnsupportedOperationException();
152+
}
153+
154+
@Override
155+
public boolean add(T e) {
156+
throw new UnsupportedOperationException();
157+
}
158+
159+
@Override
160+
public T remove() {
161+
throw new UnsupportedOperationException();
162+
}
163+
164+
@Override
165+
public T element() {
166+
throw new UnsupportedOperationException();
167+
}
168+
169+
}

0 commit comments

Comments
 (0)