Skip to content

Commit b3ad075

Browse files
authored
3.x: Add missing coverage, fix unused/inconsistent ops (#6901)
* 3.x: Add missing coverage, fix unused/inconsistent ops * More coverage improvements and cleanup * Some more coverage * Observable coverage and cleanup * Improve Flowable internals and coverage * More Flowable operator coverage and fixes * Last set of coverage & cleanup for Flowable operators * Fix wrong use of j.u.Observable
1 parent 38eda0c commit b3ad075

File tree

225 files changed

+7358
-1093
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

225 files changed

+7358
-1093
lines changed

src/main/java/io/reactivex/rxjava3/core/Completable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -981,7 +981,7 @@ private static Completable merge0(@NonNull Publisher<@NonNull ? extends Completa
981981
@SafeVarargs
982982
public static Completable mergeArrayDelayError(@NonNull CompletableSource... sources) {
983983
Objects.requireNonNull(sources, "sources is null");
984-
return RxJavaPlugins.onAssembly(new CompletableMergeDelayErrorArray(sources));
984+
return RxJavaPlugins.onAssembly(new CompletableMergeArrayDelayError(sources));
985985
}
986986

987987
/**

src/main/java/io/reactivex/rxjava3/internal/functions/Functions.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -281,13 +281,8 @@ public static <T> Predicate<T> equalsWith(T value) {
281281
return new EqualsPredicate<>(value);
282282
}
283283

284-
enum HashSetCallable implements Supplier<Set<Object>>, Callable<Set<Object>> {
284+
enum HashSetSupplier implements Supplier<Set<Object>> {
285285
INSTANCE;
286-
@Override
287-
public Set<Object> call() {
288-
return new HashSet<>();
289-
}
290-
291286
@Override
292287
public Set<Object> get() {
293288
return new HashSet<>();
@@ -296,7 +291,7 @@ public Set<Object> get() {
296291

297292
@SuppressWarnings({ "rawtypes", "unchecked" })
298293
public static <T> Supplier<Set<T>> createHashSet() {
299-
return (Supplier)HashSetCallable.INSTANCE;
294+
return (Supplier)HashSetSupplier.INSTANCE;
300295
}
301296

302297
static final class NotificationOnNext<T> implements Consumer<T> {
@@ -742,12 +737,7 @@ public boolean test(Object o) {
742737
}
743738
}
744739

745-
static final class NullProvider implements Callable<Object>, Supplier<Object> {
746-
@Override
747-
public Object call() {
748-
return null;
749-
}
750-
740+
static final class NullProvider implements Supplier<Object> {
751741
@Override
752742
public Object get() {
753743
return null;

src/main/java/io/reactivex/rxjava3/internal/observers/FutureObserver.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -128,22 +128,15 @@ public void onNext(T t) {
128128
@Override
129129
public void onError(Throwable t) {
130130
if (error == null) {
131-
error = t;
132-
133-
for (;;) {
134-
Disposable a = upstream.get();
135-
if (a == this || a == DisposableHelper.DISPOSED) {
136-
RxJavaPlugins.onError(t);
137-
return;
138-
}
139-
if (upstream.compareAndSet(a, this)) {
140-
countDown();
141-
return;
142-
}
131+
Disposable a = upstream.get();
132+
if (a != this && a != DisposableHelper.DISPOSED
133+
&& upstream.compareAndSet(a, this)) {
134+
error = t;
135+
countDown();
136+
return;
143137
}
144-
} else {
145-
RxJavaPlugins.onError(t);
146138
}
139+
RxJavaPlugins.onError(t);
147140
}
148141

149142
@Override
@@ -152,15 +145,12 @@ public void onComplete() {
152145
onError(new NoSuchElementException("The source is empty"));
153146
return;
154147
}
155-
for (;;) {
156-
Disposable a = upstream.get();
157-
if (a == this || a == DisposableHelper.DISPOSED) {
158-
return;
159-
}
160-
if (upstream.compareAndSet(a, this)) {
161-
countDown();
162-
return;
163-
}
148+
Disposable a = upstream.get();
149+
if (a == this || a == DisposableHelper.DISPOSED) {
150+
return;
151+
}
152+
if (upstream.compareAndSet(a, this)) {
153+
countDown();
164154
}
165155
}
166156

src/main/java/io/reactivex/rxjava3/internal/observers/InnerQueuedObserver.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,4 @@ public void setDone() {
114114
public SimpleQueue<T> queue() {
115115
return queue;
116116
}
117-
118-
public int fusionMode() {
119-
return fusionMode;
120-
}
121117
}

src/main/java/io/reactivex/rxjava3/internal/observers/QueueDrainObserver.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,6 @@ public final boolean enter() {
5757
return wip.getAndIncrement() == 0;
5858
}
5959

60-
public final boolean fastEnter() {
61-
return wip.get() == 0 && wip.compareAndSet(0, 1);
62-
}
63-
6460
protected final void fastPathEmit(U value, boolean delayError, Disposable dispose) {
6561
final Observer<? super V> observer = downstream;
6662
final SimplePlainQueue<U> q = queue;

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcat.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,8 @@ void drain() {
182182
boolean empty = cs == null;
183183

184184
if (d && empty) {
185-
if (once.compareAndSet(false, true)) {
186-
downstream.onComplete();
187-
}
185+
// errors never set done or call drain.
186+
downstream.onComplete();
188187
return;
189188
}
190189

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableMergeArray.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,8 @@ public void onError(Throwable e) {
8686
@Override
8787
public void onComplete() {
8888
if (decrementAndGet() == 0) {
89-
if (once.compareAndSet(false, true)) {
90-
downstream.onComplete();
91-
}
89+
// errors don't decrement this
90+
downstream.onComplete();
9291
}
9392
}
9493

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
import io.reactivex.rxjava3.disposables.*;
2020
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
2121

22-
public final class CompletableMergeDelayErrorArray extends Completable {
22+
public final class CompletableMergeArrayDelayError extends Completable {
2323

2424
final CompletableSource[] sources;
2525

26-
public CompletableMergeDelayErrorArray(CompletableSource[] sources) {
26+
public CompletableMergeArrayDelayError(CompletableSource[] sources) {
2727
this.sources = sources;
2828
}
2929

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableMergeDelayErrorIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.reactivex.rxjava3.core.*;
2121
import io.reactivex.rxjava3.disposables.CompositeDisposable;
2222
import io.reactivex.rxjava3.exceptions.Exceptions;
23-
import io.reactivex.rxjava3.internal.operators.completable.CompletableMergeDelayErrorArray.*;
23+
import io.reactivex.rxjava3.internal.operators.completable.CompletableMergeArrayDelayError.*;
2424
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
2525

2626
public final class CompletableMergeDelayErrorIterable extends Completable {

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableMergeIterable.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,8 @@ public void onError(Throwable e) {
128128
@Override
129129
public void onComplete() {
130130
if (wip.decrementAndGet() == 0) {
131-
if (compareAndSet(false, true)) {
132-
downstream.onComplete();
133-
}
131+
// errors don't decrement wip
132+
downstream.onComplete();
134133
}
135134
}
136135

0 commit comments

Comments
 (0)