Skip to content

Commit 35e4507

Browse files
committed
Merge branch 'master' into debug
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents 3d9842f + 0e5a098 commit 35e4507

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1252
-1057
lines changed

build.gradle

+15-2
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,19 @@ subprojects {
4444
//include /src/examples folder
4545
examples
4646
//include /src/perf folder
47-
// perf //-> Not working so commented out
47+
perf {
48+
java {
49+
srcDir 'src/perf/java'
50+
compileClasspath += main.output
51+
runtimeClasspath += main.output
52+
}
53+
}
4854
}
49-
55+
56+
dependencies {
57+
perfCompile 'org.openjdk.jmh:jmh-core:0.2'
58+
}
59+
5060
tasks.build {
5161
//include 'examples' in build task
5262
dependsOn(examplesClasses)
@@ -58,6 +68,7 @@ subprojects {
5868
classpath {
5969
// include 'provided' dependencies on the classpath
6070
plusConfigurations += configurations.provided
71+
plusConfigurations += configurations.perfCompile
6172

6273
downloadSources = true
6374
downloadJavadoc = true
@@ -68,6 +79,8 @@ subprojects {
6879
module {
6980
// include 'provided' dependencies on the classpath
7081
scopes.PROVIDED.plus += configurations.provided
82+
// TODO not sure what to add it to
83+
//scopes.PROVIDED.plus += configurations.perfCompile
7184
}
7285
}
7386
}

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

+10-2
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ class RxScalaDemo extends JUnitSuite {
404404
import Notification._
405405
o.materialize.subscribe(n => n match {
406406
case OnNext(v) => println("Got value " + v)
407-
case OnCompleted() => println("Completed")
407+
case OnCompleted => println("Completed")
408408
case OnError(err) => println("Error: " + err.getMessage)
409409
})
410410
}
@@ -420,11 +420,19 @@ class RxScalaDemo extends JUnitSuite {
420420
import Notification._
421421
List(1, 2, 3).toObservable.materialize.subscribe(n => n match {
422422
case OnNext(v) => println("Got value " + v)
423-
case OnCompleted() => println("Completed")
423+
case OnCompleted => println("Completed")
424424
case OnError(err) => println("Error: " + err.getMessage)
425425
})
426426
}
427427

428+
@Test def notificationSubtyping() {
429+
import Notification._
430+
val oc1: Notification[Nothing] = OnCompleted
431+
val oc2: Notification[Int] = OnCompleted
432+
val oc3: rx.Notification[_ <: Int] = oc2.asJavaNotification
433+
val oc4: rx.Notification[_ <: Any] = oc2.asJavaNotification
434+
}
435+
428436
@Test def elementAtReplacement() {
429437
assertEquals("b", List("a", "b", "c").toObservable.drop(1).first.toBlockingObservable.single)
430438
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala

+6-23
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ sealed trait Notification[+T] {
4141
this match {
4242
case Notification.OnNext(value) => onNext(value)
4343
case Notification.OnError(error) => onError(error)
44-
case Notification.OnCompleted() => onCompleted()
44+
case Notification.OnCompleted => onCompleted()
4545
}
4646
}
4747

@@ -58,7 +58,7 @@ sealed trait Notification[+T] {
5858
this match {
5959
case Notification.OnNext(value) => observer.onNext(value)
6060
case Notification.OnError(error) => observer.onError(error)
61-
case Notification.OnCompleted() => observer.onCompleted()
61+
case Notification.OnCompleted => observer.onCompleted()
6262
}
6363
}
6464

@@ -82,7 +82,7 @@ object Notification {
8282

8383
private [scala] def apply[T](n: rx.Notification[_ <: T]): Notification[T] = n.getKind match {
8484
case rx.Notification.Kind.OnNext => new OnNext(n)
85-
case rx.Notification.Kind.OnCompleted => new OnCompleted(n)
85+
case rx.Notification.Kind.OnCompleted => OnCompleted
8686
case rx.Notification.Kind.OnError => new OnError(n)
8787
}
8888

@@ -150,26 +150,9 @@ object Notification {
150150
override def toString = s"OnError($error)"
151151
}
152152

153-
object OnCompleted {
154-
155-
/**
156-
* Constructor for onCompleted notifications.
157-
*/
158-
def apply[T](): Notification[T] = {
159-
Notification(rx.Notification.createOnCompleted[T]())
160-
}
161-
162-
/**
163-
* Extractor for onCompleted notifications.
164-
*/
165-
def unapply[U](notification: Notification[U]): Option[Unit] = notification match {
166-
case onCompleted: OnCompleted[U] => Some()
167-
case _ => None
168-
}
169-
}
170-
171-
class OnCompleted[T] private[scala](val asJavaNotification: rx.Notification[_ <: T]) extends Notification[T] {
172-
override def toString = "OnCompleted()"
153+
object OnCompleted extends Notification[Nothing] {
154+
override def toString = "OnCompleted"
155+
val asJavaNotification = rx.Notification.createOnCompleted[Nothing]()
173156
}
174157

175158
}

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/NotificationTests.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ class NotificationTests extends JUnitSuite {
3737
val onError = OnError(oops)
3838
assertEquals(oops, onError match { case OnError(error) => error })
3939

40-
val onCompleted = OnCompleted()
41-
assertEquals((), onCompleted match { case OnCompleted() => () })
40+
val onCompleted = OnCompleted
41+
assertEquals((), onCompleted match { case OnCompleted => () })
4242
}
4343

4444
@Test
@@ -51,7 +51,7 @@ class NotificationTests extends JUnitSuite {
5151
val onError = OnError(oops)
5252
assertEquals(4711, onError(x=>42, e=>4711,()=>13))
5353

54-
val onCompleted = OnCompleted()
54+
val onCompleted = OnCompleted
5555
assertEquals(13, onCompleted(x=>42, e=>4711,()=>13))
5656

5757
}

rxjava-core/build.gradle

+7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
apply plugin: 'maven'
22
apply plugin: 'osgi'
3+
apply plugin:'application'
34

45
sourceCompatibility = JavaVersion.VERSION_1_6
56
targetCompatibility = JavaVersion.VERSION_1_6
@@ -31,3 +32,9 @@ jar {
3132
}
3233
}
3334

35+
task time(type:JavaExec) {
36+
classpath = sourceSets.perf.runtimeClasspath
37+
group 'Application'
38+
description 'Execute the calipser benchmark timing of Rx'
39+
main 'rx.operators.ObservableBenchmark'
40+
}

rxjava-core/src/main/java/rx/Observable.java

+42-20
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,14 @@
2727
import java.util.concurrent.Future;
2828
import java.util.concurrent.TimeUnit;
2929

30-
import rx.Observable.OnSubscribe;
3130
import rx.joins.Pattern2;
3231
import rx.joins.Plan0;
3332
import rx.observables.BlockingObservable;
3433
import rx.observables.ConnectableObservable;
3534
import rx.observables.GroupedObservable;
3635
import rx.observers.SafeSubscriber;
36+
import rx.operators.OnSubscribeFromIterable;
37+
import rx.operators.OnSubscribeRange;
3738
import rx.operators.OperationAll;
3839
import rx.operators.OperationAmb;
3940
import rx.operators.OperationAny;
@@ -50,8 +51,6 @@
5051
import rx.operators.OperationDematerialize;
5152
import rx.operators.OperationDistinct;
5253
import rx.operators.OperationDistinctUntilChanged;
53-
import rx.operators.Operator;
54-
import rx.operators.OperatorDoOnEach;
5554
import rx.operators.OperationElementAt;
5655
import rx.operators.OperationFilter;
5756
import rx.operators.OperationFinally;
@@ -65,13 +64,11 @@
6564
import rx.operators.OperationMergeDelayError;
6665
import rx.operators.OperationMinMax;
6766
import rx.operators.OperationMulticast;
68-
import rx.operators.OperationObserveOn;
6967
import rx.operators.OperationOnErrorResumeNextViaFunction;
7068
import rx.operators.OperationOnErrorResumeNextViaObservable;
7169
import rx.operators.OperationOnErrorReturn;
7270
import rx.operators.OperationOnExceptionResumeNextViaObservable;
7371
import rx.operators.OperationParallelMerge;
74-
import rx.operators.OperatorRepeat;
7572
import rx.operators.OperationReplay;
7673
import rx.operators.OperationRetry;
7774
import rx.operators.OperationSample;
@@ -98,19 +95,21 @@
9895
import rx.operators.OperationToObservableFuture;
9996
import rx.operators.OperationUsing;
10097
import rx.operators.OperationWindow;
101-
import rx.operators.OperatorSubscribeOn;
102-
import rx.operators.OperatorZip;
10398
import rx.operators.Operator;
10499
import rx.operators.OperatorCast;
105-
import rx.operators.OperatorFromIterable;
100+
import rx.operators.OperatorDoOnEach;
106101
import rx.operators.OperatorGroupBy;
107102
import rx.operators.OperatorMap;
108103
import rx.operators.OperatorMerge;
104+
import rx.operators.OperatorObserveOn;
109105
import rx.operators.OperatorParallel;
106+
import rx.operators.OperatorRepeat;
107+
import rx.operators.OperatorSubscribeOn;
110108
import rx.operators.OperatorTake;
111109
import rx.operators.OperatorTimestamp;
112110
import rx.operators.OperatorToObservableList;
113111
import rx.operators.OperatorToObservableSortedList;
112+
import rx.operators.OperatorZip;
114113
import rx.operators.OperatorZipIterable;
115114
import rx.plugins.RxJavaObservableExecutionHook;
116115
import rx.plugins.RxJavaPlugins;
@@ -121,8 +120,8 @@
121120
import rx.subjects.ReplaySubject;
122121
import rx.subjects.Subject;
123122
import rx.subscriptions.Subscriptions;
123+
import rx.util.Exceptions;
124124
import rx.util.OnErrorNotImplementedException;
125-
import rx.util.Range;
126125
import rx.util.TimeInterval;
127126
import rx.util.Timestamped;
128127
import rx.util.functions.Action0;
@@ -1218,7 +1217,7 @@ public final static <T> Observable<T> from(Future<? extends T> future, Scheduler
12181217
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
12191218
*/
12201219
public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
1221-
return create(new OperatorFromIterable<T>(iterable));
1220+
return create(new OnSubscribeFromIterable<T>(iterable));
12221221
}
12231222

12241223
/**
@@ -1240,7 +1239,7 @@ public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
12401239
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140.aspx">MSDN: Observable.ToObservable</a>
12411240
*/
12421241
public final static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler scheduler) {
1243-
return create(new OperatorFromIterable<T>(iterable)).subscribeOn(scheduler);
1242+
return create(new OnSubscribeFromIterable<T>(iterable)).subscribeOn(scheduler);
12441243
}
12451244

12461245
/**
@@ -2440,7 +2439,13 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
24402439
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229460.aspx">MSDN: Observable.Range</a>
24412440
*/
24422441
public final static Observable<Integer> range(int start, int count) {
2443-
return from(Range.createWithCount(start, count));
2442+
if (count < 1) {
2443+
throw new IllegalArgumentException("Count must be positive");
2444+
}
2445+
if ((start + count) > Integer.MAX_VALUE) {
2446+
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
2447+
}
2448+
return Observable.create(new OnSubscribeRange(start, start + (count - 1)));
24442449
}
24452450

24462451
/**
@@ -2460,7 +2465,7 @@ public final static Observable<Integer> range(int start, int count) {
24602465
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211896.aspx">MSDN: Observable.Range</a>
24612466
*/
24622467
public final static Observable<Integer> range(int start, int count, Scheduler scheduler) {
2463-
return from(Range.createWithCount(start, count), scheduler);
2468+
return range(start, count).subscribeOn(scheduler);
24642469
}
24652470

24662471
/**
@@ -5140,8 +5145,7 @@ public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends
51405145
}
51415146

51425147
/**
5143-
* Modify the source Observable so that it asynchronously notifies {@link Observer}s on the
5144-
* specified {@link Scheduler}.
5148+
* Move notifications to the specified {@link Scheduler} one `onNext` at a time.
51455149
* <p>
51465150
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
51475151
*
@@ -5152,9 +5156,26 @@ public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends
51525156
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-observeon">RxJava Wiki: observeOn()</a>
51535157
*/
51545158
public final Observable<T> observeOn(Scheduler scheduler) {
5155-
return create(OperationObserveOn.observeOn(this, scheduler));
5159+
return lift(new OperatorObserveOn<T>(scheduler));
51565160
}
51575161

5162+
/**
5163+
* Move notifications to the specified {@link Scheduler} asynchronously with a buffer of the given size.
5164+
* <p>
5165+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
5166+
*
5167+
* @param scheduler
5168+
* the {@link Scheduler} to notify {@link Observer}s on
5169+
* @param bufferSize
5170+
* that will be rounded up to the next power of 2
5171+
* @return the source Observable modified so that its {@link Observer}s are notified on the
5172+
* specified {@link Scheduler}
5173+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-observeon">RxJava Wiki: observeOn()</a>
5174+
*/
5175+
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
5176+
return lift(new OperatorObserveOn<T>(scheduler, bufferSize));
5177+
}
5178+
51585179
/**
51595180
* Filters the items emitted by an Observable, only emitting those of the specified type.
51605181
* <p>
@@ -5297,7 +5318,9 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
52975318
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-parallel">RxJava Wiki: parallel()</a>
52985319
*/
52995320
public final <R> Observable<R> parallel(Func1<Observable<T>, Observable<R>> f) {
5300-
return lift(new OperatorParallel<T, R>(f, Schedulers.computation()));
5321+
// TODO move this back to Schedulers.computation() again once that is properly using eventloops
5322+
// see https://github.com/Netflix/RxJava/issues/713 for why this was changed
5323+
return lift(new OperatorParallel<T, R>(f, Schedulers.newThread()));
53015324
}
53025325

53035326
/**
@@ -6966,10 +6989,9 @@ public void call() {
69666989
}
69676990

69686991
});
6969-
} catch (OnErrorNotImplementedException e) {
6970-
// special handling when onError is not implemented ... we just rethrow
6971-
throw e;
69726992
} catch (Throwable e) {
6993+
// special handling for certain Throwable/Error/Exception types
6994+
Exceptions.throwIfFatal(e);
69736995
// if an unhandled error occurs executing the onSubscribe we will propagate it
69746996
try {
69756997
observer.onError(hook.onSubscribeError(this, e));

rxjava-core/src/main/java/rx/observers/SafeSubscriber.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@
1919
import java.util.concurrent.atomic.AtomicBoolean;
2020

2121
import rx.Subscriber;
22-
import rx.Subscription;
23-
import rx.operators.SafeObservableSubscription;
2422
import rx.plugins.RxJavaPlugins;
25-
import rx.subscriptions.Subscriptions;
2623
import rx.util.CompositeException;
24+
import rx.util.Exceptions;
2725
import rx.util.OnErrorNotImplementedException;
2826

2927
/**
@@ -74,6 +72,9 @@ public void onCompleted() {
7472
try {
7573
actual.onCompleted();
7674
} catch (Throwable e) {
75+
// we handle here instead of another method so we don't add stacks to the frame
76+
// which can prevent it from being able to handle StackOverflow
77+
Exceptions.throwIfFatal(e);
7778
// handle errors if the onCompleted implementation fails, not just if the Observable fails
7879
_onError(e);
7980
} finally {
@@ -85,6 +86,9 @@ public void onCompleted() {
8586

8687
@Override
8788
public void onError(Throwable e) {
89+
// we handle here instead of another method so we don't add stacks to the frame
90+
// which can prevent it from being able to handle StackOverflow
91+
Exceptions.throwIfFatal(e);
8892
if (isFinished.compareAndSet(false, true)) {
8993
_onError(e);
9094
}
@@ -97,6 +101,9 @@ public void onNext(T args) {
97101
actual.onNext(args);
98102
}
99103
} catch (Throwable e) {
104+
// we handle here instead of another method so we don't add stacks to the frame
105+
// which can prevent it from being able to handle StackOverflow
106+
Exceptions.throwIfFatal(e);
100107
// handle errors if the onNext implementation fails, not just if the Observable fails
101108
onError(e);
102109
}

0 commit comments

Comments
 (0)