Skip to content

Commit 46b5a0f

Browse files
zsxwingbenjchristensen
authored andcommitted
Fix the bug that Switch doesn't propagate 'unsubscribe'
1 parent 870393a commit 46b5a0f

File tree

2 files changed

+18
-0
lines changed

2 files changed

+18
-0
lines changed

src/main/java/rx/internal/operators/OperatorSwitch.java

+1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ private static final class SwitchSubscriber<T> extends Subscriber<Observable<? e
6262
volatile boolean infinite = false;
6363

6464
public SwitchSubscriber(Subscriber<? super T> child) {
65+
super(child);
6566
s = new SerializedSubscriber<T>(child);
6667
ssub = new SerialSubscription();
6768
child.add(ssub);

src/test/java/rx/internal/operators/OperatorSwitchTest.java

+17
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertTrue;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Matchers.anyString;
2021
import static org.mockito.Mockito.inOrder;
@@ -25,6 +26,7 @@
2526

2627
import java.util.Arrays;
2728
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2830

2931
import org.junit.Before;
3032
import org.junit.Test;
@@ -513,4 +515,19 @@ public void onNext(String s) {
513515
testSubscriber.assertNoErrors();
514516
testSubscriber.assertTerminalEvent();
515517
}
518+
519+
@Test
520+
public void testUnsubscribe() {
521+
final AtomicBoolean isUnsubscribed = new AtomicBoolean();
522+
Observable.switchOnNext(
523+
Observable.create(new Observable.OnSubscribe<Observable<Integer>>() {
524+
@Override
525+
public void call(final Subscriber<? super Observable<Integer>> subscriber) {
526+
subscriber.onNext(Observable.just(1));
527+
isUnsubscribed.set(subscriber.isUnsubscribed());
528+
}
529+
})
530+
).take(1).subscribe();
531+
assertTrue("Switch doesn't propagate 'unsubscribe'", isUnsubscribed.get());
532+
}
516533
}

0 commit comments

Comments
 (0)