Skip to content

Commit 1abaaf2

Browse files
Merge pull request #958 from akarnokd/OperatorSkipWhile
OperatorSkipWhile
2 parents e23d3e0 + ea73bd1 commit 1abaaf2

File tree

4 files changed

+106
-109
lines changed

4 files changed

+106
-109
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@
9191
import rx.operators.OperationSkip;
9292
import rx.operators.OperationSkipLast;
9393
import rx.operators.OperationSkipUntil;
94-
import rx.operators.OperationSkipWhile;
94+
import rx.operators.OperatorSkipWhile;
9595
import rx.operators.OperationSum;
9696
import rx.operators.OperationSwitch;
9797
import rx.operators.OperationSynchronize;
@@ -6427,7 +6427,7 @@ public final <U> Observable<T> skipUntil(Observable<U> other) {
64276427
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229685.aspx">MSDN: Observable.SkipWhile</a>
64286428
*/
64296429
public final Observable<T> skipWhile(Func1<? super T, Boolean> predicate) {
6430-
return create(OperationSkipWhile.skipWhile(this, predicate));
6430+
return lift(new OperatorSkipWhile<T>(OperatorSkipWhile.toPredicate2(predicate)));
64316431
}
64326432

64336433
/**
@@ -6445,7 +6445,7 @@ public final Observable<T> skipWhile(Func1<? super T, Boolean> predicate) {
64456445
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211631.aspx">MSDN: Observable.SkipWhile</a>
64466446
*/
64476447
public final Observable<T> skipWhileWithIndex(Func2<? super T, Integer, Boolean> predicate) {
6448-
return create(OperationSkipWhile.skipWhileWithIndex(this, predicate));
6448+
return lift(new OperatorSkipWhile<T>(predicate));
64496449
}
64506450

64516451
/**

rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java

Lines changed: 0 additions & 98 deletions
This file was deleted.
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import rx.Observable;
22+
import rx.Observable.OnSubscribeFunc;
23+
import rx.Observable.Operator;
24+
import rx.Observer;
25+
import rx.Subscriber;
26+
import rx.Subscription;
27+
import rx.functions.Func1;
28+
import rx.functions.Func2;
29+
30+
/**
31+
* Skips any emitted source items as long as the specified condition holds true. Emits all further source items
32+
* as soon as the condition becomes false.
33+
*/
34+
public final class OperatorSkipWhile<T> implements Operator<T, T> {
35+
private final Func2<? super T, Integer, Boolean> predicate;
36+
37+
public OperatorSkipWhile(Func2<? super T, Integer, Boolean> predicate) {
38+
this.predicate = predicate;
39+
}
40+
@Override
41+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
42+
return new Subscriber<T>(child) {
43+
boolean skipping = true;
44+
int index;
45+
@Override
46+
public void onNext(T t) {
47+
if (!skipping) {
48+
child.onNext(t);
49+
} else {
50+
if (!predicate.call(t, index++)) {
51+
skipping = false;
52+
child.onNext(t);
53+
}
54+
}
55+
}
56+
57+
@Override
58+
public void onError(Throwable e) {
59+
child.onError(e);
60+
}
61+
62+
@Override
63+
public void onCompleted() {
64+
child.onCompleted();
65+
}
66+
};
67+
}
68+
/** Convert to Func2 type predicate. */
69+
public static <T> Func2<T, Integer, Boolean> toPredicate2(final Func1<? super T, Boolean> predicate) {
70+
return new Func2<T, Integer, Boolean>() {
71+
72+
@Override
73+
public Boolean call(T t1, Integer t2) {
74+
return predicate.call(t1);
75+
}
76+
};
77+
}
78+
}

rxjava-core/src/test/java/rx/operators/OperationSkipWhileTest.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Matchers.*;
1918
import static org.mockito.Mockito.*;
20-
import static rx.operators.OperationSkipWhile.*;
2119

2220
import org.junit.Test;
2321
import org.mockito.InOrder;
@@ -51,7 +49,7 @@ public Boolean call(Integer value, Integer index) {
5149
@Test
5250
public void testSkipWithIndex() {
5351
Observable<Integer> src = Observable.from(1, 2, 3, 4, 5);
54-
Observable.create(skipWhileWithIndex(src, INDEX_LESS_THAN_THREE)).subscribe(w);
52+
src.skipWhileWithIndex(INDEX_LESS_THAN_THREE).subscribe(w);
5553

5654
InOrder inOrder = inOrder(w);
5755
inOrder.verify(w, times(1)).onNext(4);
@@ -63,7 +61,7 @@ public void testSkipWithIndex() {
6361
@Test
6462
public void testSkipEmpty() {
6563
Observable<Integer> src = Observable.empty();
66-
Observable.create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
64+
src.skipWhile(LESS_THAN_FIVE).subscribe(w);
6765
verify(w, never()).onNext(anyInt());
6866
verify(w, never()).onError(any(Throwable.class));
6967
verify(w, times(1)).onCompleted();
@@ -72,7 +70,7 @@ public void testSkipEmpty() {
7270
@Test
7371
public void testSkipEverything() {
7472
Observable<Integer> src = Observable.from(1, 2, 3, 4, 3, 2, 1);
75-
Observable.create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
73+
src.skipWhile(LESS_THAN_FIVE).subscribe(w);
7674
verify(w, never()).onNext(anyInt());
7775
verify(w, never()).onError(any(Throwable.class));
7876
verify(w, times(1)).onCompleted();
@@ -81,7 +79,7 @@ public void testSkipEverything() {
8179
@Test
8280
public void testSkipNothing() {
8381
Observable<Integer> src = Observable.from(5, 3, 1);
84-
Observable.create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
82+
src.skipWhile(LESS_THAN_FIVE).subscribe(w);
8583

8684
InOrder inOrder = inOrder(w);
8785
inOrder.verify(w, times(1)).onNext(5);
@@ -94,7 +92,7 @@ public void testSkipNothing() {
9492
@Test
9593
public void testSkipSome() {
9694
Observable<Integer> src = Observable.from(1, 2, 3, 4, 5, 3, 1, 5);
97-
Observable.create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
95+
src.skipWhile(LESS_THAN_FIVE).subscribe(w);
9896

9997
InOrder inOrder = inOrder(w);
10098
inOrder.verify(w, times(1)).onNext(5);
@@ -108,11 +106,30 @@ public void testSkipSome() {
108106
@Test
109107
public void testSkipError() {
110108
Observable<Integer> src = Observable.from(1, 2, 42, 5, 3, 1);
111-
Observable.create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
109+
src.skipWhile(LESS_THAN_FIVE).subscribe(w);
112110

113111
InOrder inOrder = inOrder(w);
114112
inOrder.verify(w, never()).onNext(anyInt());
115113
inOrder.verify(w, never()).onCompleted();
116114
inOrder.verify(w, times(1)).onError(any(RuntimeException.class));
117115
}
116+
117+
@Test
118+
public void testSkipManySubscribers() {
119+
Observable<Integer> src = Observable.range(1, 10).skipWhile(LESS_THAN_FIVE);
120+
int n = 5;
121+
for (int i = 0; i < n; i++) {
122+
@SuppressWarnings("unchecked")
123+
Observer<Object> o = mock(Observer.class);
124+
InOrder inOrder = inOrder(o);
125+
126+
src.subscribe(o);
127+
128+
for (int j = 5; j < 10; j++) {
129+
inOrder.verify(o).onNext(j);
130+
}
131+
inOrder.verify(o).onCompleted();
132+
verify(o, never()).onError(any(Throwable.class));
133+
}
134+
}
118135
}

0 commit comments

Comments
 (0)