Skip to content

Commit def4484

Browse files
Merge pull request #826 from benjchristensen/returned-subscription
Return wrapped Subscription
2 parents a797c56 + 95ce9aa commit def4484

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6900,9 +6900,20 @@ public final Subscription subscribe(Subscriber<? super T> observer) {
69006900
if (isInternalImplementation(observer)) {
69016901
onSubscribeFunction.call(observer);
69026902
} else {
6903-
onSubscribeFunction.call(new SafeSubscriber<T>(observer));
6903+
// assign to `observer` so we return the protected version
6904+
observer = new SafeSubscriber<T>(observer);
6905+
onSubscribeFunction.call(observer);
69046906
}
6905-
return hook.onSubscribeReturn(this, observer);
6907+
final Subscription returnSubscription = hook.onSubscribeReturn(this, observer);
6908+
// we return it inside a Subscription so it can't be cast back to Subscriber
6909+
return Subscriptions.create(new Action0() {
6910+
6911+
@Override
6912+
public void call() {
6913+
returnSubscription.unsubscribe();
6914+
}
6915+
6916+
});
69066917
} catch (OnErrorNotImplementedException e) {
69076918
// special handling when onError is not implemented ... we just rethrow
69086919
throw e;

0 commit comments

Comments
 (0)