Skip to content

Commit e07d341

Browse files
Merge pull request #837 from abersnaze/perf_jmh
Perf with JMH
2 parents c3ec19a + d012a52 commit e07d341

File tree

3 files changed

+145
-2
lines changed

3 files changed

+145
-2
lines changed

build.gradle

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,19 @@ subprojects {
4444
//include /src/examples folder
4545
examples
4646
//include /src/perf folder
47-
// perf //-> Not working so commented out
47+
perf {
48+
java {
49+
srcDir 'src/perf/java'
50+
compileClasspath += main.output
51+
runtimeClasspath += main.output
52+
}
53+
}
4854
}
49-
55+
56+
dependencies {
57+
perfCompile 'org.openjdk.jmh:jmh-core:0.2'
58+
}
59+
5060
tasks.build {
5161
//include 'examples' in build task
5262
dependsOn(examplesClasses)
@@ -58,6 +68,7 @@ subprojects {
5868
classpath {
5969
// include 'provided' dependencies on the classpath
6070
plusConfigurations += configurations.provided
71+
plusConfigurations += configurations.perfCompile
6172

6273
downloadSources = true
6374
downloadJavadoc = true
@@ -68,6 +79,8 @@ subprojects {
6879
module {
6980
// include 'provided' dependencies on the classpath
7081
scopes.PROVIDED.plus += configurations.provided
82+
// TODO not sure what to add it to
83+
//scopes.PROVIDED.plus += configurations.perfCompile
7184
}
7285
}
7386
}

rxjava-core/build.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
apply plugin: 'maven'
22
apply plugin: 'osgi'
3+
apply plugin:'application'
34

45
sourceCompatibility = JavaVersion.VERSION_1_6
56
targetCompatibility = JavaVersion.VERSION_1_6
@@ -31,3 +32,9 @@ jar {
3132
}
3233
}
3334

35+
task time(type:JavaExec) {
36+
classpath = sourceSets.perf.runtimeClasspath
37+
group 'Application'
38+
description 'Execute the calipser benchmark timing of Rx'
39+
main 'rx.operators.ObservableBenchmark'
40+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package rx.operators;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
7+
import org.openjdk.jmh.runner.Runner;
8+
import org.openjdk.jmh.runner.RunnerException;
9+
import org.openjdk.jmh.runner.options.Options;
10+
import org.openjdk.jmh.runner.options.OptionsBuilder;
11+
12+
import rx.Observable;
13+
import rx.Observable.OnSubscribe;
14+
import rx.Observer;
15+
import rx.util.functions.Func1;
16+
17+
public class ObservableBenchmark {
18+
19+
@GenerateMicroBenchmark
20+
public void timeBaseline() {
21+
observableOfInts.subscribe(newObserver());
22+
awaitAllObservers();
23+
}
24+
25+
@GenerateMicroBenchmark
26+
public int timeMapIterate() {
27+
int x = 0;
28+
for (int j = 0; j < intValues.length; j++) {
29+
// use hash code to make sure the JIT doesn't optimize too much and remove all of
30+
// our code.
31+
x |= ident.call(intValues[j]).hashCode();
32+
}
33+
return x;
34+
}
35+
36+
@GenerateMicroBenchmark
37+
public void timeMap() {
38+
timeOperator(new OperatorMap<Integer, Object>(ident));
39+
}
40+
41+
/**************************************************************************
42+
* Below is internal stuff to avoid object allocation and time overhead of anything that isn't
43+
* being tested.
44+
*
45+
* @throws RunnerException
46+
**************************************************************************/
47+
48+
public static void main(String[] args) throws RunnerException {
49+
Options opt = new OptionsBuilder()
50+
.include(ObservableBenchmark.class.getName()+".*")
51+
.forks(1)
52+
.build();
53+
54+
new Runner(opt).run();
55+
}
56+
57+
private void timeOperator(Operator<Object, Integer> op) {
58+
observableOfInts.lift(op).subscribe(newObserver());
59+
awaitAllObservers();
60+
}
61+
62+
private final static AtomicInteger outstanding = new AtomicInteger(0);
63+
private final static CountDownLatch latch = new CountDownLatch(1);
64+
65+
private static <T> Observer<T> newObserver() {
66+
outstanding.incrementAndGet();
67+
return new Observer<T>() {
68+
@Override
69+
public void onCompleted() {
70+
int left = outstanding.decrementAndGet();
71+
if (left == 0) {
72+
latch.countDown();
73+
}
74+
}
75+
76+
@Override
77+
public void onError(Throwable e) {
78+
int left = outstanding.decrementAndGet();
79+
if (left == 0) {
80+
latch.countDown();
81+
}
82+
}
83+
84+
@Override
85+
public void onNext(T t) {
86+
// do nothing
87+
}
88+
};
89+
}
90+
91+
private static void awaitAllObservers() {
92+
try {
93+
latch.await();
94+
} catch (InterruptedException e) {
95+
return;
96+
}
97+
}
98+
99+
private static final Integer[] intValues = new Integer[1000];
100+
static {
101+
for (int i = 0; i < intValues.length; i++) {
102+
intValues[i] = i;
103+
}
104+
}
105+
106+
private static final Observable<Integer> observableOfInts = Observable.create(new OnSubscribe<Integer>() {
107+
@Override
108+
public void call(Observer<? super Integer> o) {
109+
for (int i = 0; i < intValues.length; i++) {
110+
if (o.isUnsubscribed())
111+
return;
112+
o.onNext(intValues[i]);
113+
}
114+
o.onCompleted();
115+
}
116+
});
117+
private static final Func1<Integer, Object> ident = new Func1<Integer, Object>() {
118+
@Override
119+
public Object call(Integer t) {
120+
return t;
121+
}
122+
};
123+
}

0 commit comments

Comments
 (0)