Skip to content

Commit 6e266af

Browse files
skryvetsakarnokd
authored andcommitted
Add doOnTerminate to Single/Maybe for consistency (#6379) (#6386)
1 parent 3fbfcc9 commit 6e266af

File tree

6 files changed

+442
-0
lines changed

6 files changed

+442
-0
lines changed

src/main/java/io/reactivex/Maybe.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2892,6 +2892,33 @@ public final Maybe<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe) {
28922892
));
28932893
}
28942894

2895+
/**
2896+
* Returns a Maybe instance that calls the given onTerminate callback
2897+
* just before this Maybe completes normally or with an exception.
2898+
* <p>
2899+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="">
2900+
* <p>
2901+
* This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
2902+
* {@code onError} notification.
2903+
* <dl>
2904+
* <dt><b>Scheduler:</b></dt>
2905+
* <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
2906+
* </dl>
2907+
* @param onTerminate the action to invoke when the consumer calls {@code onComplete} or {@code onError}
2908+
* @return the new Maybe instance
2909+
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
2910+
* @see #doOnTerminate(Action)
2911+
* @since 2.2.7 - experimental
2912+
*/
2913+
@Experimental
2914+
@CheckReturnValue
2915+
@NonNull
2916+
@SchedulerSupport(SchedulerSupport.NONE)
2917+
public final Maybe<T> doOnTerminate(final Action onTerminate) {
2918+
ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
2919+
return RxJavaPlugins.onAssembly(new MaybeDoOnTerminate<T>(this, onTerminate));
2920+
}
2921+
28952922
/**
28962923
* Calls the shared consumer with the success value sent via onSuccess for each
28972924
* MaybeObserver that subscribes to the current Maybe.

src/main/java/io/reactivex/Single.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2495,6 +2495,33 @@ public final Single<T> doOnSubscribe(final Consumer<? super Disposable> onSubscr
24952495
return RxJavaPlugins.onAssembly(new SingleDoOnSubscribe<T>(this, onSubscribe));
24962496
}
24972497

2498+
/**
2499+
* Returns a Single instance that calls the given onTerminate callback
2500+
* just before this Single completes normally or with an exception.
2501+
* <p>
2502+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="">
2503+
* <p>
2504+
* This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onComplete} or
2505+
* {@code onError} notification.
2506+
* <dl>
2507+
* <dt><b>Scheduler:</b></dt>
2508+
* <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
2509+
* </dl>
2510+
* @param onTerminate the action to invoke when the consumer calls {@code onComplete} or {@code onError}
2511+
* @return the new Single instance
2512+
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
2513+
* @see #doOnTerminate(Action)
2514+
* @since 2.2.7 - experimental
2515+
*/
2516+
@Experimental
2517+
@CheckReturnValue
2518+
@NonNull
2519+
@SchedulerSupport(SchedulerSupport.NONE)
2520+
public final Single<T> doOnTerminate(final Action onTerminate) {
2521+
ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
2522+
return RxJavaPlugins.onAssembly(new SingleDoOnTerminate<T>(this, onTerminate));
2523+
}
2524+
24982525
/**
24992526
* Calls the shared consumer with the success value sent via onSuccess for each
25002527
* SingleObserver that subscribes to the current Single.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import io.reactivex.Maybe;
17+
import io.reactivex.MaybeObserver;
18+
import io.reactivex.MaybeSource;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.exceptions.CompositeException;
21+
import io.reactivex.exceptions.Exceptions;
22+
import io.reactivex.functions.Action;
23+
24+
public final class MaybeDoOnTerminate<T> extends Maybe<T> {
25+
26+
final MaybeSource<T> source;
27+
28+
final Action onTerminate;
29+
30+
public MaybeDoOnTerminate(MaybeSource<T> source, Action onTerminate) {
31+
this.source = source;
32+
this.onTerminate = onTerminate;
33+
}
34+
35+
@Override
36+
protected void subscribeActual(MaybeObserver<? super T> observer) {
37+
source.subscribe(new DoOnTerminate(observer));
38+
}
39+
40+
final class DoOnTerminate implements MaybeObserver<T> {
41+
final MaybeObserver<? super T> downstream;
42+
43+
DoOnTerminate(MaybeObserver<? super T> observer) {
44+
this.downstream = observer;
45+
}
46+
47+
@Override
48+
public void onSubscribe(Disposable d) {
49+
downstream.onSubscribe(d);
50+
}
51+
52+
@Override
53+
public void onSuccess(T value) {
54+
try {
55+
onTerminate.run();
56+
} catch (Throwable ex) {
57+
Exceptions.throwIfFatal(ex);
58+
downstream.onError(ex);
59+
return;
60+
}
61+
62+
downstream.onSuccess(value);
63+
}
64+
65+
@Override
66+
public void onError(Throwable e) {
67+
try {
68+
onTerminate.run();
69+
} catch (Throwable ex) {
70+
Exceptions.throwIfFatal(ex);
71+
e = new CompositeException(e, ex);
72+
}
73+
74+
downstream.onError(e);
75+
}
76+
77+
@Override
78+
public void onComplete() {
79+
try {
80+
onTerminate.run();
81+
} catch (Throwable ex) {
82+
Exceptions.throwIfFatal(ex);
83+
downstream.onError(ex);
84+
return;
85+
}
86+
87+
downstream.onComplete();
88+
}
89+
}
90+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import io.reactivex.Single;
17+
import io.reactivex.SingleObserver;
18+
import io.reactivex.SingleSource;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.exceptions.CompositeException;
21+
import io.reactivex.exceptions.Exceptions;
22+
import io.reactivex.functions.Action;
23+
24+
public final class SingleDoOnTerminate<T> extends Single<T> {
25+
26+
final SingleSource<T> source;
27+
28+
final Action onTerminate;
29+
30+
public SingleDoOnTerminate(SingleSource<T> source, Action onTerminate) {
31+
this.source = source;
32+
this.onTerminate = onTerminate;
33+
}
34+
35+
@Override
36+
protected void subscribeActual(final SingleObserver<? super T> observer) {
37+
source.subscribe(new DoOnTerminate(observer));
38+
}
39+
40+
final class DoOnTerminate implements SingleObserver<T> {
41+
42+
final SingleObserver<? super T> downstream;
43+
44+
DoOnTerminate(SingleObserver<? super T> observer) {
45+
this.downstream = observer;
46+
}
47+
48+
@Override
49+
public void onSubscribe(Disposable d) {
50+
downstream.onSubscribe(d);
51+
}
52+
53+
@Override
54+
public void onSuccess(T value) {
55+
try {
56+
onTerminate.run();
57+
} catch (Throwable ex) {
58+
Exceptions.throwIfFatal(ex);
59+
downstream.onError(ex);
60+
return;
61+
}
62+
63+
downstream.onSuccess(value);
64+
}
65+
66+
@Override
67+
public void onError(Throwable e) {
68+
try {
69+
onTerminate.run();
70+
} catch (Throwable ex) {
71+
Exceptions.throwIfFatal(ex);
72+
e = new CompositeException(e, ex);
73+
}
74+
75+
downstream.onError(e);
76+
}
77+
}
78+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import io.reactivex.Maybe;
17+
import io.reactivex.TestHelper;
18+
import io.reactivex.exceptions.CompositeException;
19+
import io.reactivex.exceptions.TestException;
20+
import io.reactivex.functions.Action;
21+
import io.reactivex.observers.TestObserver;
22+
import io.reactivex.plugins.RxJavaPlugins;
23+
import io.reactivex.subjects.PublishSubject;
24+
import org.junit.Test;
25+
26+
import java.util.List;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
29+
import static org.junit.Assert.assertTrue;
30+
31+
public class MaybeDoOnTerminateTest {
32+
33+
@Test(expected = NullPointerException.class)
34+
public void doOnTerminate() {
35+
Maybe.just(1).doOnTerminate(null);
36+
}
37+
38+
@Test
39+
public void doOnTerminateSuccess() {
40+
final AtomicBoolean atomicBoolean = new AtomicBoolean();
41+
Maybe.just(1).doOnTerminate(new Action() {
42+
@Override
43+
public void run() {
44+
atomicBoolean.set(true);
45+
}
46+
})
47+
.test()
48+
.assertResult(1);
49+
50+
assertTrue(atomicBoolean.get());
51+
}
52+
53+
@Test
54+
public void doOnTerminateError() {
55+
final AtomicBoolean atomicBoolean = new AtomicBoolean();
56+
Maybe.error(new TestException()).doOnTerminate(new Action() {
57+
@Override
58+
public void run() {
59+
atomicBoolean.set(true);
60+
}
61+
})
62+
.test()
63+
.assertFailure(TestException.class);
64+
65+
assertTrue(atomicBoolean.get());
66+
}
67+
68+
@Test
69+
public void doOnTerminateComplete() {
70+
final AtomicBoolean atomicBoolean = new AtomicBoolean();
71+
Maybe.empty().doOnTerminate(new Action() {
72+
@Override
73+
public void run() {
74+
atomicBoolean.set(true);
75+
}
76+
})
77+
.test()
78+
.assertResult();
79+
80+
assertTrue(atomicBoolean.get());
81+
}
82+
83+
@Test
84+
public void doOnTerminateSuccessCrash() {
85+
Maybe.just(1).doOnTerminate(new Action() {
86+
@Override
87+
public void run() {
88+
throw new TestException();
89+
}
90+
})
91+
.test()
92+
.assertFailure(TestException.class);
93+
}
94+
95+
@Test
96+
public void doOnTerminateErrorCrash() {
97+
TestObserver<Object> to = Maybe.error(new TestException("Outer"))
98+
.doOnTerminate(new Action() {
99+
@Override
100+
public void run() {
101+
throw new TestException("Inner");
102+
}
103+
})
104+
.test()
105+
.assertFailure(CompositeException.class);
106+
107+
List<Throwable> errors = TestHelper.compositeList(to.errors().get(0));
108+
TestHelper.assertError(errors, 0, TestException.class, "Outer");
109+
TestHelper.assertError(errors, 1, TestException.class, "Inner");
110+
}
111+
112+
@Test
113+
public void doOnTerminateCompleteCrash() {
114+
Maybe.empty()
115+
.doOnTerminate(new Action() {
116+
@Override
117+
public void run() {
118+
throw new TestException();
119+
}
120+
})
121+
.test()
122+
.assertFailure(TestException.class);
123+
}
124+
}

0 commit comments

Comments
 (0)