Skip to content

Commit ea4d43a

Browse files
authored
2.x: add onTerminateDetach to Single and Completable (#5624)
* 2.x: add onTerminateDetach to Single and Completable * Improve coverage, fix lack of nulling out the downstream
1 parent 8c5bd1d commit ea4d43a

File tree

9 files changed

+617
-1
lines changed

9 files changed

+617
-1
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,6 +1380,24 @@ public final Completable onErrorResumeNext(final Function<? super Throwable, ? e
13801380
return RxJavaPlugins.onAssembly(new CompletableResumeNext(this, errorMapper));
13811381
}
13821382

1383+
/**
1384+
* Nulls out references to the upstream producer and downstream CompletableObserver if
1385+
* the sequence is terminated or downstream calls dispose().
1386+
* <dl>
1387+
* <dt><b>Scheduler:</b></dt>
1388+
* <dd>{@code onTerminateDetach} does not operate by default on a particular {@link Scheduler}.</dd>
1389+
* </dl>
1390+
* @return a Completable which nulls out references to the upstream producer and downstream CompletableObserver if
1391+
* the sequence is terminated or downstream calls dispose()
1392+
* @since 2.1.5 - experimental
1393+
*/
1394+
@Experimental
1395+
@CheckReturnValue
1396+
@SchedulerSupport(SchedulerSupport.NONE)
1397+
public final Completable onTerminateDetach() {
1398+
return RxJavaPlugins.onAssembly(new CompletableDetach(this));
1399+
}
1400+
13831401
/**
13841402
* Returns a Completable that repeatedly subscribes to this Completable until cancelled.
13851403
* <dl>

src/main/java/io/reactivex/Maybe.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3316,14 +3316,15 @@ public final Maybe<T> onExceptionResumeNext(final MaybeSource<? extends T> next)
33163316
ObjectHelper.requireNonNull(next, "next is null");
33173317
return RxJavaPlugins.onAssembly(new MaybeOnErrorNext<T>(this, Functions.justFunction(next), false));
33183318
}
3319+
33193320
/**
33203321
* Nulls out references to the upstream producer and downstream MaybeObserver if
33213322
* the sequence is terminated or downstream calls dispose().
33223323
* <dl>
33233324
* <dt><b>Scheduler:</b></dt>
33243325
* <dd>{@code onTerminateDetach} does not operate by default on a particular {@link Scheduler}.</dd>
33253326
* </dl>
3326-
* @return a Maybe which out references to the upstream producer and downstream MaybeObserver if
3327+
* @return a Maybe which nulls out references to the upstream producer and downstream MaybeObserver if
33273328
* the sequence is terminated or downstream calls dispose()
33283329
*/
33293330
@CheckReturnValue

src/main/java/io/reactivex/Single.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2468,6 +2468,24 @@ public final Single<T> onErrorResumeNext(
24682468
return RxJavaPlugins.onAssembly(new SingleResumeNext<T>(this, resumeFunctionInCaseOfError));
24692469
}
24702470

2471+
/**
2472+
* Nulls out references to the upstream producer and downstream SingleObserver if
2473+
* the sequence is terminated or downstream calls dispose().
2474+
* <dl>
2475+
* <dt><b>Scheduler:</b></dt>
2476+
* <dd>{@code onTerminateDetach} does not operate by default on a particular {@link Scheduler}.</dd>
2477+
* </dl>
2478+
* @return a Single which nulls out references to the upstream producer and downstream SingleObserver if
2479+
* the sequence is terminated or downstream calls dispose()
2480+
* @since 2.1.5 - experimental
2481+
*/
2482+
@Experimental
2483+
@CheckReturnValue
2484+
@SchedulerSupport(SchedulerSupport.NONE)
2485+
public final Single<T> onTerminateDetach() {
2486+
return RxJavaPlugins.onAssembly(new SingleDetach<T>(this));
2487+
}
2488+
24712489
/**
24722490
* Repeatedly re-subscribes to the current Single and emits each success value.
24732491
* <dl>
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.annotations.Experimental;
18+
import io.reactivex.disposables.Disposable;
19+
import io.reactivex.internal.disposables.DisposableHelper;
20+
21+
/**
22+
* Breaks the references between the upstream and downstream when the Completable terminates.
23+
*
24+
* @since 2.1.5 - experimental
25+
*/
26+
@Experimental
27+
public final class CompletableDetach extends Completable {
28+
29+
final CompletableSource source;
30+
31+
public CompletableDetach(CompletableSource source) {
32+
this.source = source;
33+
}
34+
35+
@Override
36+
protected void subscribeActual(CompletableObserver observer) {
37+
source.subscribe(new DetachCompletableObserver(observer));
38+
}
39+
40+
static final class DetachCompletableObserver implements CompletableObserver, Disposable {
41+
42+
CompletableObserver actual;
43+
44+
Disposable d;
45+
46+
DetachCompletableObserver(CompletableObserver actual) {
47+
this.actual = actual;
48+
}
49+
50+
@Override
51+
public void dispose() {
52+
actual = null;
53+
d.dispose();
54+
d = DisposableHelper.DISPOSED;
55+
}
56+
57+
@Override
58+
public boolean isDisposed() {
59+
return d.isDisposed();
60+
}
61+
62+
@Override
63+
public void onSubscribe(Disposable d) {
64+
if (DisposableHelper.validate(this.d, d)) {
65+
this.d = d;
66+
67+
actual.onSubscribe(this);
68+
}
69+
}
70+
71+
@Override
72+
public void onError(Throwable e) {
73+
d = DisposableHelper.DISPOSED;
74+
CompletableObserver a = actual;
75+
if (a != null) {
76+
actual = null;
77+
a.onError(e);
78+
}
79+
}
80+
81+
@Override
82+
public void onComplete() {
83+
d = DisposableHelper.DISPOSED;
84+
CompletableObserver a = actual;
85+
if (a != null) {
86+
actual = null;
87+
a.onComplete();
88+
}
89+
}
90+
}
91+
}

src/main/java/io/reactivex/internal/operators/maybe/MaybeDetach.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public void onSuccess(T value) {
6969
d = DisposableHelper.DISPOSED;
7070
MaybeObserver<? super T> a = actual;
7171
if (a != null) {
72+
actual = null;
7273
a.onSuccess(value);
7374
}
7475
}
@@ -78,6 +79,7 @@ public void onError(Throwable e) {
7879
d = DisposableHelper.DISPOSED;
7980
MaybeObserver<? super T> a = actual;
8081
if (a != null) {
82+
actual = null;
8183
a.onError(e);
8284
}
8385
}
@@ -87,6 +89,7 @@ public void onComplete() {
8789
d = DisposableHelper.DISPOSED;
8890
MaybeObserver<? super T> a = actual;
8991
if (a != null) {
92+
actual = null;
9093
a.onComplete();
9194
}
9295
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.annotations.Experimental;
18+
import io.reactivex.disposables.Disposable;
19+
import io.reactivex.internal.disposables.DisposableHelper;
20+
21+
/**
22+
* Breaks the references between the upstream and downstream when the Maybe terminates.
23+
*
24+
* @param <T> the value type
25+
* @since 2.1.5 - experimental
26+
*/
27+
@Experimental
28+
public final class SingleDetach<T> extends Single<T> {
29+
30+
final SingleSource<T> source;
31+
32+
public SingleDetach(SingleSource<T> source) {
33+
this.source = source;
34+
}
35+
36+
@Override
37+
protected void subscribeActual(SingleObserver<? super T> observer) {
38+
source.subscribe(new DetachSingleObserver<T>(observer));
39+
}
40+
41+
static final class DetachSingleObserver<T> implements SingleObserver<T>, Disposable {
42+
43+
SingleObserver<? super T> actual;
44+
45+
Disposable d;
46+
47+
DetachSingleObserver(SingleObserver<? super T> actual) {
48+
this.actual = actual;
49+
}
50+
51+
@Override
52+
public void dispose() {
53+
actual = null;
54+
d.dispose();
55+
d = DisposableHelper.DISPOSED;
56+
}
57+
58+
@Override
59+
public boolean isDisposed() {
60+
return d.isDisposed();
61+
}
62+
63+
@Override
64+
public void onSubscribe(Disposable d) {
65+
if (DisposableHelper.validate(this.d, d)) {
66+
this.d = d;
67+
68+
actual.onSubscribe(this);
69+
}
70+
}
71+
72+
@Override
73+
public void onSuccess(T value) {
74+
d = DisposableHelper.DISPOSED;
75+
SingleObserver<? super T> a = actual;
76+
if (a != null) {
77+
actual = null;
78+
a.onSuccess(value);
79+
}
80+
}
81+
82+
@Override
83+
public void onError(Throwable e) {
84+
d = DisposableHelper.DISPOSED;
85+
SingleObserver<? super T> a = actual;
86+
if (a != null) {
87+
actual = null;
88+
a.onError(e);
89+
}
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)