Skip to content

Commit 83ba4b9

Browse files
authored
2.x: cleanup, fixes and coverage 10/25 (#4766)
1 parent 318bf43 commit 83ba4b9

36 files changed

+1339
-434
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11329,6 +11329,8 @@ public final <R> Flowable<R> scan(final R initialValue, BiFunction<R, ? super T,
1132911329
* Publisher.defer(() -> o.scan(new ArrayList&lt;>(), (list, item) -> list.add(item)))
1133011330
* );
1133111331
* </code></pre>
11332+
* <p>
11333+
* Unlike 1.x, this operator doesn't emit the seed value unless the upstream signals an event.
1133211334
* <dl>
1133311335
* <dt><b>Backpressure:</b><dt>
1133411336
* <dd>The operator honors downstream backpressure and expects the source {@code Publisher} to honor backpressure as well.

src/main/java/io/reactivex/internal/observers/ForEachWhileObserver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import io.reactivex.Observer;
1919
import io.reactivex.disposables.Disposable;
20-
import io.reactivex.exceptions.Exceptions;
20+
import io.reactivex.exceptions.*;
2121
import io.reactivex.functions.*;
2222
import io.reactivex.internal.disposables.DisposableHelper;
2323
import io.reactivex.plugins.RxJavaPlugins;
@@ -82,7 +82,7 @@ public void onError(Throwable t) {
8282
onError.accept(t);
8383
} catch (Throwable ex) {
8484
Exceptions.throwIfFatal(ex);
85-
RxJavaPlugins.onError(ex);
85+
RxJavaPlugins.onError(new CompositeException(t, ex));
8686
}
8787
}
8888

src/main/java/io/reactivex/internal/observers/LambdaObserver.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
import io.reactivex.Observer;
1919
import io.reactivex.disposables.Disposable;
20-
import io.reactivex.exceptions.Exceptions;
20+
import io.reactivex.exceptions.*;
2121
import io.reactivex.functions.*;
22-
import io.reactivex.internal.disposables.*;
22+
import io.reactivex.internal.disposables.DisposableHelper;
2323
import io.reactivex.plugins.RxJavaPlugins;
2424

2525
public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
@@ -47,42 +47,46 @@ public void onSubscribe(Disposable s) {
4747
onSubscribe.accept(this);
4848
} catch (Throwable ex) {
4949
Exceptions.throwIfFatal(ex);
50-
s.dispose();
51-
RxJavaPlugins.onError(ex);
50+
onError(ex);
5251
}
5352
}
5453
}
5554

