Skip to content

Commit e92822c

Browse files
authored
2.x: add missing null check to fused Observable.fromCallable (#5517)
1 parent e4fbe4c commit e92822c

File tree

3 files changed

+325
-1
lines changed

3 files changed

+325
-1
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,6 @@ public void subscribeActual(Observer<? super T> s) {
5454

5555
@Override
5656
public T call() throws Exception {
57-
return callable.call();
57+
return ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
5858
}
5959
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableFromCallableTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.reactivex.internal.operators.flowable;
1818

1919
import static org.mockito.Mockito.*;
20+
import static org.junit.Assert.*;
2021

2122
import java.util.concurrent.*;
2223

@@ -26,6 +27,7 @@
2627
import org.reactivestreams.*;
2728

2829
import io.reactivex.*;
30+
import io.reactivex.functions.Function;
2931
import io.reactivex.schedulers.Schedulers;
3032
import io.reactivex.subscribers.TestSubscriber;
3133

@@ -156,4 +158,84 @@ public Object call() throws Exception {
156158
verify(observer).onError(checkedException);
157159
verifyNoMoreInteractions(observer);
158160
}
161+
162+
@Test
163+
public void fusedFlatMapExecution() {
164+
final int[] calls = { 0 };
165+
166+
Flowable.just(1).flatMap(new Function<Integer, Publisher<? extends Object>>() {
167+
@Override
168+
public Publisher<? extends Object> apply(Integer v)
169+
throws Exception {
170+
return Flowable.fromCallable(new Callable<Object>() {
171+
@Override
172+
public Object call() throws Exception {
173+
return ++calls[0];
174+
}
175+
});
176+
}
177+
})
178+
.test()
179+
.assertResult(1);
180+
181+
assertEquals(1, calls[0]);
182+
}
183+
184+
@Test
185+
public void fusedFlatMapExecutionHidden() {
186+
final int[] calls = { 0 };
187+
188+
Flowable.just(1).hide().flatMap(new Function<Integer, Publisher<? extends Object>>() {
189+
@Override
190+
public Publisher<? extends Object> apply(Integer v)
191+
throws Exception {
192+
return Flowable.fromCallable(new Callable<Object>() {
193+
@Override
194+
public Object call() throws Exception {
195+
return ++calls[0];
196+
}
197+
});
198+
}
199+
})
200+
.test()
201+
.assertResult(1);
202+
203+
assertEquals(1, calls[0]);
204+
}
205+
206+
@Test
207+
public void fusedFlatMapNull() {
208+
Flowable.just(1).flatMap(new Function<Integer, Publisher<? extends Object>>() {
209+
@Override
210+
public Publisher<? extends Object> apply(Integer v)
211+
throws Exception {
212+
return Flowable.fromCallable(new Callable<Object>() {
213+
@Override
214+
public Object call() throws Exception {
215+
return null;
216+
}
217+
});
218+
}
219+
})
220+
.test()
221+
.assertFailure(NullPointerException.class);
222+
}
223+
224+
@Test
225+
public void fusedFlatMapNullHidden() {
226+
Flowable.just(1).hide().flatMap(new Function<Integer, Publisher<? extends Object>>() {
227+
@Override
228+
public Publisher<? extends Object> apply(Integer v)
229+
throws Exception {
230+
return Flowable.fromCallable(new Callable<Object>() {
231+
@Override
232+
public Object call() throws Exception {
233+
return null;
234+
}
235+
});
236+
}
237+
})
238+
.test()
239+
.assertFailure(NullPointerException.class);
240+
}
159241
}
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.reactivex.internal.operators.observable;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.*;
22+
23+
import java.util.concurrent.*;
24+
25+
import org.junit.Test;
26+
import org.mockito.invocation.InvocationOnMock;
27+
import org.mockito.stubbing.Answer;
28+
29+
import io.reactivex.*;
30+
import io.reactivex.disposables.Disposable;
31+
import io.reactivex.functions.Function;
32+
import io.reactivex.observers.TestObserver;
33+
import io.reactivex.schedulers.Schedulers;
34+
35+
public class ObservableFromCallableTest {
36+
37+
@SuppressWarnings("unchecked")
38+
@Test
39+
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
40+
Callable<Object> func = mock(Callable.class);
41+
42+
when(func.call()).thenReturn(new Object());
43+
44+
Observable<Object> fromCallableObservable = Observable.fromCallable(func);
45+
46+
verifyZeroInteractions(func);
47+
48+
fromCallableObservable.subscribe();
49+
50+
verify(func).call();
51+
}
52+
53+
@SuppressWarnings("unchecked")
54+
@Test
55+
public void shouldCallOnNextAndOnCompleted() throws Exception {
56+
Callable<String> func = mock(Callable.class);
57+
58+
when(func.call()).thenReturn("test_value");
59+
60+
Observable<String> fromCallableObservable = Observable.fromCallable(func);
61+
62+
Observer<Object> observer = TestHelper.mockObserver();
63+
64+
fromCallableObservable.subscribe(observer);
65+
66+
verify(observer).onNext("test_value");
67+
verify(observer).onComplete();
68+
verify(observer, never()).onError(any(Throwable.class));
69+
}
70+
71+
@SuppressWarnings("unchecked")
72+
@Test
73+
public void shouldCallOnError() throws Exception {
74+
Callable<Object> func = mock(Callable.class);
75+
76+
Throwable throwable = new IllegalStateException("Test exception");
77+
when(func.call()).thenThrow(throwable);
78+
79+
Observable<Object> fromCallableObservable = Observable.fromCallable(func);
80+
81+
Observer<Object> observer = TestHelper.mockObserver();
82+
83+
fromCallableObservable.subscribe(observer);
84+
85+
verify(observer, never()).onNext(any());
86+
verify(observer, never()).onComplete();
87+
verify(observer).onError(throwable);
88+
}
89+
90+
@SuppressWarnings("unchecked")
91+
@Test
92+
public void shouldNotDeliverResultIfSubscriberUnsubscribedBeforeEmission() throws Exception {
93+
Callable<String> func = mock(Callable.class);
94+
95+
final CountDownLatch funcLatch = new CountDownLatch(1);
96+
final CountDownLatch observerLatch = new CountDownLatch(1);
97+
98+
when(func.call()).thenAnswer(new Answer<String>() {
99+
@Override
100+
public String answer(InvocationOnMock invocation) throws Throwable {
101+
observerLatch.countDown();
102+
103+
try {
104+
funcLatch.await();
105+
} catch (InterruptedException e) {
106+
// It's okay, unsubscription causes Thread interruption
107+
108+
// Restoring interruption status of the Thread
109+
Thread.currentThread().interrupt();
110+
}
111+
112+
return "should_not_be_delivered";
113+
}
114+
});
115+
116+
Observable<String> fromCallableObservable = Observable.fromCallable(func);
117+
118+
Observer<Object> observer = TestHelper.mockObserver();
119+
120+
TestObserver<String> outer = new TestObserver<String>(observer);
121+
122+
fromCallableObservable
123+
.subscribeOn(Schedulers.computation())
124+
.subscribe(outer);
125+
126+
// Wait until func will be invoked
127+
observerLatch.await();
128+
129+
// Unsubscribing before emission
130+
outer.cancel();
131+
132+
// Emitting result
133+
funcLatch.countDown();
134+
135+
// func must be invoked
136+
verify(func).call();
137+
138+
// Observer must not be notified at all
139+
verify(observer).onSubscribe(any(Disposable.class));
140+
verifyNoMoreInteractions(observer);
141+
}
142+
143+
@Test
144+
public void shouldAllowToThrowCheckedException() {
145+
final Exception checkedException = new Exception("test exception");
146+
147+
Observable<Object> fromCallableObservable = Observable.fromCallable(new Callable<Object>() {
148+
@Override
149+
public Object call() throws Exception {
150+
throw checkedException;
151+
}
152+
});
153+
154+
Observer<Object> observer = TestHelper.mockObserver();
155+
156+
fromCallableObservable.subscribe(observer);
157+
158+
verify(observer).onSubscribe(any(Disposable.class));
159+
verify(observer).onError(checkedException);
160+
verifyNoMoreInteractions(observer);
161+
}
162+
163+
@Test
164+
public void fusedFlatMapExecution() {
165+
final int[] calls = { 0 };
166+
167+
Observable.just(1).flatMap(new Function<Integer, ObservableSource<? extends Object>>() {
168+
@Override
169+
public ObservableSource<? extends Object> apply(Integer v)
170+
throws Exception {
171+
return Observable.fromCallable(new Callable<Object>() {
172+
@Override
173+
public Object call() throws Exception {
174+
return ++calls[0];
175+
}
176+
});
177+
}
178+
})
179+
.test()
180+
.assertResult(1);
181+
182+
assertEquals(1, calls[0]);
183+
}
184+
185+
@Test
186+
public void fusedFlatMapExecutionHidden() {
187+
final int[] calls = { 0 };
188+
189+
Observable.just(1).hide().flatMap(new Function<Integer, ObservableSource<? extends Object>>() {
190+
@Override
191+
public ObservableSource<? extends Object> apply(Integer v)
192+
throws Exception {
193+
return Observable.fromCallable(new Callable<Object>() {
194+
@Override
195+
public Object call() throws Exception {
196+
return ++calls[0];
197+
}
198+
});
199+
}
200+
})
201+
.test()
202+
.assertResult(1);
203+
204+
assertEquals(1, calls[0]);
205+
}
206+
207+
@Test
208+
public void fusedFlatMapNull() {
209+
Observable.just(1).flatMap(new Function<Integer, ObservableSource<? extends Object>>() {
210+
@Override
211+
public ObservableSource<? extends Object> apply(Integer v)
212+
throws Exception {
213+
return Observable.fromCallable(new Callable<Object>() {
214+
@Override
215+
public Object call() throws Exception {
216+
return null;
217+
}
218+
});
219+
}
220+
})
221+
.test()
222+
.assertFailure(NullPointerException.class);
223+
}
224+
225+
@Test
226+
public void fusedFlatMapNullHidden() {
227+
Observable.just(1).hide().flatMap(new Function<Integer, ObservableSource<? extends Object>>() {
228+
@Override
229+
public ObservableSource<? extends Object> apply(Integer v)
230+
throws Exception {
231+
return Observable.fromCallable(new Callable<Object>() {
232+
@Override
233+
public Object call() throws Exception {
234+
return null;
235+
}
236+
});
237+
}
238+
})
239+
.test()
240+
.assertFailure(NullPointerException.class);
241+
}
242+
}

0 commit comments

Comments
 (0)