Skip to content

Commit 44b015f

Browse files
Merge pull request #904 from benjchristensen/merge-subscriptions
Merge: Unsubscribe Completed Inner Observables
2 parents fe7e449 + 94c8b6b commit 44b015f

File tree

3 files changed

+121
-2
lines changed

3 files changed

+121
-2
lines changed

rxjava-core/src/main/java/rx/observers/TestSubscriber.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ public void assertTerminalEvent() {
8585
testObserver.assertTerminalEvent();
8686
}
8787

88+
public void assertUnsubscribed() {
89+
if (!isUnsubscribed()) {
90+
throw new AssertionError("Not unsubscribed.");
91+
}
92+
}
93+
8894
public void awaitTerminalEvent() {
8995
try {
9096
latch.await();

rxjava-core/src/main/java/rx/operators/OperatorMerge.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.Observable.Operator;
2222
import rx.Subscriber;
2323
import rx.observers.SynchronizedSubscriber;
24+
import rx.subscriptions.CompositeSubscription;
2425

2526
/**
2627
* Flattens a list of Observables into one Observable sequence, without any transformation.
@@ -36,6 +37,9 @@ public final class OperatorMerge<T> implements Operator<T, Observable<? extends
3637
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> outerOperation) {
3738

3839
final Subscriber<T> o = new SynchronizedSubscriber<T>(outerOperation);
40+
final CompositeSubscription childrenSubscriptions = new CompositeSubscription();
41+
outerOperation.add(childrenSubscriptions);
42+
3943
return new Subscriber<Observable<? extends T>>(outerOperation) {
4044

4145
private volatile boolean completed = false;
@@ -57,32 +61,41 @@ public void onError(Throwable e) {
5761
@Override
5862
public void onNext(Observable<? extends T> innerObservable) {
5963
runningCount.incrementAndGet();
60-
innerObservable.subscribe(new InnerObserver());
64+
Subscriber<T> i = new InnerObserver();
65+
childrenSubscriptions.add(i);
66+
innerObservable.subscribe(i);
6167
}
6268

6369
final class InnerObserver extends Subscriber<T> {
6470

6571
public InnerObserver() {
66-
super(o);
6772
}
6873

6974
@Override
7075
public void onCompleted() {
7176
if (runningCount.decrementAndGet() == 0 && completed) {
7277
o.onCompleted();
7378
}
79+
cleanup();
7480
}
7581

7682
@Override
7783
public void onError(Throwable e) {
7884
o.onError(e);
85+
cleanup();
7986
}
8087

8188
@Override
8289
public void onNext(T a) {
8390
o.onNext(a);
8491
}
8592

93+
private void cleanup() {
94+
// remove subscription onCompletion so it cleans up immediately and doesn't memory leak
95+
// see https://github.com/Netflix/RxJava/issues/897
96+
childrenSubscriptions.remove(this);
97+
}
98+
8699
};
87100

88101
};

rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.mockito.Mockito.*;
2121

2222
import java.util.ArrayList;
23+
import java.util.Arrays;
24+
import java.util.Collections;
2325
import java.util.List;
2426
import java.util.concurrent.CountDownLatch;
2527
import java.util.concurrent.TimeUnit;
@@ -32,11 +34,16 @@
3234
import org.mockito.MockitoAnnotations;
3335

3436
import rx.Observable;
37+
import rx.Observable.OnSubscribe;
3538
import rx.Observer;
39+
import rx.Scheduler;
3640
import rx.Subscriber;
3741
import rx.Subscription;
3842
import rx.functions.Action0;
3943
import rx.functions.Action1;
44+
import rx.observers.TestSubscriber;
45+
import rx.schedulers.Schedulers;
46+
import rx.schedulers.TestScheduler;
4047
import rx.subscriptions.Subscriptions;
4148

4249
public class OperatorMergeTest {
@@ -372,4 +379,97 @@ public Subscription onSubscribe(Observer<? super String> observer) {
372379
}
373380
}
374381

382+
@Test
383+
public void testUnsubscribeAsObservablesComplete() {
384+
TestScheduler scheduler1 = Schedulers.test();
385+
AtomicBoolean os1 = new AtomicBoolean(false);
386+
Observable<Long> o1 = createObservableOf5IntervalsOf1SecondIncrementsWithSubscriptionHook(scheduler1, os1);
387+
388+
TestScheduler scheduler2 = Schedulers.test();
389+
AtomicBoolean os2 = new AtomicBoolean(false);
390+
Observable<Long> o2 = createObservableOf5IntervalsOf1SecondIncrementsWithSubscriptionHook(scheduler2, os2);
391+
392+
TestSubscriber<Long> ts = new TestSubscriber<Long>();
393+
Observable.merge(o1, o2).subscribe(ts);
394+
395+
// we haven't incremented time so nothing should be received yet
396+
ts.assertReceivedOnNext(Collections.<Long> emptyList());
397+
398+
scheduler1.advanceTimeBy(3, TimeUnit.SECONDS);
399+
scheduler2.advanceTimeBy(2, TimeUnit.SECONDS);
400+
401+
ts.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 0L, 1L));
402+
// not unsubscribed yet
403+
assertFalse(os1.get());
404+
assertFalse(os2.get());
405+
406+
// advance to the end at which point it should complete
407+
scheduler1.advanceTimeBy(3, TimeUnit.SECONDS);
408+
409+
ts.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 0L, 1L, 3L, 4L));
410+
assertTrue(os1.get());
411+
assertFalse(os2.get());
412+
413+
// both should be completed now
414+
scheduler2.advanceTimeBy(3, TimeUnit.SECONDS);
415+
416+
ts.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 0L, 1L, 3L, 4L, 2L, 3L, 4L));
417+
assertTrue(os1.get());
418+
assertTrue(os2.get());
419+
420+
ts.assertTerminalEvent();
421+
}
422+
423+
@Test
424+
public void testEarlyUnsubscribe() {
425+
TestScheduler scheduler1 = Schedulers.test();
426+
AtomicBoolean os1 = new AtomicBoolean(false);
427+
Observable<Long> o1 = createObservableOf5IntervalsOf1SecondIncrementsWithSubscriptionHook(scheduler1, os1);
428+
429+
TestScheduler scheduler2 = Schedulers.test();
430+
AtomicBoolean os2 = new AtomicBoolean(false);
431+
Observable<Long> o2 = createObservableOf5IntervalsOf1SecondIncrementsWithSubscriptionHook(scheduler2, os2);
432+
433+
TestSubscriber<Long> ts = new TestSubscriber<Long>();
434+
Subscription s = Observable.merge(o1, o2).subscribe(ts);
435+
436+
// we haven't incremented time so nothing should be received yet
437+
ts.assertReceivedOnNext(Collections.<Long> emptyList());
438+
439+
scheduler1.advanceTimeBy(3, TimeUnit.SECONDS);
440+
scheduler2.advanceTimeBy(2, TimeUnit.SECONDS);
441+
442+
ts.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 0L, 1L));
443+
// not unsubscribed yet
444+
assertFalse(os1.get());
445+
assertFalse(os2.get());
446+
447+
// early unsubscribe
448+
s.unsubscribe();
449+
450+
assertTrue(os1.get());
451+
assertTrue(os2.get());
452+
453+
ts.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 0L, 1L));
454+
ts.assertUnsubscribed();
455+
}
456+
457+
private Observable<Long> createObservableOf5IntervalsOf1SecondIncrementsWithSubscriptionHook(final Scheduler scheduler, final AtomicBoolean unsubscribed) {
458+
return Observable.create(new OnSubscribe<Long>() {
459+
460+
@Override
461+
public void call(Subscriber<? super Long> s) {
462+
s.add(Subscriptions.create(new Action0() {
463+
464+
@Override
465+
public void call() {
466+
unsubscribed.set(true);
467+
}
468+
469+
}));
470+
Observable.interval(1, TimeUnit.SECONDS, scheduler).take(5).subscribe(s);
471+
}
472+
});
473+
}
474+
375475
}

0 commit comments

Comments
 (0)