Skip to content

Commit 08a3975

Browse files
Merge pull request #1019 from benjchristensen/retry-1018
Fix: retry() never unsubscribes from source until operator completes
2 parents b0460d0 + 840c92b commit 08a3975

File tree

2 files changed

+107
-9
lines changed

2 files changed

+107
-9
lines changed

rxjava-core/src/main/java/rx/operators/OperatorRetry.java

+17-9
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import rx.Subscriber;
4040
import rx.functions.Action1;
4141
import rx.schedulers.Schedulers;
42+
import rx.subscriptions.SerialSubscription;
4243

4344
public class OperatorRetry<T> implements Operator<T, Observable<T>> {
4445

@@ -55,18 +56,21 @@ public OperatorRetry() {
5556
}
5657

5758
@Override
58-
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> s) {
59-
return new Subscriber<Observable<T>>(s) {
59+
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
60+
final SerialSubscription serialSubscription = new SerialSubscription();
61+
// add serialSubscription so it gets unsubscribed if child is unsubscribed
62+
child.add(serialSubscription);
63+
return new Subscriber<Observable<T>>(child) {
6064
final AtomicInteger attempts = new AtomicInteger(0);
61-
65+
6266
@Override
6367
public void onCompleted() {
6468
// ignore as we expect a single nested Observable<T>
6569
}
6670

6771
@Override
6872
public void onError(Throwable e) {
69-
s.onError(e);
73+
child.onError(e);
7074
}
7175

7276
@Override
@@ -77,11 +81,12 @@ public void onNext(final Observable<T> o) {
7781
public void call(final Inner inner) {
7882
final Action1<Inner> _self = this;
7983
attempts.incrementAndGet();
80-
o.unsafeSubscribe(new Subscriber<T>(s) {
84+
85+
Subscriber<T> subscriber = new Subscriber<T>(child) {
8186

8287
@Override
8388
public void onCompleted() {
84-
s.onCompleted();
89+
child.onCompleted();
8590
}
8691

8792
@Override
@@ -91,16 +96,19 @@ public void onError(Throwable e) {
9196
inner.schedule(_self);
9297
} else {
9398
// give up and pass the failure
94-
s.onError(e);
99+
child.onError(e);
95100
}
96101
}
97102

98103
@Override
99104
public void onNext(T v) {
100-
s.onNext(v);
105+
child.onNext(v);
101106
}
102107

103-
});
108+
};
109+
// register this Subscription (and unsubscribe previous if exists)
110+
serialSubscription.set(subscriber);
111+
o.unsafeSubscribe(subscriber);
104112
}
105113
});
106114
}

rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java

+90
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static org.mockito.Matchers.*;
2020
import static org.mockito.Mockito.*;
2121

22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
2224
import java.util.concurrent.atomic.AtomicInteger;
2325

2426
import org.junit.Test;
@@ -27,6 +29,7 @@
2729
import rx.Observable;
2830
import rx.Observable.OnSubscribeFunc;
2931
import rx.Observer;
32+
import rx.Subscriber;
3033
import rx.Subscription;
3134
import rx.functions.Action1;
3235
import rx.subjects.PublishSubject;
@@ -147,6 +150,93 @@ public void call(Integer n) {
147150
assertEquals(1, count.get());
148151
}
149152

153+
public static class SlowFuncAlwaysFails implements Observable.OnSubscribe<String> {
154+
155+
final AtomicInteger nextSeq=new AtomicInteger();
156+
final AtomicInteger activeSubs=new AtomicInteger();
157+
final AtomicInteger concurrentSubs=new AtomicInteger();
158+
159+
public void call(final Subscriber<? super String> s)
160+
{
161+
final int seq=nextSeq.incrementAndGet();
162+
163+
int cur=activeSubs.incrementAndGet();
164+
// Track concurrent subscriptions
165+
concurrentSubs.set(Math.max(cur,concurrentSubs.get()));
166+
167+
// Use async error
168+
new Thread(new Runnable() {
169+
@Override
170+
public void run() {
171+
try {
172+
Thread.sleep(100);
173+
} catch (InterruptedException e) {
174+
// ignore
175+
}
176+
s.onError(new RuntimeException("Subscriber #"+seq+" fails"));
177+
}
178+
}).start();
179+
180+
// Track unsubscribes
181+
s.add(new Subscription()
182+
{
183+
private boolean active=true;
184+
185+
public void unsubscribe()
186+
{
187+
if (active) {
188+
activeSubs.decrementAndGet();
189+
active=false;
190+
}
191+
}
192+
193+
public boolean isUnsubscribed()
194+
{
195+
return !active;
196+
}
197+
});
198+
}
199+
}
200+
201+
@Test
202+
public void testUnsubscribeAfterError() {
203+
204+
final CountDownLatch check=new CountDownLatch(1);
205+
final SlowFuncAlwaysFails sf=new SlowFuncAlwaysFails();
206+
207+
Observable
208+
.create(sf)
209+
.retry(4)
210+
.subscribe(
211+
new Action1<String>()
212+
{
213+
@Override
214+
public void call(String v)
215+
{
216+
fail("Should never happen");
217+
}
218+
},
219+
new Action1<Throwable>()
220+
{
221+
public void call(Throwable throwable)
222+
{
223+
check.countDown();
224+
}
225+
}
226+
);
227+
228+
try
229+
{
230+
check.await(1, TimeUnit.SECONDS);
231+
} catch (InterruptedException e)
232+
{
233+
fail("interrupted");
234+
}
235+
236+
assertEquals("5 Subscribers created", 5, sf.nextSeq.get());
237+
assertEquals("1 Active Subscriber", 1, sf.concurrentSubs.get());
238+
}
239+
150240
@Test
151241
public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubsribed() throws InterruptedException {
152242
final AtomicInteger subsCount = new AtomicInteger(0);

0 commit comments

Comments
 (0)