Skip to content

Commit 2894ed0

Browse files
committed
Merge pull request #3433 from artem-zinnatullin/single-defer
Add Single.defer()
2 parents 4b29429 + ee91a9d commit 2894ed0

File tree

2 files changed

+173
-0
lines changed

2 files changed

+173
-0
lines changed

src/main/java/rx/Single.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1953,4 +1953,48 @@ public final Single<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
19531953
public final Single<T> delay(long delay, TimeUnit unit) {
19541954
return delay(delay, unit, Schedulers.computation());
19551955
}
1956+
1957+
/**
1958+
* Returns a {@link Single} that calls a {@link Single} factory to create a {@link Single} for each new Observer
1959+
* that subscribes. That is, for each subscriber, the actual {@link Single} that subscriber observes is
1960+
* determined by the factory function.
1961+
* <p>
1962+
* <img width="640" height="340" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/defer.png" alt="">
1963+
* <p>
1964+
* The defer Observer allows you to defer or delay emitting value from a {@link Single} until such time as an
1965+
* Observer subscribes to the {@link Single}. This allows an {@link Observer} to easily obtain updates or a
1966+
* refreshed version of the sequence.
1967+
* <dl>
1968+
* <dt><b>Scheduler:</b></dt>
1969+
* <dd>{@code defer} does not operate by default on a particular {@link Scheduler}.</dd>
1970+
* </dl>
1971+
*
1972+
* @param singleFactory
1973+
* the {@link Single} factory function to invoke for each {@link Observer} that subscribes to the
1974+
* resulting {@link Single}.
1975+
* @param <T>
1976+
* the type of the items emitted by the {@link Single}.
1977+
* @return a {@link Single} whose {@link Observer}s' subscriptions trigger an invocation of the given
1978+
* {@link Single} factory function.
1979+
* @see <a href="http://reactivex.io/documentation/operators/defer.html">ReactiveX operators documentation: Defer</a>
1980+
*/
1981+
@Experimental
1982+
public static <T> Single<T> defer(final Callable<Single<T>> singleFactory) {
1983+
return create(new OnSubscribe<T>() {
1984+
@Override
1985+
public void call(SingleSubscriber<? super T> singleSubscriber) {
1986+
Single<? extends T> single;
1987+
1988+
try {
1989+
single = singleFactory.call();
1990+
} catch (Throwable t) {
1991+
Exceptions.throwIfFatal(t);
1992+
singleSubscriber.onError(t);
1993+
return;
1994+
}
1995+
1996+
single.subscribe(singleSubscriber);
1997+
}
1998+
});
1999+
}
19562000
}

src/test/java/rx/SingleTest.java

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.mockito.Matchers.eq;
2121
import static org.mockito.Mockito.doThrow;
2222
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.times;
2324
import static org.mockito.Mockito.verify;
2425
import static org.mockito.Mockito.verifyZeroInteractions;
2526
import static org.mockito.Mockito.when;
@@ -30,10 +31,13 @@
3031
import java.util.concurrent.TimeUnit;
3132
import java.util.concurrent.TimeoutException;
3233
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.concurrent.atomic.AtomicInteger;
3335
import java.util.concurrent.atomic.AtomicReference;
3436

3537
import org.junit.Test;
3638

