@@ -5801,8 +5801,19 @@ public final Observable<T> repeat(final long count, Scheduler scheduler) {
5801
5801
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
5802
5802
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
5803
5803
*/
5804
- public final Observable <T > repeatWhen (Func1 <? super Observable <? extends Notification <?>>, ? extends Observable <?>> notificationHandler , Scheduler scheduler ) {
5805
- return OnSubscribeRedo .repeat (this , notificationHandler , scheduler );
5804
+ public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
5805
+ Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
5806
+ @Override
5807
+ public Observable<?> call(Observable<? extends Notification<?>> notifications) {
5808
+ return notificationHandler.call(notifications.map(new Func1<Notification<?>, Void>() {
5809
+ @Override
5810
+ public Void call(Notification<?> notification) {
5811
+ return null;
5812
+ }
5813
+ }));
5814
+ }
5815
+ };
5816
+ return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler, scheduler);
5806
5817
}
5807
5818
5808
5819
/**
@@ -5825,8 +5836,19 @@ public final Observable<T> repeatWhen(Func1<? super Observable<? extends Notific
5825
5836
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
5826
5837
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
5827
5838
*/
5828
- public final Observable <T > repeatWhen (Func1 <? super Observable <? extends Notification <?>>, ? extends Observable <?>> notificationHandler ) {
5829
- return OnSubscribeRedo .repeat (this , notificationHandler );
5839
+ public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler) {
5840
+ Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
5841
+ @Override
5842
+ public Observable<?> call(Observable<? extends Notification<?>> notifications) {
5843
+ return notificationHandler.call(notifications.map(new Func1<Notification<?>, Void>() {
5844
+ @Override
5845
+ public Void call(Notification<?> notification) {
5846
+ return null;
5847
+ }
5848
+ }));
5849
+ }
5850
+ };
5851
+ return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler);
5830
5852
}
5831
5853
5832
5854
/**
@@ -6541,8 +6563,19 @@ public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
6541
6563
* @return the source Observable modified with retry logic
6542
6564
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
6543
6565
*/
6544
- public final Observable <T > retryWhen (Func1 <? super Observable <? extends Notification <?>>, ? extends Observable <?>> notificationHandler ) {
6545
- return OnSubscribeRedo .<T > retry (this , notificationHandler );
6566
+ public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler) {
6567
+ Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
6568
+ @Override
6569
+ public Observable<?> call(Observable<? extends Notification<?>> notifications) {
6570
+ return notificationHandler.call(notifications.map(new Func1<Notification<?>, Throwable>() {
6571
+ @Override
6572
+ public Throwable call(Notification<?> notification) {
6573
+ return notification.getThrowable();
6574
+ }
6575
+ }));
6576
+ }
6577
+ };
6578
+ return OnSubscribeRedo.<T> retry(this, dematerializedNotificationHandler);
6546
6579
}
6547
6580
6548
6581
/**
@@ -6566,8 +6599,19 @@ public final Observable<T> retryWhen(Func1<? super Observable<? extends Notifica
6566
6599
* @return the source Observable modified with retry logic
6567
6600
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
6568
6601
*/
6569
- public final Observable <T > retryWhen (Func1 <? super Observable <? extends Notification <?>>, ? extends Observable <?>> notificationHandler , Scheduler scheduler ) {
6570
- return OnSubscribeRedo .<T > retry (this , notificationHandler , scheduler );
6602
+ public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
6603
+ Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
6604
+ @Override
6605
+ public Observable<?> call(Observable<? extends Notification<?>> notifications) {
6606
+ return notificationHandler.call(notifications.map(new Func1<Notification<?>, Throwable>() {
6607
+ @Override
6608
+ public Throwable call(Notification<?> notification) {
6609
+ return notification.getThrowable();
6610
+ }
6611
+ }));
6612
+ }
6613
+ };
6614
+ return OnSubscribeRedo.<T> retry(this, dematerializedNotificationHandler, scheduler);
6571
6615
}
6572
6616
6573
6617
/**
0 commit comments