Skip to content

Commit b126c6c

Browse files
committed
1.x: fix delaySubscription(Observable) unsubscription before triggered (#3845)
1 parent 0fa142f commit b126c6c

File tree

3 files changed

+118
-9
lines changed

3 files changed

+118
-9
lines changed

src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import rx.Observable.OnSubscribe;
2121
import rx.observers.Subscribers;
2222
import rx.plugins.*;
23-
import rx.subscriptions.SerialSubscription;
23+
import rx.subscriptions.*;
2424

2525
/**
2626
* Delays the subscription to the main source until the other
@@ -39,9 +39,12 @@ public OnSubscribeDelaySubscriptionOther(Observable<? extends T> main, Observabl
3939

4040
@Override
4141
public void call(Subscriber<? super T> t) {
42+
final SerialSubscription serial = new SerialSubscription();
43+
44+
t.add(serial);
45+
4246
final Subscriber<T> child = Subscribers.wrap(t);
4347

44-
final SerialSubscription serial = new SerialSubscription();
4548

4649
Subscriber<U> otherSubscriber = new Subscriber<U>() {
4750
boolean done;
@@ -66,7 +69,7 @@ public void onCompleted() {
6669
return;
6770
}
6871
done = true;
69-
serial.set(child);
72+
serial.set(Subscriptions.unsubscribed());
7073

7174
main.unsafeSubscribe(child);
7275
}

src/test/java/rx/internal/operators/OnSubscribeDelaySubscriptionOtherTest.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package rx.internal.operators;
1818

19-
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.concurrent.atomic.*;
2020

2121
import org.junit.*;
2222

@@ -243,4 +243,69 @@ public void call() {
243243
ts.assertNoErrors();
244244
ts.assertCompleted();
245245
}
246+
247+
@Test
248+
public void unsubscriptionPropagatesBeforeSubscribe() {
249+
PublishSubject<Integer> source = PublishSubject.create();
250+
PublishSubject<Integer> other = PublishSubject.create();
251+
252+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
253+
254+
source.delaySubscription(other).subscribe(ts);
255+
256+
Assert.assertFalse("source subscribed?", source.hasObservers());
257+
Assert.assertTrue("other not subscribed?", other.hasObservers());
258+
259+
ts.unsubscribe();
260+
261+
Assert.assertFalse("source subscribed?", source.hasObservers());
262+
Assert.assertFalse("other still subscribed?", other.hasObservers());
263+
}
264+
265+
@Test
266+
public void unsubscriptionPropagatesAfterSubscribe() {
267+
PublishSubject<Integer> source = PublishSubject.create();
268+
PublishSubject<Integer> other = PublishSubject.create();
269+
270+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
271+
272+
source.delaySubscription(other).subscribe(ts);
273+
274+
Assert.assertFalse("source subscribed?", source.hasObservers());
275+
Assert.assertTrue("other not subscribed?", other.hasObservers());
276+
277+
other.onCompleted();
278+
279+
Assert.assertTrue("source not subscribed?", source.hasObservers());
280+
Assert.assertFalse("other still subscribed?", other.hasObservers());
281+
282+
ts.unsubscribe();
283+
284+
Assert.assertFalse("source subscribed?", source.hasObservers());
285+
Assert.assertFalse("other still subscribed?", other.hasObservers());
286+
}
287+
288+
@Test
289+
public void delayAndTakeUntilNeverSubscribeToSource() {
290+
PublishSubject<Integer> delayUntil = PublishSubject.create();
291+
PublishSubject<Integer> interrupt = PublishSubject.create();
292+
final AtomicBoolean subscribed = new AtomicBoolean(false);
293+
294+
Observable.just(1)
295+
.doOnSubscribe(new Action0() {
296+
@Override
297+
public void call() {
298+
subscribed.set(true);
299+
}
300+
})
301+
.delaySubscription(delayUntil)
302+
.takeUntil(interrupt)
303+
.subscribe();
304+
305+
interrupt.onNext(9000);
306+
delayUntil.onNext(1);
307+
308+
Assert.assertFalse(subscribed.get());
309+
}
310+
246311
}

src/test/java/rx/internal/operators/OperatorDelayTest.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import java.util.Arrays;
3131
import java.util.List;
3232
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3334

34-
import org.junit.Before;
35-
import org.junit.Test;
35+
import org.junit.*;
3636
import org.mockito.InOrder;
3737
import org.mockito.Mock;
3838

@@ -41,9 +41,7 @@
4141
import rx.Observer;
4242
import rx.Subscription;
4343
import rx.exceptions.TestException;
44-
import rx.functions.Action1;
45-
import rx.functions.Func0;
46-
import rx.functions.Func1;
44+
import rx.functions.*;
4745
import rx.internal.util.RxRingBuffer;
4846
import rx.observers.TestObserver;
4947
import rx.observers.TestSubscriber;
@@ -821,4 +819,47 @@ public void testErrorRunsBeforeOnNext() {
821819
ts.assertError(TestException.class);
822820
ts.assertNotCompleted();
823821
}
822+
823+
@Test
824+
public void delaySubscriptionCancelBeforeTime() {
825+
PublishSubject<Integer> source = PublishSubject.create();
826+
827+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
828+
829+
source.delaySubscription(100, TimeUnit.MILLISECONDS, scheduler).subscribe(ts);
830+
831+
Assert.assertFalse("source subscribed?", source.hasObservers());
832+
833+
ts.unsubscribe();
834+
835+
Assert.assertFalse("source subscribed?", source.hasObservers());
836+
837+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
838+
839+
Assert.assertFalse("source subscribed?", source.hasObservers());
840+
}
841+
842+
@Test
843+
public void delayAndTakeUntilNeverSubscribeToSource() {
844+
PublishSubject<Integer> interrupt = PublishSubject.create();
845+
final AtomicBoolean subscribed = new AtomicBoolean(false);
846+
TestScheduler testScheduler = new TestScheduler();
847+
848+
Observable.just(1)
849+
.doOnSubscribe(new Action0() {
850+
@Override
851+
public void call() {
852+
subscribed.set(true);
853+
}
854+
})
855+
.delaySubscription(1, TimeUnit.SECONDS, testScheduler)
856+
.takeUntil(interrupt)
857+
.subscribe();
858+
859+
interrupt.onNext(9000);
860+
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
861+
862+
Assert.assertFalse(subscribed.get());
863+
}
864+
824865
}

0 commit comments

Comments
 (0)