Skip to content

Commit 0660284

Browse files
authored
1.x: fix timeout (timed, selector) unsubscribe bug (#5660)
* 1.x: fix timeout(time, [fallback]) unsubscribe bug * Fix timeout(selector) variant.
1 parent 81542cd commit 0660284

8 files changed

+726
-385
lines changed

src/main/java/rx/Observable.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11284,11 +11284,13 @@ public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTi
1128411284
* if {@code timeoutSelector} is null
1128511285
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
1128611286
*/
11287+
@SuppressWarnings("unchecked")
1128711288
public final <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<V>> timeoutSelector, Observable<? extends T> other) {
1128811289
if (timeoutSelector == null) {
1128911290
throw new NullPointerException("timeoutSelector is null");
1129011291
}
11291-
return lift(new OperatorTimeoutWithSelector<T, U, V>(firstTimeoutSelector, timeoutSelector, other));
11292+
return unsafeCreate(new OnSubscribeTimeoutSelectorWithFallback<T, U, V>(this,
11293+
firstTimeoutSelector != null ? defer((Func0<Observable<U>>)firstTimeoutSelector) : null, timeoutSelector, other));
1129211294
}
1129311295

1129411296
/**
@@ -11443,7 +11445,7 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<?
1144311445
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
1144411446
*/
1144511447
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
11446-
return lift(new OperatorTimeout<T>(timeout, timeUnit, other, scheduler));
11448+
return unsafeCreate(new OnSubscribeTimeoutTimedWithFallback<T>(this, timeout, timeUnit, scheduler, other));
1144711449
}
1144811450

