Skip to content

Commit 147c12d

Browse files
Merge pull request #1793 from benjchristensen/1791_first_with_retryWhen
Take/Redo Unsubscribe
2 parents bcf2a39 + d217209 commit 147c12d

File tree

3 files changed

+41
-2
lines changed

3 files changed

+41
-2
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

+4
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,10 @@ public void call(final Subscriber<? super T> child) {
206206
final Action0 subscribeToSource = new Action0() {
207207
@Override
208208
public void call() {
209+
if (child.isUnsubscribed()) {
210+
return;
211+
}
212+
209213
Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
210214
@Override
211215
public void onCompleted() {

src/main/java/rx/internal/operators/OperatorTake.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,16 @@ public void onError(Throwable e) {
6161
@Override
6262
public void onNext(T i) {
6363
if (!isUnsubscribed()) {
64-
child.onNext(i);
6564
if (++count >= limit) {
6665
completed = true;
67-
child.onCompleted();
66+
// unsubscribe before emitting onNext so shutdown happens before possible effects
67+
// of onNext such as product.request(n) calls be sent upstream.
6868
unsubscribe();
6969
}
70+
child.onNext(i);
71+
if (completed) {
72+
child.onCompleted();
73+
}
7074
}
7175
}
7276

src/test/java/rx/internal/operators/OperatorRetryTest.java

+31
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,37 @@ public Observable<?> call(Observable<? extends Throwable> t1) {
244244
inOrder.verifyNoMoreInteractions();
245245
}
246246

247+
@Test
248+
public void testSingleSubscriptionOnFirst() throws Exception {
249+
final AtomicInteger inc = new AtomicInteger(0);
250+
Observable.OnSubscribe<Integer> onSubscribe = new OnSubscribe<Integer>() {
251+
@Override
252+
public void call(Subscriber<? super Integer> subscriber) {
253+
final int emit = inc.incrementAndGet();
254+
subscriber.onNext(emit);
255+
subscriber.onCompleted();
256+
}
257+
};
258+
259+
int first = Observable.create(onSubscribe)
260+
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
261+
@Override
262+
public Observable<?> call(Observable<? extends Throwable> attempt) {
263+
return attempt.zipWith(Observable.just(1), new Func2<Throwable, Integer, Void>() {
264+
@Override
265+
public Void call(Throwable o, Integer integer) {
266+
return null;
267+
}
268+
});
269+
}
270+
})
271+
.toBlocking()
272+
.first();
273+
274+
assertEquals("Observer did not receive the expected output", 1, first);
275+
assertEquals("Subscribe was not called once", 1, inc.get());
276+
}
277+
247278
@Test
248279
public void testOriginFails() {
249280
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)