Skip to content

Commit ad8ca8e

Browse files
PhilGlassakarnokd
authored andcommitted
2.x: Add Single.delay overload that delays errors (#5616)
1 parent 15fea59 commit ad8ca8e

File tree

4 files changed

+101
-27
lines changed

4 files changed

+101
-27
lines changed

src/main/java/io/reactivex/Single.java

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1617,32 +1617,53 @@ public final Flowable<T> concatWith(SingleSource<? extends T> other) {
16171617
}
16181618

16191619
/**
1620-
* Delays the emission of the success or error signal from the current Single by
1621-
* the specified amount.
1620+
* Delays the emission of the success signal from the current Single by the specified amount.
1621+
* An error signal will not be delayed.
16221622
* <dl>
16231623
* <dt><b>Scheduler:</b></dt>
16241624
* <dd>{@code delay} operates by default on the {@code computation} {@link Scheduler}.</dd>
16251625
* </dl>
16261626
*
1627-
* @param time the time amount to delay the signals
1627+
* @param time the amount of time the success signal should be delayed for
16281628
* @param unit the time unit
16291629
* @return the new Single instance
16301630
* @since 2.0
16311631
*/
16321632
@CheckReturnValue
16331633
@SchedulerSupport(SchedulerSupport.COMPUTATION)
16341634
public final Single<T> delay(long time, TimeUnit unit) {
1635-
return delay(time, unit, Schedulers.computation());
1635+
return delay(time, unit, Schedulers.computation(), false);
1636+
}
1637+
1638+
/**
1639+
* Delays the emission of the success or error signal from the current Single by the specified amount.
1640+
* <dl>
1641+
* <dt><b>Scheduler:</b></dt>
1642+
* <dd>{@code delay} operates by default on the {@code computation} {@link Scheduler}.</dd>
1643+
* </dl>
1644+
*
1645+
* @param time the amount of time the success or error signal should be delayed for
1646+
* @param unit the time unit
1647+
* @param delayError if true, both success and error signals are delayed. if false, only success signals are delayed.
1648+
* @return the new Single instance
1649+
* @since 2.1.5 - experimental
1650+
*/
1651+
@Experimental
1652+
@CheckReturnValue
1653+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
1654+
public final Single<T> delay(long time, TimeUnit unit, boolean delayError) {
1655+
return delay(time, unit, Schedulers.computation(), delayError);
16361656
}
16371657

16381658
/**
16391659
* Delays the emission of the success signal from the current Single by the specified amount.
1660+
* An error signal will not be delayed.
16401661
* <dl>
16411662
* <dt><b>Scheduler:</b></dt>
16421663
* <dd>you specify the {@link Scheduler} where the non-blocking wait and emission happens</dd>
16431664
* </dl>
16441665
*
1645-
* @param time the time amount to delay the emission of the success signal
1666+
* @param time the amount of time the success signal should be delayed for
16461667
* @param unit the time unit
16471668
* @param scheduler the target scheduler to use for the non-blocking wait and emission
16481669
* @return the new Single instance
@@ -1654,9 +1675,33 @@ public final Single<T> delay(long time, TimeUnit unit) {
16541675
@CheckReturnValue
16551676
@SchedulerSupport(SchedulerSupport.CUSTOM)
16561677
public final Single<T> delay(final long time, final TimeUnit unit, final Scheduler scheduler) {
1678+
return delay(time, unit, scheduler, false);
1679+
}
1680+
1681+
/**
1682+
* Delays the emission of the success or error signal from the current Single by the specified amount.
1683+
* <dl>
1684+
* <dt><b>Scheduler:</b></dt>
1685+
* <dd>you specify the {@link Scheduler} where the non-blocking wait and emission happens</dd>
1686+
* </dl>
1687+
*
1688+
* @param time the amount of time the success or error signal should be delayed for
1689+
* @param unit the time unit
1690+
* @param scheduler the target scheduler to use for the non-blocking wait and emission
1691+
* @param delayError if true, both success and error signals are delayed. if false, only success signals are delayed.
1692+
* @return the new Single instance
1693+
* @throws NullPointerException
1694+
* if unit is null, or
1695+
* if scheduler is null
1696+
* @since 2.1.5 - experimental
1697+
*/
1698+
@Experimental
1699+
@CheckReturnValue
1700+
@SchedulerSupport(SchedulerSupport.CUSTOM)
1701+
public final Single<T> delay(final long time, final TimeUnit unit, final Scheduler scheduler, boolean delayError) {
16571702
ObjectHelper.requireNonNull(unit, "unit is null");
16581703
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
1659-
return RxJavaPlugins.onAssembly(new SingleDelay<T>(this, time, unit, scheduler));
1704+
return RxJavaPlugins.onAssembly(new SingleDelay<T>(this, time, unit, scheduler, delayError));
16601705
}
16611706

16621707
/**

src/main/java/io/reactivex/internal/operators/single/SingleDelay.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,18 @@
2121

2222
public final class SingleDelay<T> extends Single<T> {
2323

24-
2524
final SingleSource<? extends T> source;
2625
final long time;
2726
final TimeUnit unit;
2827
final Scheduler scheduler;
28+
final boolean delayError;
2929

30-
public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
30+
public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {
3131
this.source = source;
3232
this.time = time;
3333
this.unit = unit;
3434
this.scheduler = scheduler;
35+
this.delayError = delayError;
3536
}
3637

3738
@Override
@@ -63,7 +64,7 @@ public void onSuccess(final T value) {
6364

6465
@Override
6566
public void onError(final Throwable e) {
66-
sd.replace(scheduler.scheduleDirect(new OnError(e), 0, unit));
67+
sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
6768
}
6869

6970
final class OnSuccess implements Runnable {

src/test/java/io/reactivex/ParamValidationCheckerTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,9 @@ public void checkParallelFlowable() {
312312

313313
// negative time is considered as zero time
314314
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class));
315+
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Boolean.TYPE));
315316
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class));
317+
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE));
316318

317319

318320
// zero repeat is allowed

src/test/java/io/reactivex/internal/operators/single/SingleDelayTest.java

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,38 +27,64 @@
2727
import io.reactivex.exceptions.TestException;
2828
import io.reactivex.functions.*;
2929
import io.reactivex.internal.subscriptions.BooleanSubscription;
30+
import io.reactivex.observers.TestObserver;
3031
import io.reactivex.plugins.RxJavaPlugins;
3132
import io.reactivex.schedulers.Schedulers;
33+
import io.reactivex.schedulers.TestScheduler;
3234
import io.reactivex.subjects.PublishSubject;
3335

3436
public class SingleDelayTest {
3537
@Test
36-
public void delay() throws Exception {
37-
final AtomicInteger value = new AtomicInteger();
38+
public void delayOnSuccess() {
39+
final TestScheduler scheduler = new TestScheduler();
40+
final TestObserver<Integer> observer = Single.just(1)
41+
.delay(5, TimeUnit.SECONDS, scheduler)
42+
.test();
3843

39-
Single.just(1).delay(200, TimeUnit.MILLISECONDS)
40-
.subscribe(new BiConsumer<Integer, Throwable>() {
41-
@Override
42-
public void accept(Integer v, Throwable e) throws Exception {
43-
value.set(v);
44-
}
45-
});
44+
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
45+
observer.assertNoValues();
4646

47-
Thread.sleep(100);
47+
scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
48+
observer.assertValue(1);
49+
}
50+
51+
@Test
52+
public void delayOnError() {
53+
final TestScheduler scheduler = new TestScheduler();
54+
final TestObserver<?> observer = Single.error(new TestException())
55+
.delay(5, TimeUnit.SECONDS, scheduler)
56+
.test();
57+
58+
scheduler.triggerActions();
59+
observer.assertError(TestException.class);
60+
}
4861

49-
assertEquals(0, value.get());
62+
@Test
63+
public void delayedErrorOnSuccess() {
64+
final TestScheduler scheduler = new TestScheduler();
65+
final TestObserver<Integer> observer = Single.just(1)
66+
.delay(5, TimeUnit.SECONDS, scheduler, true)
67+
.test();
5068

51-
Thread.sleep(200);
69+
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
70+
observer.assertNoValues();
5271

53-
assertEquals(1, value.get());
72+
scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
73+
observer.assertValue(1);
5474
}
5575

5676
@Test
57-
public void delayError() {
58-
Single.error(new TestException()).delay(5, TimeUnit.SECONDS)
59-
.test()
60-
.awaitDone(1, TimeUnit.SECONDS)
61-
.assertFailure(TestException.class);
77+
public void delayedErrorOnError() {
78+
final TestScheduler scheduler = new TestScheduler();
79+
final TestObserver<?> observer = Single.error(new TestException())
80+
.delay(5, TimeUnit.SECONDS, scheduler, true)
81+
.test();
82+
83+
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
84+
observer.assertNoErrors();
85+
86+
scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
87+
observer.assertError(TestException.class);
6288
}
6389

6490
@Test

0 commit comments

Comments
 (0)