1144911451
/**
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.concurrent.TimeoutException;
20+
import java.util.concurrent.atomic.AtomicLong;
21+
22+
import rx.*;
23+
import rx.exceptions.Exceptions;
24+
import rx.functions.Func1;
25+
import rx.internal.operators.OnSubscribeTimeoutTimedWithFallback.FallbackSubscriber;
26+
import rx.internal.producers.ProducerArbiter;
27+
import rx.internal.subscriptions.SequentialSubscription;
28+
import rx.plugins.RxJavaHooks;
29+
30+
/**
31+
* Switches to the fallback Observable if: the first upstream item doesn't arrive before
32+
* the first timeout Observable signals an item or completes; or the Observable generated from
33+
* the previous upstream item signals its item or completes before the upstream signals the next item
34+
* of its own.
35+
*
36+
* @param <T> the input and output value type
37+
* @param <U> the value type of the first timeout Observable
38+
* @param <V> the value type of the item-based timeout Observable
39+
*
40+
* @since 1.3.3
41+
*/
42+
public final class OnSubscribeTimeoutSelectorWithFallback<T, U, V> implements Observable.OnSubscribe<T> {
43+
44+
final Observable<T> source;
45+
46+
final Observable<U> firstTimeoutIndicator;
47+
48+
final Func1<? super T, ? extends Observable<V>> itemTimeoutIndicator;
49+
50+
final Observable<? extends T> fallback;
51+
52+
public OnSubscribeTimeoutSelectorWithFallback(Observable<T> source,
53+
Observable<U> firstTimeoutIndicator,
54+
Func1<? super T, ? extends Observable<V>> itemTimeoutIndicator,
55+
Observable<? extends T> fallback) {
56+
this.source = source;
57+
this.firstTimeoutIndicator = firstTimeoutIndicator;
58+
this.itemTimeoutIndicator = itemTimeoutIndicator;
59+
this.fallback = fallback;
60+
}
61+
62+
@Override
63+
public void call(Subscriber<? super T> t) {
64+
TimeoutMainSubscriber<T> parent = new TimeoutMainSubscriber<T>(t, itemTimeoutIndicator, fallback);
65+
t.add(parent.upstream);
66+
t.setProducer(parent.arbiter);
67+
parent.startFirst(firstTimeoutIndicator);
68+
source.subscribe(parent);
69+
}
70+
71+
static final class TimeoutMainSubscriber<T> extends Subscriber<T> {
72+
73+
final Subscriber<? super T> actual;
74+
75+
final Func1<? super T, ? extends Observable<?>> itemTimeoutIndicator;
76+
77+
final Observable<? extends T> fallback;
78+
79+
final ProducerArbiter arbiter;
80+
81+
final AtomicLong index;
82+
83+
final SequentialSubscription task;
84+
85+
final SequentialSubscription upstream;
86+
87+
long consumed;
88+
89+
TimeoutMainSubscriber(Subscriber<? super T> actual,
90+
Func1<? super T, ? extends Observable<?>> itemTimeoutIndicator,
91+
Observable<? extends T> fallback) {
92+
this.actual = actual;
93+
this.itemTimeoutIndicator = itemTimeoutIndicator;
94+
this.fallback = fallback;
95+
this.arbiter = new ProducerArbiter();
96+
this.index = new AtomicLong();
97+
this.task = new SequentialSubscription();
98+
this.upstream = new SequentialSubscription(this);
99+
this.add(task);
100+
}
101+
102+
103+
@Override
104+
public void onNext(T t) {
105+
long idx = index.get();
106+
if (idx == Long.MAX_VALUE || !index.compareAndSet(idx, idx + 1)) {
107+
return;
108+
}
109+
110+
Subscription s = task.get();
111+
if (s != null) {
112+
s.unsubscribe();
113+
}
114+
115+
actual.onNext(t);
116+
117+
consumed++;
118+
119+
Observable<?> timeoutObservable;
120+
121+
try {
122+
timeoutObservable = itemTimeoutIndicator.call(t);
123+
if (timeoutObservable == null) {
124+
throw new NullPointerException("The itemTimeoutIndicator returned a null Observable");
125+
}
126+
} catch (Throwable ex) {
127+
Exceptions.throwIfFatal(ex);
128+
unsubscribe();
129+
index.getAndSet(Long.MAX_VALUE);
130+
actual.onError(ex);
131+
return;
132+
}
133+
134+
TimeoutConsumer tc = new TimeoutConsumer(idx + 1);
135+
if (task.replace(tc)) {
136+
timeoutObservable.subscribe(tc);
137+
}
138+
139+
}
140+
141+
void startFirst(Observable<?> firstTimeoutIndicator) {
142+
if (firstTimeoutIndicator != null) {
143+
TimeoutConsumer tc = new TimeoutConsumer(0L);
144+
if (task.replace(tc)) {
145+
firstTimeoutIndicator.subscribe(tc);
146+
}
147+
}
148+
}
149+
150+
@Override
151+
public void onError(Throwable e) {
152+
if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) {
153+
task.unsubscribe();
154+
155+
actual.onError(e);
156+
} else {
157+
RxJavaHooks.onError(e);
158+
}
159+
}
160+
161+
@Override
162+
public void onCompleted() {
163+
if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) {
164+
task.unsubscribe();
165+
166+
actual.onCompleted();
167+
}
168+
}
169+
170+
@Override
171+
public void setProducer(Producer p) {
172+
arbiter.setProducer(p);
173+
}
174+
175+
void onTimeout(long idx) {
176+
if (!index.compareAndSet(idx, Long.MAX_VALUE)) {
177+
return;
178+
}
179+
180+
unsubscribe();
181+
182+
if (fallback == null) {
183+
actual.onError(new TimeoutException());
184+
} else {
185+
long c = consumed;
186+
if (c != 0L) {
187+
arbiter.produced(c);
188+
}
189+
190+
FallbackSubscriber<T> fallbackSubscriber = new FallbackSubscriber<T>(actual, arbiter);
191+
192+
if (upstream.replace(fallbackSubscriber)) {
193+
fallback.subscribe(fallbackSubscriber);
194+
}
195+
}
196+
}
197+
198+
void onTimeoutError(long idx, Throwable ex) {
199+
if (index.compareAndSet(idx, Long.MAX_VALUE)) {
200+
unsubscribe();
201+
202+
actual.onError(ex);
203+
} else {
204+
RxJavaHooks.onError(ex);
205+
}
206+
207+
}
208+
209+
final class TimeoutConsumer extends Subscriber<Object> {
210+
211+
final long idx;
212+
213+
boolean done;
214+
215+
TimeoutConsumer(long idx) {
216+
this.idx = idx;
217+
}
218+
219+
@Override
220+
public void onNext(Object t) {
221+
if (!done) {
222+
done = true;
223+
unsubscribe();
224+
onTimeout(idx);
225+
}
226+
}
227+
228+
@Override
229+
public void onError(Throwable e) {
230+
if (!done) {
231+
done = true;
232+
onTimeoutError(idx, e);
233+
} else {
234+
RxJavaHooks.onError(e);
235+
}
236+
}
237+
238+
@Override
239+
public void onCompleted() {
240+
if (!done) {
241+
done = true;
242+
onTimeout(idx);
243+
}
244+
}
245+
}
246+
}
247+
}

0 commit comments

Comments
 (0)