5655
@Override
5756
public void onNext(T t) {
58-
try {
59-
onNext.accept(t);
60-
} catch (Throwable e) {
61-
Exceptions.throwIfFatal(e);
62-
onError(e);
57+
if (!isDisposed()) {
58+
try {
59+
onNext.accept(t);
60+
} catch (Throwable e) {
61+
Exceptions.throwIfFatal(e);
62+
onError(e);
63+
}
6364
}
6465
}
6566

6667
@Override
6768
public void onError(Throwable t) {
68-
dispose();
69-
try {
70-
onError.accept(t);
71-
} catch (Throwable e) {
72-
Exceptions.throwIfFatal(e);
73-
RxJavaPlugins.onError(e);
74-
RxJavaPlugins.onError(t);
69+
if (!isDisposed()) {
70+
dispose();
71+
try {
72+
onError.accept(t);
73+
} catch (Throwable e) {
74+
Exceptions.throwIfFatal(e);
75+
RxJavaPlugins.onError(new CompositeException(t, e));
76+
}
7577
}
7678
}
7779

7880
@Override
7981
public void onComplete() {
80-
dispose();
81-
try {
82-
onComplete.run();
83-
} catch (Throwable e) {
84-
Exceptions.throwIfFatal(e);
85-
RxJavaPlugins.onError(e);
82+
if (!isDisposed()) {
83+
dispose();
84+
try {
85+
onComplete.run();
86+
} catch (Throwable e) {
87+
Exceptions.throwIfFatal(e);
88+
RxJavaPlugins.onError(e);
89+
}
8690
}
8791
}
8892

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ static final class ConcatMapEagerDelayErrorSubscriber<T, R>
7272

7373
final ErrorMode errorMode;
7474

75-
final AtomicReference<Throwable> error;
75+
final AtomicThrowable errors;
7676

7777
final AtomicLong requested;
7878

@@ -95,7 +95,7 @@ static final class ConcatMapEagerDelayErrorSubscriber<T, R>
9595
this.prefetch = prefetch;
9696
this.errorMode = errorMode;
9797
this.subscribers = new SpscLinkedArrayQueue<InnerQueuedSubscriber<R>>(Math.min(prefetch, maxConcurrency));
98-
this.error = new AtomicReference<Throwable>();
98+
this.errors = new AtomicThrowable();
9999
this.requested = new AtomicLong();
100100
}
101101

@@ -146,7 +146,7 @@ public void onNext(T t) {
146146

147147
@Override
148148
public void onError(Throwable t) {
149-
if (ExceptionHelper.addThrowable(error, t)) {
149+
if (errors.addThrowable(t)) {
150150
done = true;
151151
drain();
152152
} else {
@@ -207,7 +207,7 @@ public void innerNext(InnerQueuedSubscriber<R> inner, R value) {
207207

208208
@Override
209209
public void innerError(InnerQueuedSubscriber<R> inner, Throwable e) {
210-
if (ExceptionHelper.addThrowable(this.error, e)) {
210+
if (errors.addThrowable(e)) {
211211
inner.setDone();
212212
if (errorMode != ErrorMode.END) {
213213
s.cancel();
@@ -242,11 +242,11 @@ public void drain() {
242242
if (inner == null) {
243243

244244
if (em != ErrorMode.END) {
245-
Throwable ex = error.get();
245+
Throwable ex = errors.get();
246246
if (ex != null) {
247247
cancelAll();
248248

249-
a.onError(ex);
249+
a.onError(errors.terminate());
250250
return;
251251
}
252252
}
@@ -256,7 +256,7 @@ public void drain() {
256256
inner = subscribers.poll();
257257

258258
if (outerDone && inner == null) {
259-
Throwable ex = error.get();
259+
Throwable ex = errors.terminate();
260260
if (ex != null) {
261261
a.onError(ex);
262262
} else {
@@ -282,13 +282,13 @@ public void drain() {
282282
}
283283

284284
if (em == ErrorMode.IMMEDIATE) {
285-
Throwable ex = error.get();
285+
Throwable ex = errors.get();
286286
if (ex != null) {
287287
current = null;
288288
inner.cancel();
289289
cancelAll();
290290

291-
a.onError(ex);
291+
a.onError(errors.terminate());
292292
return;
293293
}
294294
}
@@ -336,13 +336,13 @@ public void drain() {
336336
}
337337

338338
if (em == ErrorMode.IMMEDIATE) {
339-
Throwable ex = error.get();
339+
Throwable ex = errors.get();
340340
if (ex != null) {
341341
current = null;
342342
inner.cancel();
343343
cancelAll();
344344

345-
a.onError(ex);
345+
a.onError(errors.terminate());
346346
return;
347347
}
348348
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.reactivex.exceptions.*;
2323
import io.reactivex.functions.Cancellable;
2424
import io.reactivex.internal.disposables.*;
25-
import io.reactivex.internal.fuseable.SimpleQueue;
25+
import io.reactivex.internal.fuseable.SimplePlainQueue;
2626
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
2727
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2828
import io.reactivex.internal.util.*;
@@ -91,7 +91,7 @@ static final class SerializedEmitter<T>
9191

9292
final AtomicThrowable error;
9393

94-
final SimpleQueue<T> queue;
94+
final SimplePlainQueue<T> queue;
9595

9696
volatile boolean done;
9797

@@ -116,7 +116,7 @@ public void onNext(T t) {
116116
return;
117117
}
118118
} else {
119-
SimpleQueue<T> q = queue;
119+
SimplePlainQueue<T> q = queue;
120120
synchronized (q) {
121121
q.offer(t);
122122
}
@@ -161,7 +161,7 @@ void drain() {
161161

162162
void drainLoop() {
163163
BaseEmitter<T> e = emitter;
164-
SimpleQueue<T> q = queue;
164+
SimplePlainQueue<T> q = queue;
165165
AtomicThrowable error = this.error;
166166
int missed = 1;
167167
for (;;) {
@@ -179,15 +179,8 @@ void drainLoop() {
179179
}
180180

181181
boolean d = done;
182-
T v;
183-
184-
try {
185-
v = q.poll();
186-
} catch (Throwable ex) {
187-
Exceptions.throwIfFatal(ex);
188-
// should never happen
189-
v = null;
190-
}
182+
183+
T v = q.poll();
191184

192185
boolean empty = v == null;
193186

src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,7 @@ void emit(long idx, T value) {
142142
long r = get();
143143
if (r != 0L) {
144144
actual.onNext(value);
145-
if (r != Long.MAX_VALUE) {
146-
decrementAndGet();
147-
}
145+
BackpressureHelper.produced(this, 1);
148146
} else {
149147
cancel();
150148
actual.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));

src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.reactivex.Scheduler.Worker;
2323
import io.reactivex.disposables.Disposable;
2424
import io.reactivex.exceptions.MissingBackpressureException;
25-
import io.reactivex.internal.disposables.DisposableHelper;
25+
import io.reactivex.internal.disposables.*;
2626
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2727
import io.reactivex.internal.util.BackpressureHelper;
2828
import io.reactivex.plugins.RxJavaPlugins;
@@ -58,7 +58,7 @@ static final class DebounceTimedSubscriber<T> extends AtomicLong
5858

5959
Subscription s;
6060

61-
final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();
61+
final SequentialDisposable timer = new SequentialDisposable();
6262

6363
volatile long index;
6464

@@ -94,13 +94,11 @@ public void onNext(T t) {
9494
}
9595

9696
DebounceEmitter<T> de = new DebounceEmitter<T>(t, idx, this);
97-
if (!timer.compareAndSet(d, de)) {
98-
return;
99-
}
97+
if (timer.replace(de)) {
98+
d = worker.schedule(de, timeout, unit);
10099

101-
d = worker.schedule(de, timeout, unit);
102-
103-
de.setResource(d);
100+
de.setResource(d);
101+
}
104102
}
105103

106104
@Override
@@ -153,9 +151,7 @@ void emit(long idx, T t, DebounceEmitter<T> emitter) {
153151
long r = get();
154152
if (r != 0L) {
155153
actual.onNext(t);
156-
if (r != Long.MAX_VALUE) {
157-
decrementAndGet();
158-
}
154+
BackpressureHelper.produced(this, 1);
159155

160156
emitter.dispose();
161157
} else {

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16-
import java.util.concurrent.atomic.AtomicReference;
16+
import java.util.concurrent.atomic.*;
1717

1818
import org.reactivestreams.*;
1919

@@ -24,7 +24,6 @@
2424
import io.reactivex.internal.disposables.DisposableHelper;
2525
import io.reactivex.internal.functions.ObjectHelper;
2626
import io.reactivex.internal.fuseable.FuseToFlowable;
27-
import io.reactivex.internal.observers.BasicIntQueueDisposable;
2827
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2928
import io.reactivex.internal.util.AtomicThrowable;
3029
import io.reactivex.plugins.RxJavaPlugins;
@@ -62,8 +61,8 @@ public Flowable<T> fuseToFlowable() {
6261
return RxJavaPlugins.onAssembly(new FlowableFlatMapCompletable<T>(source, mapper, delayErrors, maxConcurrency));
6362
}
6463

65-
static final class FlatMapCompletableMainSubscriber<T> extends BasicIntQueueDisposable<T>
66-
implements Subscriber<T> {
64+
static final class FlatMapCompletableMainSubscriber<T> extends AtomicInteger
65+
implements Subscriber<T>, Disposable {
6766
private static final long serialVersionUID = 8443155186132538303L;
6867

6968
final CompletableObserver actual;
@@ -183,26 +182,6 @@ public boolean isDisposed() {
183182
return set.isDisposed();
184183
}
185184

186-
@Override
187-
public T poll() throws Exception {
188-
return null; // always empty
189-
}
190-
191-
@Override
192-
public boolean isEmpty() {
193-
return true; // always empty
194-
}
195-
196-
@Override
197-
public void clear() {
198-
// nothing to clear
199-
}
200-
201-
@Override
202-
public int requestFusion(int mode) {
203-
return mode & ASYNC;
204-
}
205-
206185
void innerComplete(InnerObserver inner) {
207186
set.delete(inner);
208187
onComplete();

0 commit comments

Comments
 (0)