Skip to content

Commit bf1a4f3

Browse files
author
Alex Wenckus
committed
Fix for #1791 - don't retry (subscribe) to source if child has unsubscribed.
1 parent bcf2a39 commit bf1a4f3

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

+4
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,10 @@ public void call(final Subscriber<? super T> child) {
206206
final Action0 subscribeToSource = new Action0() {
207207
@Override
208208
public void call() {
209+
if (child.isUnsubscribed()) {
210+
return;
211+
}
212+
209213
Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
210214
@Override
211215
public void onCompleted() {

src/test/java/rx/internal/operators/OperatorRetryTest.java

+31
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,37 @@ public Observable<?> call(Observable<? extends Throwable> t1) {
244244
inOrder.verifyNoMoreInteractions();
245245
}
246246

247+
@Test
248+
public void testSingleSubscriptionOnFirst() throws Exception {
249+
final AtomicInteger inc = new AtomicInteger(0);
250+
Observable.OnSubscribe<Integer> onSubscribe = new OnSubscribe<Integer>() {
251+
@Override
252+
public void call(Subscriber<? super Integer> subscriber) {
253+
final int emit = inc.incrementAndGet();
254+
subscriber.onNext(emit);
255+
subscriber.onCompleted();
256+
}
257+
};
258+
259+
int first = Observable.create(onSubscribe)
260+
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
261+
@Override
262+
public Observable<?> call(Observable<? extends Throwable> attempt) {
263+
return attempt.zipWith(Observable.just(1), new Func2<Throwable, Integer, Void>() {
264+
@Override
265+
public Void call(Throwable o, Integer integer) {
266+
return null;
267+
}
268+
});
269+
}
270+
})
271+
.toBlocking()
272+
.first();
273+
274+
assertEquals("Observer did not receive the expected output", 1, first);
275+
assertEquals("Subscribe was not called once", 1, inc.get());
276+
}
277+
247278
@Test
248279
public void testOriginFails() {
249280
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)