Skip to content

Commit 5dc0292

Browse files
Perf Tests with JMH
1 parent ab7f408 commit 5dc0292

File tree

3 files changed

+124
-18
lines changed

3 files changed

+124
-18
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package rx.jmh;
2+
3+
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
4+
import org.openjdk.jmh.annotations.Param;
5+
import org.openjdk.jmh.annotations.Scope;
6+
import org.openjdk.jmh.annotations.State;
7+
import org.openjdk.jmh.logic.BlackHole;
8+
9+
import rx.functions.Func1;
10+
11+
public class Baseline {
12+
13+
@GenerateMicroBenchmark
14+
public void forLoopInvokingFunction(BlackHole bh, Input input) {
15+
for (int value = 0; value < input.size; value++) {
16+
bh.consume(IDENTITY_FUNCTION.call(value));
17+
}
18+
}
19+
20+
private static final Func1<Integer, Integer> IDENTITY_FUNCTION = new Func1<Integer, Integer>() {
21+
@Override
22+
public Integer call(Integer value) {
23+
return value;
24+
}
25+
};
26+
27+
@State(Scope.Thread)
28+
public static class Input {
29+
30+
@Param({ "1024", "1048576" })
31+
public int size;
32+
33+
}
34+
35+
}

rxjava-core/src/perf/java/rx/operators/OperatorMapPerf.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package rx.operators;
22

3-
import java.util.ArrayList;
4-
import java.util.Collection;
53
import java.util.concurrent.CountDownLatch;
64

75
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
@@ -21,14 +19,7 @@
2119
public class OperatorMapPerf {
2220

2321
@GenerateMicroBenchmark
24-
public void measureBaseline(BlackHole bh, Input input) {
25-
for (Integer value : input.values) {
26-
bh.consume(IDENTITY_FUNCTION.call(value));
27-
}
28-
}
29-
30-
@GenerateMicroBenchmark
31-
public void measureMap(Input input) throws InterruptedException {
22+
public void mapIdentityFunction(Input input) throws InterruptedException {
3223
input.observable.lift(MAP_OPERATOR).subscribe(input.observer);
3324

3425
input.awaitCompletion();
@@ -46,26 +37,20 @@ public Integer call(Integer value) {
4637
@State(Scope.Thread)
4738
public static class Input {
4839

49-
@Param({"1", "1024", "1048576"})
40+
@Param({ "1", "1024", "1048576" })
5041
public int size;
5142

52-
public Collection<Integer> values;
5343
public Observable<Integer> observable;
5444
public Observer<Integer> observer;
5545

5646
private CountDownLatch latch;
5747

5848
@Setup
5949
public void setup() {
60-
values = new ArrayList<Integer>();
61-
for(int i = 0; i < size; i ++) {
62-
values.add(i);
63-
}
64-
6550
observable = Observable.create(new OnSubscribe<Integer>() {
6651
@Override
6752
public void call(Subscriber<? super Integer> o) {
68-
for (Integer value : values) {
53+
for (int value = 0; value < size; value++) {
6954
if (o.isUnsubscribed())
7055
return;
7156
o.onNext(value);
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package rx.operators;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
5+
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
6+
import org.openjdk.jmh.annotations.Param;
7+
import org.openjdk.jmh.annotations.Scope;
8+
import org.openjdk.jmh.annotations.Setup;
9+
import org.openjdk.jmh.annotations.State;
10+
import org.openjdk.jmh.logic.BlackHole;
11+
12+
import rx.Observable;
13+
import rx.Observable.OnSubscribe;
14+
import rx.Observer;
15+
import rx.Subscriber;
16+
import rx.observers.TestSubscriber;
17+
18+
public class OperatorSerializePerf {
19+
20+
@GenerateMicroBenchmark
21+
public void noSerializationSingleThreaded(Input input) {
22+
input.observable.subscribe(input.subscriber);
23+
}
24+
25+
@GenerateMicroBenchmark
26+
public void serializedSingleStream(Input input) {
27+
input.observable.serialize().subscribe(input.subscriber);
28+
}
29+
30+
@GenerateMicroBenchmark
31+
public void synchronizedSingleStream(Input input) {
32+
input.observable.synchronize().subscribe(input.subscriber);
33+
}
34+
35+
@State(Scope.Thread)
36+
public static class Input {
37+
38+
@Param({ "1024", "1048576" })
39+
public int size;
40+
41+
public Observable<Integer> observable;
42+
public TestSubscriber<Integer> subscriber;
43+
44+
private CountDownLatch latch;
45+
46+
@Setup
47+
public void setup() {
48+
observable = Observable.create(new OnSubscribe<Integer>() {
49+
@Override
50+
public void call(Subscriber<? super Integer> o) {
51+
for (int value = 0; value < size; value++) {
52+
if (o.isUnsubscribed())
53+
return;
54+
o.onNext(value);
55+
}
56+
o.onCompleted();
57+
}
58+
});
59+
60+
final BlackHole bh = new BlackHole();
61+
latch = new CountDownLatch(1);
62+
63+
subscriber = new TestSubscriber<Integer>(new Observer<Integer>() {
64+
@Override
65+
public void onCompleted() {
66+
latch.countDown();
67+
}
68+
69+
@Override
70+
public void onError(Throwable e) {
71+
throw new RuntimeException(e);
72+
}
73+
74+
@Override
75+
public void onNext(Integer value) {
76+
bh.consume(value);
77+
}
78+
});
79+
80+
}
81+
82+
public void awaitCompletion() throws InterruptedException {
83+
latch.await();
84+
}
85+
}
86+
}

0 commit comments

Comments
 (0)