Skip to content

Commit ffe9968

Browse files
Merge pull request #1593 from benjchristensen/doOnSubscribe
doOnSubscribe
2 parents 573f756 + 36842d2 commit ffe9968

File tree

6 files changed

+353
-141
lines changed

6 files changed

+353
-141
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4804,6 +4804,21 @@ public final void onNext(T args) {
48044804
return lift(new OperatorDoOnEach<T>(observer));
48054805
}
48064806

4807+
/**
4808+
* Modifies the source {@code Observable} so that it invokes the given action when it is subscribed from
4809+
* its subscribers. Each subscription will result in an invocation of the given action except when the
4810+
* source {@code Observable} is reference counted, in which case the source {@code Observable} will invoke
4811+
* the given action for the first subscription.
4812+
*
4813+
*
4814+
* @param unsubscribe The action that gets called when this {@code Observable} is subscribed.
4815+
*
4816+
* @return That modified {@code Observable}
4817+
*/
4818+
public final Observable<T> doOnSubscribe(final Action0 subscribe) {
4819+
return lift(new OperatorDoOnSubscribe<T>(subscribe));
4820+
}
4821+
48074822
/**
48084823
* Modifies the source Observable so that it invokes an action when it calls {@code onCompleted} or
48094824
* {@code onError}.
@@ -4845,6 +4860,26 @@ public final void onNext(T args) {
48454860

48464861
return lift(new OperatorDoOnEach<T>(observer));
48474862
}
4863+
4864+
/**
4865+
* Modifies the source {@code Observable} so that it invokes the given action when it is unsubscribed from
4866+
* its subscribers. Each un-subscription will result in an invocation of the given action except when the
4867+
* source {@code Observable} is reference counted, in which case the source {@code Observable} will invoke
4868+
* the given action for the very last un-subscription.
4869+
* <p>
4870+
* <img width="640" height="310" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/doOnUnsubscribe.png" alt="">
4871+
* <dl>
4872+
* <dt><b>Scheduler:</b></dt>
4873+
* <dd>{@code doOnUnsubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
4874+
* </dl>
4875+
*
4876+
* @param unsubscribe the action that gets called when this {@code Observable} is unsubscribed
4877+
* @return the source {@code Observable} modified so as to call this Action when appropriate
4878+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#doonunsubscribe">RxJava wiki: doOnUnsubscribe</a>
4879+
*/
4880+
public final Observable<T> doOnUnsubscribe(final Action0 unsubscribe) {
4881+
return lift(new OperatorDoOnUnsubscribe<T>(unsubscribe));
4882+
}
48484883

48494884
/**
48504885
* Returns an Observable that emits the single item at a specified index in a sequence of emissions from a
@@ -10358,23 +10393,4 @@ public void call(Subscriber<? super T> observer) {
1035810393
}
1035910394
}
1036010395

10361-
/**
10362-
* Modifies the source {@code Observable} so that it invokes the given action when it is unsubscribed from
10363-
* its subscribers. Each un-subscription will result in an invocation of the given action except when the
10364-
* source {@code Observable} is reference counted, in which case the source {@code Observable} will invoke
10365-
* the given action for the very last un-subscription.
10366-
* <p>
10367-
* <img width="640" height="310" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/doOnUnsubscribe.png" alt="">
10368-
* <dl>
10369-
* <dt><b>Scheduler:</b></dt>
10370-
* <dd>{@code doOnUnsubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
10371-
* </dl>
10372-
*
10373-
* @param unsubscribe the action that gets called when this {@code Observable} is unsubscribed
10374-
* @return the source {@code Observable} modified so as to call this Action when appropriate
10375-
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#doonunsubscribe">RxJava wiki: doOnUnsubscribe</a>
10376-
*/
10377-
public final Observable<T> doOnUnsubscribe(final Action0 unsubscribe) {
10378-
return lift(new OperatorDoOnUnsubscribe<T>(unsubscribe));
10379-
}
1038010396
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
import rx.functions.Action0;
21+
22+
/**
23+
* This operator modifies an {@link rx.Observable} so a given action is invoked when the {@link rx.Observable} is subscribed.
24+
* @param <T> The type of the elements in the {@link rx.Observable} that this operator modifies
25+
*/
26+
public class OperatorDoOnSubscribe<T> implements Operator<T, T> {
27+
private final Action0 subscribe;
28+
29+
/**
30+
* Constructs an instance of the operator with the callback that gets invoked when the modified Observable is subscribed
31+
* @param unsubscribe The action that gets invoked when the modified {@link rx.Observable} is subscribed
32+
*/
33+
public OperatorDoOnSubscribe(Action0 subscribe) {
34+
this.subscribe = subscribe;
35+
}
36+
37+
@Override
38+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
39+
subscribe.call();
40+
// Pass through since this operator is for notification only, there is
41+
// no change to the stream whatsoever.
42+
return child;
43+
}
44+
}

rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import rx.Observable;
1918
import rx.Observable.Operator;
2019
import rx.Subscriber;
2120
import rx.functions.Action0;

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 0 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,125 +1126,4 @@ public String call(Integer t1) {
11261126
ts.assertReceivedOnNext(Arrays.asList("1", "2", "3"));
11271127
}
11281128

1129-
@Test
1130-
public void testDoOnUnsubscribe() throws Exception {
1131-
int subCount = 3;
1132-
final CountDownLatch upperLatch = new CountDownLatch(subCount);
1133-
final CountDownLatch lowerLatch = new CountDownLatch(subCount);
1134-
final CountDownLatch onNextLatch = new CountDownLatch(subCount);
1135-
1136-
final AtomicInteger upperCount = new AtomicInteger();
1137-
final AtomicInteger lowerCount = new AtomicInteger();
1138-
Observable<Long> longs = Observable
1139-
// The stream needs to be infinite to ensure the stream does not terminate
1140-
// before it is unsubscribed
1141-
.interval(50, TimeUnit.MILLISECONDS)
1142-
.doOnUnsubscribe(new Action0() {
1143-
// Test that upper stream will be notified for un-subscription
1144-
// from a child subscriber
1145-
@Override
1146-
public void call() {
1147-
upperLatch.countDown();
1148-
upperCount.incrementAndGet();
1149-
}
1150-
})
1151-
.doOnNext(new Action1<Long>() {
1152-
@Override
1153-
public void call(Long aLong) {
1154-
// Ensure there is at least some onNext events before un-subscription happens
1155-
onNextLatch.countDown();
1156-
}
1157-
})
1158-
.doOnUnsubscribe(new Action0() {
1159-
// Test that lower stream will be notified for a direct un-subscription
1160-
@Override
1161-
public void call() {
1162-
lowerLatch.countDown();
1163-
lowerCount.incrementAndGet();
1164-
}
1165-
});
1166-
1167-
List<Subscription> subscriptions = new ArrayList<Subscription>();
1168-
List<TestSubscriber> subscribers = new ArrayList<TestSubscriber>();
1169-
1170-
for(int i = 0; i < subCount; ++i) {
1171-
TestSubscriber<Long> subscriber = new TestSubscriber<Long>();
1172-
subscriptions.add(longs.subscribe(subscriber));
1173-
subscribers.add(subscriber);
1174-
}
1175-
1176-
onNextLatch.await();
1177-
for(int i = 0; i < subCount; ++i) {
1178-
subscriptions.get(i).unsubscribe();
1179-
// Test that unsubscribe() method is not affected in any way
1180-
subscribers.get(i).assertUnsubscribed();
1181-
}
1182-
1183-
upperLatch.await();
1184-
lowerLatch.await();
1185-
assertEquals(String.format("There should exactly %d un-subscription events for upper stream", subCount), subCount, upperCount.get());
1186-
assertEquals(String.format("There should exactly %d un-subscription events for lower stream", subCount), subCount, lowerCount.get());
1187-
}
1188-
1189-
@Test
1190-
public void testDoOnUnSubscribeWorksWithRefCount() throws Exception {
1191-
int subCount = 3;
1192-
final CountDownLatch upperLatch = new CountDownLatch(1);
1193-
final CountDownLatch lowerLatch = new CountDownLatch(1);
1194-
final CountDownLatch onNextLatch = new CountDownLatch(subCount);
1195-
1196-
final AtomicInteger upperCount = new AtomicInteger();
1197-
final AtomicInteger lowerCount = new AtomicInteger();
1198-
Observable<Long> longs = Observable
1199-
// The stream needs to be infinite to ensure the stream does not terminate
1200-
// before it is unsubscribed
1201-
.interval(50, TimeUnit.MILLISECONDS)
1202-
.doOnUnsubscribe(new Action0() {
1203-
// Test that upper stream will be notified for un-subscription
1204-
@Override
1205-
public void call() {
1206-
upperLatch.countDown();
1207-
upperCount.incrementAndGet();
1208-
}
1209-
})
1210-
.doOnNext(new Action1<Long>() {
1211-
@Override
1212-
public void call(Long aLong) {
1213-
// Ensure there is at least some onNext events before un-subscription happens
1214-
onNextLatch.countDown();
1215-
}
1216-
})
1217-
.doOnUnsubscribe(new Action0() {
1218-
// Test that lower stream will be notified for un-subscription
1219-
@Override
1220-
public void call() {
1221-
lowerLatch.countDown();
1222-
lowerCount.incrementAndGet();
1223-
}
1224-
})
1225-
.publish()
1226-
.refCount();
1227-
1228-
List<Subscription> subscriptions = new ArrayList<Subscription>();
1229-
List<TestSubscriber> subscribers = new ArrayList<TestSubscriber>();
1230-
1231-
for(int i = 0; i < subCount; ++i) {
1232-
TestSubscriber<Long> subscriber = new TestSubscriber<Long>();
1233-
subscriptions.add(longs.subscribe(subscriber));
1234-
subscribers.add(subscriber);
1235-
}
1236-
1237-
onNextLatch.await();
1238-
for(int i = 0; i < subCount; ++i) {
1239-
subscriptions.get(i).unsubscribe();
1240-
// Test that unsubscribe() method is not affected in any way
1241-
subscribers.get(i).assertUnsubscribed();
1242-
}
1243-
1244-
upperLatch.await();
1245-
lowerLatch.await();
1246-
assertEquals("There should exactly 1 un-subscription events for upper stream", 1, upperCount.get());
1247-
assertEquals("There should exactly 1 un-subscription events for lower stream", 1, lowerCount.get());
1248-
1249-
}
12501129
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import static org.junit.Assert.assertEquals;
19+
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.concurrent.atomic.AtomicReference;
22+
23+
import org.junit.Test;
24+
25+
import rx.Observable;
26+
import rx.Observable.OnSubscribe;
27+
import rx.Subscriber;
28+
import rx.functions.Action0;
29+
30+
public class OperatorDoOnSubscribeTest {
31+
32+
@Test
33+
public void testDoOnSubscribe() throws Exception {
34+
final AtomicInteger count = new AtomicInteger();
35+
Observable<Integer> o = Observable.just(1).doOnSubscribe(new Action0() {
36+
37+
@Override
38+
public void call() {
39+
count.incrementAndGet();
40+
}
41+
42+
});
43+
44+
o.subscribe();
45+
o.subscribe();
46+
o.subscribe();
47+
assertEquals(3, count.get());
48+
}
49+
50+
@Test
51+
public void testDoOnSubscribe2() throws Exception {
52+
final AtomicInteger count = new AtomicInteger();
53+
Observable<Integer> o = Observable.just(1).doOnSubscribe(new Action0() {
54+
55+
@Override
56+
public void call() {
57+
count.incrementAndGet();
58+
}
59+
60+
}).take(1).doOnSubscribe(new Action0() {
61+
62+
@Override
63+
public void call() {
64+
count.incrementAndGet();
65+
}
66+
67+
});
68+
69+
o.subscribe();
70+
assertEquals(2, count.get());
71+
}
72+
73+
@Test
74+
public void testDoOnUnSubscribeWorksWithRefCount() throws Exception {
75+
final AtomicInteger onSubscribed = new AtomicInteger();
76+
final AtomicInteger countBefore = new AtomicInteger();
77+
final AtomicInteger countAfter = new AtomicInteger();
78+
final AtomicReference<Subscriber<? super Integer>> sref = new AtomicReference<Subscriber<? super Integer>>();
79+
Observable<Integer> o = Observable.create(new OnSubscribe<Integer>() {
80+
81+
@Override
82+
public void call(Subscriber<? super Integer> s) {
83+
onSubscribed.incrementAndGet();
84+
sref.set(s);
85+
}
86+
87+
}).doOnSubscribe(new Action0() {
88+
89+
@Override
90+
public void call() {
91+
countBefore.incrementAndGet();
92+
}
93+
94+
}).publish().refCount().doOnSubscribe(new Action0() {
95+
96+
@Override
97+
public void call() {
98+
countAfter.incrementAndGet();
99+
}
100+
101+
});
102+
103+
o.subscribe();
104+
o.subscribe();
105+
o.subscribe();
106+
assertEquals(1, countBefore.get());
107+
assertEquals(1, onSubscribed.get());
108+
assertEquals(3, countAfter.get());
109+
sref.get().onCompleted();
110+
o.subscribe();
111+
o.subscribe();
112+
o.subscribe();
113+
assertEquals(2, countBefore.get());
114+
assertEquals(2, onSubscribed.get());
115+
assertEquals(6, countAfter.get());
116+
}
117+
118+
}

0 commit comments

Comments
 (0)