39+
import org.mockito.invocation.InvocationOnMock;
40+
import org.mockito.stubbing.Answer;
3741
import rx.Single.OnSubscribe;
3842
import rx.exceptions.CompositeException;
3943
import rx.functions.Action0;
@@ -699,4 +703,129 @@ public void call(SingleSubscriber<? super Integer> singleSubscriber) {
699703
subscriber.assertNoValues();
700704
subscriber.assertError(expected);
701705
}
706+
707+
@Test
708+
public void deferShouldNotCallFactoryFuncUntilSubscriberSubscribes() throws Exception {
709+
Callable<Single<Object>> singleFactory = mock(Callable.class);
710+
Single.defer(singleFactory);
711+
verifyZeroInteractions(singleFactory);
712+
}
713+
714+
@Test
715+
public void deferShouldSubscribeSubscriberToSingleFromFactoryFuncAndEmitValue() throws Exception {
716+
Callable<Single<Object>> singleFactory = mock(Callable.class);
717+
Object value = new Object();
718+
Single<Object> single = Single.just(value);
719+
720+
when(singleFactory.call()).thenReturn(single);
721+
722+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
723+
724+
Single
725+
.defer(singleFactory)
726+
.subscribe(testSubscriber);
727+
728+
testSubscriber.assertValue(value);
729+
testSubscriber.assertNoErrors();
730+
731+
verify(singleFactory).call();
732+
}
733+
734+
@Test
735+
public void deferShouldSubscribeSubscriberToSingleFromFactoryFuncAndEmitError() throws Exception {
736+
Callable<Single<Object>> singleFactory = mock(Callable.class);
737+
Throwable error = new IllegalStateException();
738+
Single<Object> single = Single.error(error);
739+
740+
when(singleFactory.call()).thenReturn(single);
741+
742+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
743+
744+
Single
745+
.defer(singleFactory)
746+
.subscribe(testSubscriber);
747+
748+
testSubscriber.assertNoValues();
749+
testSubscriber.assertError(error);
750+
751+
verify(singleFactory).call();
752+
}
753+
754+
@Test
755+
public void deferShouldPassErrorFromSingleFactoryToTheSubscriber() throws Exception {
756+
Callable<Single<Object>> singleFactory = mock(Callable.class);
757+
Throwable errorFromSingleFactory = new IllegalStateException();
758+
when(singleFactory.call()).thenThrow(errorFromSingleFactory);
759+
760+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
761+
762+
Single
763+
.defer(singleFactory)
764+
.subscribe(testSubscriber);
765+
766+
testSubscriber.assertNoValues();
767+
testSubscriber.assertError(errorFromSingleFactory);
768+
769+
verify(singleFactory).call();
770+
}
771+
772+
@Test
773+
public void deferShouldCallSingleFactoryForEachSubscriber() throws Exception {
774+
Callable<Single<String>> singleFactory = mock(Callable.class);
775+
776+
String[] values = {"1", "2", "3"};
777+
final Single[] singles = new Single[]{Single.just(values[0]), Single.just(values[1]), Single.just(values[2])};
778+
779+
final AtomicInteger singleFactoryCallsCounter = new AtomicInteger();
780+
781+
when(singleFactory.call()).thenAnswer(new Answer<Single<String>>() {
782+
@Override
783+
public Single<String> answer(InvocationOnMock invocation) throws Throwable {
784+
return singles[singleFactoryCallsCounter.getAndIncrement()];
785+
}
786+
});
787+
788+
Single<String> deferredSingle = Single.defer(singleFactory);
789+
790+
for (int i = 0; i < singles.length; i ++) {
791+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
792+
793+
deferredSingle.subscribe(testSubscriber);
794+
795+
testSubscriber.assertValue(values[i]);
796+
testSubscriber.assertNoErrors();
797+
}
798+
799+
verify(singleFactory, times(3)).call();
800+
}
801+
802+
@Test
803+
public void deferShouldPassNullPointerExceptionToTheSubscriberIfSingleFactoryIsNull() {
804+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
805+
806+
Single
807+
.defer(null)
808+
.subscribe(testSubscriber);
809+
810+
testSubscriber.assertNoValues();
811+
testSubscriber.assertError(NullPointerException.class);
812+
}
813+
814+
815+
@Test
816+
public void deferShouldPassNullPointerExceptionToTheSubscriberIfSingleFactoryReturnsNull() throws Exception {
817+
Callable<Single<Object>> singleFactory = mock(Callable.class);
818+
when(singleFactory.call()).thenReturn(null);
819+
820+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
821+
822+
Single
823+
.defer(singleFactory)
824+
.subscribe(testSubscriber);
825+
826+
testSubscriber.assertNoValues();
827+
testSubscriber.assertError(NullPointerException.class);
828+
829+
verify(singleFactory).call();
830+
}
702831
}

0 commit comments

Comments
 (0)