Skip to content

Commit d1af6be

Browse files
committed
1.x: fix Completable.using not disposing the resource if the factory
crashes during the subscription phase. This PR fixes the cases when the Completable factory throws an exception or returns null and the resource is not disposed before reporting error to the subscriber.
1 parent 8d3a0c5 commit d1af6be

File tree

2 files changed

+157
-2
lines changed

2 files changed

+157
-2
lines changed

src/main/java/rx/Completable.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616

1717
package rx;
1818

19-
import java.util.Iterator;
19+
import java.util.*;
2020
import java.util.concurrent.*;
2121
import java.util.concurrent.atomic.AtomicBoolean;
2222

2323
import rx.Observable.OnSubscribe;
2424
import rx.annotations.Experimental;
25-
import rx.exceptions.Exceptions;
25+
import rx.exceptions.*;
2626
import rx.functions.*;
2727
import rx.internal.operators.*;
2828
import rx.internal.util.*;
@@ -864,12 +864,33 @@ public void call(final CompletableSubscriber s) {
864864
try {
865865
cs = completableFunc1.call(resource);
866866
} catch (Throwable e) {
867+
try {
868+
disposer.call(resource);
869+
} catch (Throwable ex) {
870+
Exceptions.throwIfFatal(e);
871+
Exceptions.throwIfFatal(ex);
872+
873+
s.onSubscribe(Subscriptions.unsubscribed());
874+
s.onError(new CompositeException(Arrays.asList(e, ex)));
875+
return;
876+
}
877+
Exceptions.throwIfFatal(e);
878+
867879
s.onSubscribe(Subscriptions.unsubscribed());
868880
s.onError(e);
869881
return;
870882
}
871883

872884
if (cs == null) {
885+
try {
886+
disposer.call(resource);
887+
} catch (Throwable ex) {
888+
Exceptions.throwIfFatal(ex);
889+
890+
s.onSubscribe(Subscriptions.unsubscribed());
891+
s.onError(new CompositeException(Arrays.asList(new NullPointerException("The completable supplied is null"), ex)));
892+
return;
893+
}
873894
s.onSubscribe(Subscriptions.unsubscribed());
874895
s.onError(new NullPointerException("The completable supplied is null"));
875896
return;

src/test/java/rx/CompletableTest.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import rx.subjects.PublishSubject;
3232
import rx.subscriptions.*;
3333

34+
import static org.mockito.Mockito.*;
35+
import static org.junit.Assert.*;
36+
3437
/**
3538
* Test Completable methods and operators.
3639
*/
@@ -3410,4 +3413,135 @@ public void endWithFlowableError() {
34103413
ts.assertError(TestException.class);
34113414
ts.assertNotCompleted();
34123415
}
3416+
3417+
@Test
3418+
public void usingFactoryThrows() {
3419+
@SuppressWarnings("unchecked")
3420+
Action1<Integer> onDispose = mock(Action1.class);
3421+
3422+
TestSubscriber<Integer> ts = TestSubscriber.create();
3423+
3424+
Completable.using(new Func0<Integer>() {
3425+
@Override
3426+
public Integer call() {
3427+
return 1;
3428+
}
3429+
},
3430+
new Func1<Integer, Completable>() {
3431+
@Override
3432+
public Completable call(Integer t) {
3433+
throw new TestException();
3434+
}
3435+
}, onDispose).subscribe(ts);
3436+
3437+
verify(onDispose).call(1);
3438+
3439+
ts.assertNoValues();
3440+
ts.assertNotCompleted();
3441+
ts.assertError(TestException.class);
3442+
}
3443+
3444+
@Test
3445+
public void usingFactoryAndDisposerThrow() {
3446+
Action1<Integer> onDispose = new Action1<Integer>() {
3447+
@Override
3448+
public void call(Integer t) {
3449+
throw new TestException();
3450+
}
3451+
};
3452+
3453+
TestSubscriber<Integer> ts = TestSubscriber.create();
3454+
3455+
Completable.using(new Func0<Integer>() {
3456+
@Override
3457+
public Integer call() {
3458+
return 1;
3459+
}
3460+
},
3461+
new Func1<Integer, Completable>() {
3462+
@Override
3463+
public Completable call(Integer t) {
3464+
throw new TestException();
3465+
}
3466+
}, onDispose).subscribe(ts);
3467+
3468+
ts.assertNoValues();
3469+
ts.assertNotCompleted();
3470+
ts.assertError(CompositeException.class);
3471+
3472+
CompositeException ex = (CompositeException)ts.getOnErrorEvents().get(0);
3473+
3474+
List<Throwable> listEx = ex.getExceptions();
3475+
3476+
assertEquals(2, listEx.size());
3477+
3478+
assertTrue(listEx.get(0).toString(), listEx.get(0) instanceof TestException);
3479+
assertTrue(listEx.get(1).toString(), listEx.get(1) instanceof TestException);
3480+
}
3481+
3482+
@Test
3483+
public void usingFactoryReturnsNull() {
3484+
@SuppressWarnings("unchecked")
3485+
Action1<Integer> onDispose = mock(Action1.class);
3486+
3487+
TestSubscriber<Integer> ts = TestSubscriber.create();
3488+
3489+
Completable.using(new Func0<Integer>() {
3490+
@Override
3491+
public Integer call() {
3492+
return 1;
3493+
}
3494+
},
3495+
new Func1<Integer, Completable>() {
3496+
@Override
3497+
public Completable call(Integer t) {
3498+
return null;
3499+
}
3500+
}, onDispose).subscribe(ts);
3501+
3502+
verify(onDispose).call(1);
3503+
3504+
ts.assertNoValues();
3505+
ts.assertNotCompleted();
3506+
ts.assertError(NullPointerException.class);
3507+
}
3508+
3509+
@Test
3510+
public void usingFactoryReturnsNullAndDisposerThrows() {
3511+
Action1<Integer> onDispose = new Action1<Integer>() {
3512+
@Override
3513+
public void call(Integer t) {
3514+
throw new TestException();
3515+
}
3516+
};
3517+
3518+
TestSubscriber<Integer> ts = TestSubscriber.create();
3519+
3520+
Completable.using(new Func0<Integer>() {
3521+
@Override
3522+
public Integer call() {
3523+
return 1;
3524+
}
3525+
},
3526+
new Func1<Integer, Completable>() {
3527+
@Override
3528+
public Completable call(Integer t) {
3529+
return null;
3530+
}
3531+
}, onDispose).subscribe(ts);
3532+
3533+
ts.assertNoValues();
3534+
ts.assertNotCompleted();
3535+
ts.assertError(CompositeException.class);
3536+
3537+
CompositeException ex = (CompositeException)ts.getOnErrorEvents().get(0);
3538+
3539+
List<Throwable> listEx = ex.getExceptions();
3540+
3541+
assertEquals(2, listEx.size());
3542+
3543+
assertTrue(listEx.get(0).toString(), listEx.get(0) instanceof NullPointerException);
3544+
assertTrue(listEx.get(1).toString(), listEx.get(1) instanceof TestException);
3545+
}
3546+
34133547
}

0 commit comments

Comments
 (0)