Skip to content

Commit 7ce6860

Browse files
Fix BlockingOperator Subscribe Behavior
The blocking operators need to use 'subscribe', not 'unsafeSubscribe' since they need the SafeSubscriber behavior in the final subscribe step they perform. Renamed operators to BlockingOperator* for clarity and to match the Operation->Operator naming change.
1 parent 17b01ab commit 7ce6860

16 files changed

+103
-138
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
import rx.operators.OperationInterval;
7575
import rx.operators.OperationJoin;
7676
import rx.operators.OperationJoinPatterns;
77-
import rx.operators.OperationMaterialize;
77+
import rx.operators.OperatorMaterialize;
7878
import rx.operators.OperationMergeDelayError;
7979
import rx.operators.OperationMergeMaxConcurrent;
8080
import rx.operators.OperationMinMax;
@@ -4915,7 +4915,7 @@ public final <R> Observable<R> mapMany(Func1<? super T, ? extends Observable<R>>
49154915
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229453.aspx">MSDN: Observable.materialize</a>
49164916
*/
49174917
public final Observable<Notification<T>> materialize() {
4918-
return create(OperationMaterialize.materialize(this));
4918+
return lift(new OperatorMaterialize<T>());
49194919
}
49204920

49214921
/**

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,13 @@
2222

2323
import rx.Observable;
2424
import rx.Subscriber;
25-
import rx.Subscription;
2625
import rx.functions.Action1;
2726
import rx.functions.Func1;
28-
import rx.observers.SafeSubscriber;
29-
import rx.operators.OperationLatest;
30-
import rx.operators.OperationMostRecent;
31-
import rx.operators.OperationNext;
32-
import rx.operators.OperationToFuture;
33-
import rx.operators.OperationToIterator;
27+
import rx.operators.BlockingOperatorLatest;
28+
import rx.operators.BlockingOperatorMostRecent;
29+
import rx.operators.BlockingOperatorNext;
30+
import rx.operators.BlockingOperatorToFuture;
31+
import rx.operators.BlockingOperatorToIterator;
3432

3533
/**
3634
* An extension of {@link Observable} that provides blocking operators.
@@ -64,17 +62,6 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
6462
return new BlockingObservable<T>(o);
6563
}
6664

67-
/**
68-
* Used for protecting against errors being thrown from {@link Subscriber} implementations and ensuring onNext/onError/onCompleted contract
69-
* compliance.
70-
* <p>
71-
* See https://github.com/Netflix/RxJava/issues/216 for discussion on
72-
* "Guideline 6.4: Protect calls to user code from within an operator"
73-
*/
74-
private Subscription protectivelyWrapAndSubscribe(Subscriber<? super T> observer) {
75-
return o.unsafeSubscribe(new SafeSubscriber<T>(observer));
76-
}
77-
7865
/**
7966
* Invoke a method on each item emitted by the {@link Observable}; block
8067
* until the Observable completes.
@@ -97,12 +84,10 @@ public void forEach(final Action1<? super T> onNext) {
9784
final AtomicReference<Throwable> exceptionFromOnError = new AtomicReference<Throwable>();
9885

9986
/**
100-
* Wrapping since raw functions provided by the user are being invoked.
101-
*
102-
* See https://github.com/Netflix/RxJava/issues/216 for discussion on
103-
* "Guideline 6.4: Protect calls to user code from within an operator"
87+
* Use 'subscribe' instead of 'unsafeSubscribe' for Rx contract behavior
88+
* as this is the final subscribe in the chain.
10489
*/
105-
protectivelyWrapAndSubscribe(new Subscriber<T>() {
90+
o.subscribe(new Subscriber<T>() {
10691
@Override
10792
public void onCompleted() {
10893
latch.countDown();
@@ -158,7 +143,7 @@ public void onNext(T args) {
158143
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki: getIterator()</a>
159144
*/
160145
public Iterator<T> getIterator() {
161-
return OperationToIterator.toIterator(o);
146+
return BlockingOperatorToIterator.toIterator(o);
162147
}
163148

164149
/**
@@ -311,7 +296,7 @@ public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
311296
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229751.aspx">MSDN: Observable.MostRecent</a>
312297
*/
313298
public Iterable<T> mostRecent(T initialValue) {
314-
return OperationMostRecent.mostRecent(o, initialValue);
299+
return BlockingOperatorMostRecent.mostRecent(o, initialValue);
315300
}
316301

317302
/**
@@ -324,7 +309,7 @@ public Iterable<T> mostRecent(T initialValue) {
324309
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211897.aspx">MSDN: Observable.Next</a>
325310
*/
326311
public Iterable<T> next() {
327-
return OperationNext.next(o);
312+
return BlockingOperatorNext.next(o);
328313
}
329314

330315
/**
@@ -344,7 +329,7 @@ public Iterable<T> next() {
344329
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212115.aspx">MSDN: Observable.Latest</a>
345330
*/
346331
public Iterable<T> latest() {
347-
return OperationLatest.latest(o);
332+
return BlockingOperatorLatest.latest(o);
348333
}
349334

350335
/**
@@ -441,7 +426,7 @@ public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
441426
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki: toFuture()</a>
442427
*/
443428
public Future<T> toFuture() {
444-
return OperationToFuture.toFuture(o);
429+
return BlockingOperatorToFuture.toFuture(o);
445430
}
446431

447432
/**

rxjava-core/src/main/java/rx/operators/OperationLatest.java renamed to rxjava-core/src/main/java/rx/operators/BlockingOperatorLatest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
* If the source works faster than the iterator, values may be skipped, but
3131
* not the onError or onCompleted events.
3232
*/
33-
public final class OperationLatest {
33+
public final class BlockingOperatorLatest {
3434
/** Utility class. */
35-
private OperationLatest() {
35+
private BlockingOperatorLatest() {
3636
throw new IllegalStateException("No instances!");
3737
}
3838

@@ -41,7 +41,7 @@ public static <T> Iterable<T> latest(final Observable<? extends T> source) {
4141
@Override
4242
public Iterator<T> iterator() {
4343
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
44-
source.materialize().unsafeSubscribe(lio);
44+
source.materialize().subscribe(lio);
4545
return lio;
4646
}
4747
};

rxjava-core/src/main/java/rx/operators/OperationMostRecent.java renamed to rxjava-core/src/main/java/rx/operators/BlockingOperatorMostRecent.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* <p>
3030
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.mostRecent.png">
3131
*/
32-
public final class OperationMostRecent {
32+
public final class BlockingOperatorMostRecent {
3333

3434
public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, final T initialValue) {
3535

@@ -39,7 +39,11 @@ public Iterator<T> iterator() {
3939
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
4040
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);
4141

42-
source.unsafeSubscribe(mostRecentObserver);
42+
/**
43+
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
44+
* since it is for BlockingObservable.
45+
*/
46+
source.subscribe(mostRecentObserver);
4347

4448
return nextIterator;
4549
}

rxjava-core/src/main/java/rx/operators/OperationNext.java renamed to rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* <p>
3232
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.next.png">
3333
*/
34-
public final class OperationNext {
34+
public final class BlockingOperatorNext {
3535

3636
public static <T> Iterable<T> next(final Observable<? extends T> items) {
3737
return new Iterable<T>() {
@@ -40,7 +40,7 @@ public Iterator<T> iterator() {
4040
NextObserver<T> nextObserver = new NextObserver<T>();
4141
final NextIterator<T> nextIterator = new NextIterator<T>(nextObserver);
4242

43-
items.materialize().unsafeSubscribe(nextObserver);
43+
items.materialize().subscribe(nextObserver);
4444

4545
return nextIterator;
4646
}

rxjava-core/src/main/java/rx/operators/OperationToFuture.java renamed to rxjava-core/src/main/java/rx/operators/BlockingOperatorToFuture.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
* The toFuture operation throws an exception if the Observable emits more than one item. If the
3636
* Observable may emit more than item, use <code>toList().toFuture()</code>.
3737
*/
38-
public class OperationToFuture {
38+
public class BlockingOperatorToFuture {
3939

4040
/**
4141
* Returns a Future that expects a single item from the observable.
@@ -52,7 +52,7 @@ public static <T> Future<T> toFuture(Observable<? extends T> that) {
5252
final AtomicReference<T> value = new AtomicReference<T>();
5353
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
5454

55-
final Subscription s = that.unsafeSubscribe(new Subscriber<T>() {
55+
final Subscription s = that.subscribe(new Subscriber<T>() {
5656

5757
@Override
5858
public void onCompleted() {

rxjava-core/src/main/java/rx/operators/OperationToIterator.java renamed to rxjava-core/src/main/java/rx/operators/BlockingOperatorToIterator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
*
3434
* @see <a href="https://github.com/Netflix/RxJava/issues/50">Issue #50</a>
3535
*/
36-
public class OperationToIterator {
36+
public class BlockingOperatorToIterator {
3737

3838
/**
3939
* Returns an iterator that iterates all values of the observable.
@@ -45,7 +45,8 @@ public class OperationToIterator {
4545
public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
4646
final BlockingQueue<Notification<? extends T>> notifications = new LinkedBlockingQueue<Notification<? extends T>>();
4747

48-
source.materialize().unsafeSubscribe(new Subscriber<Notification<? extends T>>() {
48+
// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
49+
source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
4950
@Override
5051
public void onCompleted() {
5152
// ignore

rxjava-core/src/main/java/rx/operators/OperationDematerialize.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import rx.Subscription;
2424

2525
/**
26-
* Reverses the effect of {@link OperationMaterialize} by transforming the Notification objects
26+
* Reverses the effect of {@link OperatorMaterialize} by transforming the Notification objects
2727
* emitted by a source Observable into the items or notifications they represent.
2828
* <p>
2929
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/dematerialize.png">

rxjava-core/src/main/java/rx/operators/OperationMaterialize.java

Lines changed: 0 additions & 81 deletions
This file was deleted.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Notification;
19+
import rx.Observable.Operator;
20+
import rx.Subscriber;
21+
22+
/**
23+
* Turns all of the notifications from an Observable into <code>onNext</code> emissions, and marks
24+
* them with their original notification types within {@link Notification} objects.
25+
* <p>
26+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/materialize.png">
27+
* <p>
28+
* See <a href="http://msdn.microsoft.com/en-us/library/hh229453(v=VS.103).aspx">here</a> for the
29+
* Microsoft Rx equivalent.
30+
*/
31+
public final class OperatorMaterialize<T> implements Operator<Notification<T>, T> {
32+
33+
@Override
34+
public Subscriber<? super T> call(final Subscriber<? super Notification<T>> child) {
35+
return new Subscriber<T>(child) {
36+
37+
@Override
38+
public void onCompleted() {
39+
child.onNext(Notification.<T> createOnCompleted());
40+
child.onCompleted();
41+
}
42+
43+
@Override
44+
public void onError(Throwable e) {
45+
child.onNext(Notification.<T> createOnError(e));
46+
child.onCompleted();
47+
}
48+
49+
@Override
50+
public void onNext(T t) {
51+
child.onNext(Notification.<T> createOnNext(t));
52+
}
53+
54+
};
55+
}
56+
}

0 commit comments

Comments
 (0)