Skip to content

Commit ef1c509

Browse files
committed
Merge pull request #3585 from akarnokd/UsingDisposeFix
1.x: fix Completable.using not disposing the resource if the factory crashes during the subscription phase.
2 parents b087eff + d1af6be commit ef1c509

File tree

2 files changed

+157
-2
lines changed

2 files changed

+157
-2
lines changed

src/main/java/rx/Completable.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616

1717
package rx;
1818

19-
import java.util.Iterator;
19+
import java.util.*;
2020
import java.util.concurrent.*;
2121
import java.util.concurrent.atomic.AtomicBoolean;
2222

2323
import rx.Observable.OnSubscribe;
2424
import rx.annotations.Experimental;
25-
import rx.exceptions.Exceptions;
25+
import rx.exceptions.*;
2626
import rx.functions.*;
2727
import rx.internal.operators.*;
2828
import rx.internal.util.*;
@@ -864,12 +864,33 @@ public void call(final CompletableSubscriber s) {
864864
try {
865865
cs = completableFunc1.call(resource);
866866
} catch (Throwable e) {
867+
try {
868+
disposer.call(resource);
869+
} catch (Throwable ex) {
870+
Exceptions.throwIfFatal(e);
871+
Exceptions.throwIfFatal(ex);
872+
873+
s.onSubscribe(Subscriptions.unsubscribed());
874+
s.onError(new CompositeException(Arrays.asList(e, ex)));
875+
return;
876+
}
877+
Exceptions.throwIfFatal(e);
878+
867879
s.onSubscribe(Subscriptions.unsubscribed());
868880
s.onError(e);
869881
return;
870882
}
871883

872884
if (cs == null) {
885+
try {
886+
disposer.call(resource);
887+
} catch (Throwable ex) {
888+
Exceptions.throwIfFatal(ex);
889+
890+
s.onSubscribe(Subscriptions.unsubscribed());
891+
s.onError(new CompositeException(Arrays.asList(new NullPointerException("The completable supplied is null"), ex)));
892+
return;
893+
}
873894
s.onSubscribe(Subscriptions.unsubscribed());
874895
s.onError(new NullPointerException("The completable supplied is null"));
875896
return;

src/test/java/rx/CompletableTest.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import rx.subjects.PublishSubject;
3333
import rx.subscriptions.*;
3434

35+
import static org.mockito.Mockito.*;
36+
import static org.junit.Assert.*;
37+
3538
/**
3639
* Test Completable methods and operators.
3740
*/
@@ -3469,4 +3472,135 @@ public void endWithFlowableError() {
34693472
ts.assertError(TestException.class);
34703473
ts.assertNotCompleted();
34713474
}
3475+
3476+
@Test
3477+
public void usingFactoryThrows() {
3478+
@SuppressWarnings("unchecked")
3479+
Action1<Integer> onDispose = mock(Action1.class);
3480+
3481+
TestSubscriber<Integer> ts = TestSubscriber.create();
3482+
3483+
Completable.using(new Func0<Integer>() {
3484+
@Override
3485+
public Integer call() {
3486+
return 1;
3487+
}
3488+
},
3489+
new Func1<Integer, Completable>() {
3490+
@Override
3491+
public Completable call(Integer t) {
3492+
throw new TestException();
3493+
}
3494+
}, onDispose).subscribe(ts);
3495+
3496+
verify(onDispose).call(1);
3497+
3498+
ts.assertNoValues();
3499+
ts.assertNotCompleted();
3500+
ts.assertError(TestException.class);
3501+
}
3502+
3503+
@Test
3504+
public void usingFactoryAndDisposerThrow() {
3505+
Action1<Integer> onDispose = new Action1<Integer>() {
3506+
@Override
3507+
public void call(Integer t) {
3508+
throw new TestException();
3509+
}
3510+
};
3511+
3512+
TestSubscriber<Integer> ts = TestSubscriber.create();
3513+
3514+
Completable.using(new Func0<Integer>() {
3515+
@Override
3516+
public Integer call() {
3517+
return 1;
3518+
}
3519+
},
3520+
new Func1<Integer, Completable>() {
3521+
@Override
3522+
public Completable call(Integer t) {
3523+
throw new TestException();
3524+
}
3525+
}, onDispose).subscribe(ts);
3526+
3527+
ts.assertNoValues();
3528+
ts.assertNotCompleted();
3529+
ts.assertError(CompositeException.class);
3530+
3531+
CompositeException ex = (CompositeException)ts.getOnErrorEvents().get(0);
3532+
3533+
List<Throwable> listEx = ex.getExceptions();
3534+
3535+
assertEquals(2, listEx.size());
3536+
3537+
assertTrue(listEx.get(0).toString(), listEx.get(0) instanceof TestException);
3538+
assertTrue(listEx.get(1).toString(), listEx.get(1) instanceof TestException);
3539+
}
3540+
3541+
@Test
3542+
public void usingFactoryReturnsNull() {
3543+
@SuppressWarnings("unchecked")
3544+
Action1<Integer> onDispose = mock(Action1.class);
3545+
3546+
TestSubscriber<Integer> ts = TestSubscriber.create();
3547+
3548+
Completable.using(new Func0<Integer>() {
3549+
@Override
3550+
public Integer call() {
3551+
return 1;
3552+
}
3553+
},
3554+
new Func1<Integer, Completable>() {
3555+
@Override
3556+
public Completable call(Integer t) {
3557+
return null;
3558+
}
3559+
}, onDispose).subscribe(ts);
3560+
3561+
verify(onDispose).call(1);
3562+
3563+
ts.assertNoValues();
3564+
ts.assertNotCompleted();
3565+
ts.assertError(NullPointerException.class);
3566+
}
3567+
3568+
@Test
3569+
public void usingFactoryReturnsNullAndDisposerThrows() {
3570+
Action1<Integer> onDispose = new Action1<Integer>() {
3571+
@Override
3572+
public void call(Integer t) {
3573+
throw new TestException();
3574+
}
3575+
};
3576+
3577+
TestSubscriber<Integer> ts = TestSubscriber.create();
3578+
3579+
Completable.using(new Func0<Integer>() {
3580+
@Override
3581+
public Integer call() {
3582+
return 1;
3583+
}
3584+
},
3585+
new Func1<Integer, Completable>() {
3586+
@Override
3587+
public Completable call(Integer t) {
3588+
return null;
3589+
}
3590+
}, onDispose).subscribe(ts);
3591+
3592+
ts.assertNoValues();
3593+
ts.assertNotCompleted();
3594+
ts.assertError(CompositeException.class);
3595+
3596+
CompositeException ex = (CompositeException)ts.getOnErrorEvents().get(0);
3597+
3598+
List<Throwable> listEx = ex.getExceptions();
3599+
3600+
assertEquals(2, listEx.size());
3601+
3602+
assertTrue(listEx.get(0).toString(), listEx.get(0) instanceof NullPointerException);
3603+
assertTrue(listEx.get(1).toString(), listEx.get(1) instanceof TestException);
3604+
}
3605+
34723606
}

0 commit comments

Comments
 (0)