Skip to content

Commit bac1552

Browse files
committed
Merge pull request #3589 from akarnokd/ConcatPerf1x
1.x: concat reduce overhead when streaming a source
2 parents cd9a6ec + 234a4c4 commit bac1552

File tree

3 files changed

+133
-20
lines changed

3 files changed

+133
-20
lines changed

src/main/java/rx/internal/operators/BackpressureUtils.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,27 @@ public static long addCap(long a, long b) {
103103
return u;
104104
}
105105

106+
/**
107+
* Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
108+
* @param requested the requested amount holder
109+
* @param n the value to subtract from the requested amount, has to be positive (not verified)
110+
* @return the new requested amount
111+
* @throws IllegalStateException if n is greater than the current requested amount, which
112+
* indicates a bug in the request accounting logic
113+
*/
114+
public static long produced(AtomicLong requested, long n) {
115+
for (;;) {
116+
long current = requested.get();
117+
if (current == Long.MAX_VALUE) {
118+
return Long.MAX_VALUE;
119+
}
120+
long next = current - n;
121+
if (next < 0L) {
122+
throw new IllegalStateException("More produced than requested: " + next);
123+
}
124+
if (requested.compareAndSet(current, next)) {
125+
return next;
126+
}
127+
}
128+
}
106129
}

src/main/java/rx/internal/operators/OperatorConcat.java

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,14 @@
1616
package rx.internal.operators;
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.atomic.AtomicInteger;
20-
import java.util.concurrent.atomic.AtomicLong;
19+
import java.util.concurrent.atomic.*;
2120

22-
import rx.Observable;
21+
import rx.*;
2322
import rx.Observable.Operator;
24-
import rx.Producer;
25-
import rx.Subscriber;
2623
import rx.functions.Action0;
2724
import rx.internal.producers.ProducerArbiter;
2825
import rx.observers.SerializedSubscriber;
29-
import rx.subscriptions.SerialSubscription;
30-
import rx.subscriptions.Subscriptions;
26+
import rx.subscriptions.*;
3127

3228
/**
3329
* Returns an Observable that emits the items emitted by two or more Observables, one after the other.
@@ -112,9 +108,19 @@ public void onStart() {
112108
}
113109

114110
private void requestFromChild(long n) {
115-
if (n <=0) return;
111+
if (n <= 0) return;
116112
// we track 'requested' so we know whether we should subscribe the next or not
117-
long previous = BackpressureUtils.getAndAddRequest(requested, n);
113+
114+
final AtomicLong requestedField = requested;
115+
116+
long previous;
117+
118+
if (requestedField.get() != Long.MAX_VALUE) {
119+
previous = BackpressureUtils.getAndAddRequest(requestedField, n);
120+
} else {
121+
previous = Long.MAX_VALUE;
122+
}
123+
118124
arbiter.request(n);
119125
if (previous == 0) {
120126
if (currentSubscriber == null && wip.get() > 0) {
@@ -125,10 +131,6 @@ private void requestFromChild(long n) {
125131
}
126132
}
127133

128-
private void decrementRequested() {
129-
requested.decrementAndGet();
130-
}
131-
132134
@Override
133135
public void onNext(Observable<? extends T> t) {
134136
queue.add(nl.next(t));
@@ -167,8 +169,10 @@ void subscribeNext() {
167169
child.onCompleted();
168170
} else if (o != null) {
169171
Observable<? extends T> obs = nl.getValue(o);
172+
170173
currentSubscriber = new ConcatInnerSubscriber<T>(this, child, arbiter);
171174
current.set(currentSubscriber);
175+
172176
obs.unsafeSubscribe(currentSubscriber);
173177
}
174178
} else {
@@ -179,14 +183,23 @@ void subscribeNext() {
179183
}
180184
}
181185
}
186+
187+
void produced(long c) {
188+
if (c != 0L) {
189+
arbiter.produced(c);
190+
BackpressureUtils.produced(requested, c);
191+
}
192+
}
182193
}
183194

184195
static class ConcatInnerSubscriber<T> extends Subscriber<T> {
185196

186197
private final Subscriber<T> child;
187198
private final ConcatSubscriber<T> parent;
188-
private final AtomicInteger once = new AtomicInteger();
199+
private final AtomicBoolean once = new AtomicBoolean();
189200
private final ProducerArbiter arbiter;
201+
202+
long produced;
190203

191204
public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, ProducerArbiter arbiter) {
192205
this.parent = parent;
@@ -196,31 +209,33 @@ public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, Pr
196209

197210
@Override
198211
public void onNext(T t) {
212+
produced++;
213+
199214
child.onNext(t);
200-
parent.decrementRequested();
201-
arbiter.produced(1);
202215
}
203216

204217
@Override
205218
public void onError(Throwable e) {
206-
if (once.compareAndSet(0, 1)) {
219+
if (once.compareAndSet(false, true)) {
207220
// terminal error through parent so everything gets cleaned up, including this inner
208221
parent.onError(e);
209222
}
210223
}
211224

212225
@Override
213226
public void onCompleted() {
214-
if (once.compareAndSet(0, 1)) {
227+
if (once.compareAndSet(false, true)) {
228+
ConcatSubscriber<T> p = parent;
229+
// signal the production count at once instead of one by one
230+
p.produced(produced);
215231
// terminal completion to parent so it continues to the next
216-
parent.completeInner();
232+
p.completeInner();
217233
}
218234
}
219235

220236
@Override
221237
public void setProducer(Producer producer) {
222238
arbiter.setProducer(producer);
223239
}
224-
225240
}
226241
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.operators;
18+
19+
import java.util.concurrent.TimeUnit;
20+
21+
import org.openjdk.jmh.annotations.Benchmark;
22+
import org.openjdk.jmh.annotations.BenchmarkMode;
23+
import org.openjdk.jmh.annotations.Mode;
24+
import org.openjdk.jmh.annotations.OutputTimeUnit;
25+
import org.openjdk.jmh.annotations.Param;
26+
import org.openjdk.jmh.annotations.Scope;
27+
import org.openjdk.jmh.annotations.Setup;
28+
import org.openjdk.jmh.annotations.State;
29+
import org.openjdk.jmh.infra.Blackhole;
30+
31+
import rx.Observable;
32+
import rx.jmh.LatchedObserver;
33+
34+
/**
35+
* Benchmark typical atomic operations on volatile fields and AtomicXYZ classes.
36+
* <p>
37+
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*ConcatPerf.*"
38+
* <p>
39+
* gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*ConcatPerf.*"
40+
*/
41+
@BenchmarkMode(Mode.Throughput)
42+
@OutputTimeUnit(TimeUnit.SECONDS)
43+
@State(Scope.Thread)
44+
public class ConcatPerf {
45+
46+
Observable<Integer> source;
47+
48+
Observable<Integer> baseline;
49+
50+
@Param({"1", "1000", "1000000"})
51+
int count;
52+
53+
@Setup
54+
public void setup() {
55+
Integer[] array = new Integer[count];
56+
57+
for (int i = 0; i < count; i++) {
58+
array[i] = 777;
59+
}
60+
61+
baseline = Observable.from(array);
62+
63+
source = Observable.concat(baseline, Observable.<Integer>empty());
64+
}
65+
66+
@Benchmark
67+
public void normal(Blackhole bh) {
68+
source.subscribe(new LatchedObserver<Integer>(bh));
69+
}
70+
71+
@Benchmark
72+
public void baseline(Blackhole bh) {
73+
baseline.subscribe(new LatchedObserver<Integer>(bh));
74+
}
75+
}

0 commit comments

Comments
 (0)