Skip to content

Commit a1e0352

Browse files
Merge pull request #1164 from benjchristensen/scheduler-jmh-perf-tests
JMH Perf Tests for Schedulers.computation
2 parents 38a87f5 + f6af2dc commit a1e0352

File tree

3 files changed

+120
-65
lines changed

3 files changed

+120
-65
lines changed
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.jmh;
17+
18+
import java.util.concurrent.CountDownLatch;
19+
20+
import org.openjdk.jmh.annotations.Param;
21+
import org.openjdk.jmh.annotations.Scope;
22+
import org.openjdk.jmh.annotations.Setup;
23+
import org.openjdk.jmh.annotations.State;
24+
import org.openjdk.jmh.logic.BlackHole;
25+
26+
import rx.Observable;
27+
import rx.Observable.OnSubscribe;
28+
import rx.Observer;
29+
import rx.Subscriber;
30+
31+
/**
32+
* Exposes an Observable and Observer that increments n Integers and consumes them in a Blackhole.
33+
*/
34+
@State(Scope.Thread)
35+
public class InputWithIncrementingInteger {
36+
@Param({ "1", "1024", "1048576" })
37+
public int size;
38+
39+
public Observable<Integer> observable;
40+
public Observer<Integer> observer;
41+
42+
private CountDownLatch latch;
43+
44+
@Setup
45+
public void setup() {
46+
observable = Observable.create(new OnSubscribe<Integer>() {
47+
@Override
48+
public void call(Subscriber<? super Integer> o) {
49+
for (int value = 0; value < size; value++) {
50+
if (o.isUnsubscribed())
51+
return;
52+
o.onNext(value);
53+
}
54+
o.onCompleted();
55+
}
56+
});
57+
58+
final BlackHole bh = new BlackHole();
59+
latch = new CountDownLatch(1);
60+
61+
observer = new Observer<Integer>() {
62+
@Override
63+
public void onCompleted() {
64+
latch.countDown();
65+
}
66+
67+
@Override
68+
public void onError(Throwable e) {
69+
throw new RuntimeException(e);
70+
}
71+
72+
@Override
73+
public void onNext(Integer value) {
74+
bh.consume(value);
75+
}
76+
};
77+
78+
}
79+
80+
public void awaitCompletion() throws InterruptedException {
81+
latch.await();
82+
}
83+
}

rxjava-core/src/perf/java/rx/operators/OperatorMapPerf.java

Lines changed: 2 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,17 @@
1515
*/
1616
package rx.operators;
1717

18-
import java.util.concurrent.CountDownLatch;
19-
2018
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
21-
import org.openjdk.jmh.annotations.Param;
22-
import org.openjdk.jmh.annotations.Scope;
23-
import org.openjdk.jmh.annotations.Setup;
24-
import org.openjdk.jmh.annotations.State;
25-
import org.openjdk.jmh.logic.BlackHole;
2619

27-
import rx.Observable;
28-
import rx.Observable.OnSubscribe;
2920
import rx.Observable.Operator;
30-
import rx.Observer;
31-
import rx.Subscriber;
3221
import rx.functions.Func1;
22+
import rx.jmh.InputWithIncrementingInteger;
3323

3424
public class OperatorMapPerf {
3525

3626
@GenerateMicroBenchmark
37-
public void mapIdentityFunction(Input input) throws InterruptedException {
27+
public void mapIdentityFunction(InputWithIncrementingInteger input) throws InterruptedException {
3828
input.observable.lift(MAP_OPERATOR).subscribe(input.observer);
39-
4029
input.awaitCompletion();
4130
}
4231

@@ -49,56 +38,4 @@ public Integer call(Integer value) {
4938

5039
private static final Operator<Integer, Integer> MAP_OPERATOR = new OperatorMap<Integer, Integer>(IDENTITY_FUNCTION);
5140

52-
@State(Scope.Thread)
53-
public static class Input {
54-
55-
@Param({ "1", "1024", "1048576" })
56-
public int size;
57-
58-
public Observable<Integer> observable;
59-
public Observer<Integer> observer;
60-
61-
private CountDownLatch latch;
62-
63-
@Setup
64-
public void setup() {
65-
observable = Observable.create(new OnSubscribe<Integer>() {
66-
@Override
67-
public void call(Subscriber<? super Integer> o) {
68-
for (int value = 0; value < size; value++) {
69-
if (o.isUnsubscribed())
70-
return;
71-
o.onNext(value);
72-
}
73-
o.onCompleted();
74-
}
75-
});
76-
77-
final BlackHole bh = new BlackHole();
78-
latch = new CountDownLatch(1);
79-
80-
observer = new Observer<Integer>() {
81-
@Override
82-
public void onCompleted() {
83-
latch.countDown();
84-
}
85-
86-
@Override
87-
public void onError(Throwable e) {
88-
throw new RuntimeException(e);
89-
}
90-
91-
@Override
92-
public void onNext(Integer value) {
93-
bh.consume(value);
94-
}
95-
};
96-
97-
}
98-
99-
public void awaitCompletion() throws InterruptedException {
100-
latch.await();
101-
}
102-
}
103-
10441
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
19+
20+
import rx.jmh.InputWithIncrementingInteger;
21+
22+
public class ComputationSchedulerPerf {
23+
24+
@GenerateMicroBenchmark
25+
public void subscribeOn(InputWithIncrementingInteger input) throws InterruptedException {
26+
input.observable.subscribeOn(Schedulers.computation()).subscribe(input.observer);
27+
input.awaitCompletion();
28+
}
29+
30+
@GenerateMicroBenchmark
31+
public void observeOn(InputWithIncrementingInteger input) throws InterruptedException {
32+
input.observable.observeOn(Schedulers.computation()).subscribe(input.observer);
33+
input.awaitCompletion();
34+
}
35+
}

0 commit comments

Comments
 (0)