Skip to content

Commit db50430

Browse files
Merge pull request #1589 from g9yuayon/operatorOnUnsubscribed
Added doOnSubscribe() to Observable
2 parents df330dc + 8dbff9c commit db50430

File tree

3 files changed

+185
-0
lines changed

3 files changed

+185
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10357,4 +10357,19 @@ public void call(Subscriber<? super T> observer) {
1035710357
});
1035810358
}
1035910359
}
10360+
10361+
/**
10362+
* Modifies the source {@code Observable} so that it invokes the given action when it is unsubscribed from
10363+
* its subscribers. Each un-subscription will result in an invocation of the given action except when the
10364+
* source {@code Observable} is reference counted, in which case the source {@code Observable} will invoke
10365+
* the given action for the very last un-subscription.
10366+
*
10367+
*
10368+
* @param unsubscribe The action that gets called when this {@code Observable} is unsubscribed.
10369+
*
10370+
* @return That modified {@code Observable}
10371+
*/
10372+
public final Observable<T> doOnUnsubscribe(final Action0 unsubscribe) {
10373+
return lift(new OperatorDoOnUnsubscribe<T>(unsubscribe));
10374+
}
1036010375
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.Operator;
20+
import rx.Subscriber;
21+
import rx.functions.Action0;
22+
import rx.subscriptions.Subscriptions;
23+
24+
/**
25+
* This operator modifies an {@link rx.Observable} so a given action is invoked when the {@link rx.Observable} is unsubscribed.
26+
* @param <T> The type of the elements in the {@link rx.Observable} that this operator modifies
27+
*/
28+
public class OperatorDoOnUnsubscribe<T> implements Operator<T, T> {
29+
private final Action0 unsubscribe;
30+
31+
/**
32+
* Constructs an instance of the operator with the callback that gets invoked when the modified Observable is unsubscribed
33+
* @param unsubscribe The action that gets invoked when the modified {@link rx.Observable} is unsubscribed
34+
*/
35+
public OperatorDoOnUnsubscribe(Action0 unsubscribe) {
36+
this.unsubscribe = unsubscribe;
37+
}
38+
39+
@Override
40+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
41+
child.add(Subscriptions.create(unsubscribe));
42+
43+
// Pass through since this operator is for notification only, there is
44+
// no change to the stream whatsoever.
45+
return child;
46+
}
47+
}

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import rx.Observable.OnSubscribe;
4949
import rx.Observable.Transformer;
5050
import rx.exceptions.OnErrorNotImplementedException;
51+
import rx.functions.Action0;
5152
import rx.functions.Action1;
5253
import rx.functions.Action2;
5354
import rx.functions.Func1;
@@ -1124,4 +1125,126 @@ public String call(Integer t1) {
11241125
ts.assertNoErrors();
11251126
ts.assertReceivedOnNext(Arrays.asList("1", "2", "3"));
11261127
}
1128+
1129+
@Test
1130+
public void testDoOnUnsubscribe() throws Exception {
1131+
int subCount = 3;
1132+
final CountDownLatch upperLatch = new CountDownLatch(subCount);
1133+
final CountDownLatch lowerLatch = new CountDownLatch(subCount);
1134+
final CountDownLatch onNextLatch = new CountDownLatch(subCount);
1135+
1136+
final AtomicInteger upperCount = new AtomicInteger();
1137+
final AtomicInteger lowerCount = new AtomicInteger();
1138+
Observable<Long> longs = Observable
1139+
// The stream needs to be infinite to ensure the stream does not terminate
1140+
// before it is unsubscribed
1141+
.interval(50, TimeUnit.MILLISECONDS)
1142+
.doOnUnsubscribe(new Action0() {
1143+
// Test that upper stream will be notified for un-subscription
1144+
// from a child subscriber
1145+
@Override
1146+
public void call() {
1147+
upperLatch.countDown();
1148+
upperCount.incrementAndGet();
1149+
}
1150+
})
1151+
.doOnNext(new Action1<Long>() {
1152+
@Override
1153+
public void call(Long aLong) {
1154+
// Ensure there is at least some onNext events before un-subscription happens
1155+
onNextLatch.countDown();
1156+
}
1157+
})
1158+
.doOnUnsubscribe(new Action0() {
1159+
// Test that lower stream will be notified for a direct un-subscription
1160+
@Override
1161+
public void call() {
1162+
lowerLatch.countDown();
1163+
lowerCount.incrementAndGet();
1164+
}
1165+
});
1166+
1167+
List<Subscription> subscriptions = new ArrayList<Subscription>();
1168+
List<TestSubscriber> subscribers = new ArrayList<TestSubscriber>();
1169+
1170+
for(int i = 0; i < subCount; ++i) {
1171+
TestSubscriber<Long> subscriber = new TestSubscriber<Long>();
1172+
subscriptions.add(longs.subscribe(subscriber));
1173+
subscribers.add(subscriber);
1174+
}
1175+
1176+
onNextLatch.await();
1177+
for(int i = 0; i < subCount; ++i) {
1178+
subscriptions.get(i).unsubscribe();
1179+
// Test that unsubscribe() method is not affected in any way
1180+
subscribers.get(i).assertUnsubscribed();
1181+
}
1182+
1183+
upperLatch.await();
1184+
lowerLatch.await();
1185+
assertEquals(String.format("There should exactly %d un-subscription events for upper stream", subCount), subCount, upperCount.get());
1186+
assertEquals(String.format("There should exactly %d un-subscription events for lower stream", subCount), subCount, lowerCount.get());
1187+
}
1188+
1189+
@Test
1190+
public void testDoOnUnSubscribeWorksWithRefCount() throws Exception {
1191+
int subCount = 3;
1192+
final CountDownLatch upperLatch = new CountDownLatch(1);
1193+
final CountDownLatch lowerLatch = new CountDownLatch(1);
1194+
final CountDownLatch onNextLatch = new CountDownLatch(subCount);
1195+
1196+
final AtomicInteger upperCount = new AtomicInteger();
1197+
final AtomicInteger lowerCount = new AtomicInteger();
1198+
Observable<Long> longs = Observable
1199+
// The stream needs to be infinite to ensure the stream does not terminate
1200+
// before it is unsubscribed
1201+
.interval(50, TimeUnit.MILLISECONDS)
1202+
.doOnUnsubscribe(new Action0() {
1203+
// Test that upper stream will be notified for un-subscription
1204+
@Override
1205+
public void call() {
1206+
upperLatch.countDown();
1207+
upperCount.incrementAndGet();
1208+
}
1209+
})
1210+
.doOnNext(new Action1<Long>() {
1211+
@Override
1212+
public void call(Long aLong) {
1213+
// Ensure there is at least some onNext events before un-subscription happens
1214+
onNextLatch.countDown();
1215+
}
1216+
})
1217+
.doOnUnsubscribe(new Action0() {
1218+
// Test that lower stream will be notified for un-subscription
1219+
@Override
1220+
public void call() {
1221+
lowerLatch.countDown();
1222+
lowerCount.incrementAndGet();
1223+
}
1224+
})
1225+
.publish()
1226+
.refCount();
1227+
1228+
List<Subscription> subscriptions = new ArrayList<Subscription>();
1229+
List<TestSubscriber> subscribers = new ArrayList<TestSubscriber>();
1230+
1231+
for(int i = 0; i < subCount; ++i) {
1232+
TestSubscriber<Long> subscriber = new TestSubscriber<Long>();
1233+
subscriptions.add(longs.subscribe(subscriber));
1234+
subscribers.add(subscriber);
1235+
}
1236+
1237+
onNextLatch.await();
1238+
for(int i = 0; i < subCount; ++i) {
1239+
subscriptions.get(i).unsubscribe();
1240+
// Test that unsubscribe() method is not affected in any way
1241+
subscribers.get(i).assertUnsubscribed();
1242+
}
1243+
1244+
upperLatch.await();
1245+
lowerLatch.await();
1246+
assertEquals("There should exactly 1 un-subscription events for upper stream", 1, upperCount.get());
1247+
assertEquals("There should exactly 1 un-subscription events for lower stream", 1, lowerCount.get());
1248+
1249+
}
11271250
}

0 commit comments

Comments
 (0)