Skip to content

Commit 83a3332

Browse files
authored
2.x: coverage and fixes 9/03-2 (#4469)
1 parent f59ce00 commit 83a3332

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+4338
-411
lines changed

src/main/java/io/reactivex/internal/disposables/ObserverFullArbiter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,12 @@ void drain() {
124124
} else
125125
if (NotificationLite.isDisposable(v)) {
126126
Disposable next = NotificationLite.getDisposable(v);
127-
if (s != null) {
128-
s.dispose();
127+
s.dispose();
128+
if (!cancelled) {
129+
s = next;
130+
} else {
131+
next.dispose();
129132
}
130-
s = next;
131133
} else
132134
if (NotificationLite.isError(v)) {
133135
q.clear();

src/main/java/io/reactivex/internal/operators/observable/ObservableMap.java

Lines changed: 29 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@
1515
package io.reactivex.internal.operators.observable;
1616

1717
import io.reactivex.*;
18-
import io.reactivex.disposables.Disposable;
19-
import io.reactivex.exceptions.Exceptions;
2018
import io.reactivex.functions.Function;
21-
import io.reactivex.internal.disposables.DisposableHelper;
22-
import io.reactivex.plugins.RxJavaPlugins;
19+
import io.reactivex.internal.functions.ObjectHelper;
20+
import io.reactivex.internal.subscribers.observable.BasicFuseableObserver;
2321

2422
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
2523
final Function<? super T, ? extends U> function;
@@ -31,75 +29,49 @@ public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U
3129

3230
@Override
3331
public void subscribeActual(Observer<? super U> t) {
34-
source.subscribe(new MapperSubscriber<T, U>(t, function));
32+
source.subscribe(new MapObserver<T, U>(t, function));
3533
}
3634

37-
static final class MapperSubscriber<T, U> implements Observer<T>, Disposable {
38-
final Observer<? super U> actual;
39-
final Function<? super T, ? extends U> function;
40-
41-
Disposable subscription;
42-
43-
boolean done;
44-
45-
public MapperSubscriber(Observer<? super U> actual, Function<? super T, ? extends U> function) {
46-
this.actual = actual;
47-
this.function = function;
48-
}
49-
@Override
50-
public void onSubscribe(Disposable s) {
51-
if (DisposableHelper.validate(this.subscription, s)) {
52-
subscription = s;
53-
actual.onSubscribe(this);
54-
}
35+
36+
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
37+
final Function<? super T, ? extends U> mapper;
38+
39+
public MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
40+
super(actual);
41+
this.mapper = mapper;
5542
}
43+
5644
@Override
5745
public void onNext(T t) {
5846
if (done) {
5947
return;
6048
}
61-
U u;
62-
try {
63-
u = function.apply(t);
64-
} catch (Throwable e) {
65-
Exceptions.throwIfFatal(e);
66-
subscription.dispose();
67-
onError(e);
68-
return;
69-
}
70-
if (u == null) {
71-
subscription.dispose();
72-
onError(new NullPointerException("Value returned by the function is null"));
73-
return;
74-
}
75-
actual.onNext(u);
76-
}
77-
@Override
78-
public void onError(Throwable t) {
79-
if (done) {
80-
RxJavaPlugins.onError(t);
49+
50+
if (sourceMode != NONE) {
51+
actual.onNext(null);
8152
return;
8253
}
83-
done = true;
84-
actual.onError(t);
85-
}
86-
@Override
87-
public void onComplete() {
88-
if (done) {
54+
55+
U v;
56+
57+
try {
58+
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
59+
} catch (Throwable ex) {
60+
fail(ex);
8961
return;
9062
}
91-
done = true;
92-
actual.onComplete();
63+
actual.onNext(v);
9364
}
94-
65+
9566
@Override
96-
public boolean isDisposed() {
97-
return subscription.isDisposed();
67+
public int requestFusion(int mode) {
68+
return transitiveBoundaryFusion(mode);
9869
}
99-
70+
10071
@Override
101-
public void dispose() {
102-
subscription.dispose();
72+
public U poll() throws Exception {
73+
T t = qs.poll();
74+
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
10375
}
10476
}
10577
}

src/main/java/io/reactivex/internal/operators/observable/ObservableRange.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,18 @@ static final class RangeDisposable
4545

4646
long index;
4747

48+
boolean fused;
49+
4850
public RangeDisposable(Observer<? super Integer> actual, long start, long end) {
4951
this.actual = actual;
5052
this.index = start;
5153
this.end = end;
5254
}
5355

5456
void run() {
57+
if (fused) {
58+
return;
59+
}
5560
Observer<? super Integer> actual = this.actual;
5661
long e = end;
5762
for (long i = index; i != e && get() == 0; i++) {
@@ -107,7 +112,11 @@ public boolean isDisposed() {
107112

108113
@Override
109114
public int requestFusion(int mode) {
110-
return mode & SYNC;
115+
if ((mode & SYNC) != 0) {
116+
fused = true;
117+
return SYNC;
118+
}
119+
return NONE;
111120
}
112121
}
113122
}

src/main/java/io/reactivex/internal/subscribers/flowable/BlockingSingleSubscriber.java

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717
import org.reactivestreams.*;
1818

19-
import io.reactivex.disposables.Disposable;
19+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2020
import io.reactivex.internal.util.ExceptionHelper;
2121

2222
public abstract class BlockingSingleSubscriber<T> extends CountDownLatch
23-
implements Subscriber<T>, Disposable {
23+
implements Subscriber<T> {
2424

2525
T value;
2626
Throwable error;
@@ -35,11 +35,14 @@ public BlockingSingleSubscriber() {
3535

3636
@Override
3737
public final void onSubscribe(Subscription s) {
38-
this.s = s;
39-
if (!cancelled) {
40-
s.request(Long.MAX_VALUE);
41-
if (cancelled) {
42-
s.cancel();
38+
if (SubscriptionHelper.validate(this.s, s)) {
39+
this.s = s;
40+
if (!cancelled) {
41+
s.request(Long.MAX_VALUE);
42+
if (cancelled) {
43+
this.s = SubscriptionHelper.CANCELLED;
44+
s.cancel();
45+
}
4346
}
4447
}
4548
}
@@ -49,20 +52,6 @@ public final void onComplete() {
4952
countDown();
5053
}
5154

52-
@Override
53-
public final void dispose() {
54-
cancelled = true;
55-
Subscription s = this.s;
56-
if (s != null) {
57-
s.cancel();
58-
}
59-
}
60-
61-
@Override
62-
public final boolean isDisposed() {
63-
return cancelled;
64-
}
65-
6655
/**
6756
* Block until the first value arrives and return it, otherwise
6857
* return null for an empty source and rethrow any exception.
@@ -73,7 +62,11 @@ public final T blockingGet() {
7362
try {
7463
await();
7564
} catch (InterruptedException ex) {
76-
dispose();
65+
Subscription s = this.s;
66+
this.s = SubscriptionHelper.CANCELLED;
67+
if (s != null) {
68+
s.cancel();
69+
}
7770
throw ExceptionHelper.wrapOrThrow(ex);
7871
}
7972
}

src/main/java/io/reactivex/internal/subscribers/flowable/BlockingSubscriber.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,9 @@ public BlockingSubscriber(Queue<Object> queue) {
3535

3636
@Override
3737
public void onSubscribe(Subscription s) {
38-
if (!compareAndSet(null, s)) {
39-
s.cancel();
40-
if (get() != SubscriptionHelper.CANCELLED) {
41-
onError(new IllegalStateException("Subscription already set"));
42-
}
43-
return;
38+
if (SubscriptionHelper.setOnce(this, s)) {
39+
queue.offer(NotificationLite.subscription(this));
4440
}
45-
queue.offer(NotificationLite.subscription(this));
4641
}
4742

4843
@Override

src/main/java/io/reactivex/internal/subscribers/flowable/FutureSubscriber.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -123,22 +123,17 @@ public void onNext(T t) {
123123

124124
@Override
125125
public void onError(Throwable t) {
126-
if (error == null) {
126+
for (;;) {
127+
Subscription a = s.get();
128+
if (a == this || a == SubscriptionHelper.CANCELLED) {
129+
RxJavaPlugins.onError(t);
130+
return;
131+
}
127132
error = t;
128-
129-
for (;;) {
130-
Subscription a = s.get();
131-
if (a == this || a == SubscriptionHelper.CANCELLED) {
132-
RxJavaPlugins.onError(t);
133-
return;
134-
}
135-
if (s.compareAndSet(a, this)) {
136-
countDown();
137-
return;
138-
}
133+
if (s.compareAndSet(a, this)) {
134+
countDown();
135+
return;
139136
}
140-
} else {
141-
RxJavaPlugins.onError(t);
142137
}
143138
}
144139

src/main/java/io/reactivex/internal/subscriptions/ArrayCompositeSubscription.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ public boolean setResource(int index, Subscription resource) {
4444
for (;;) {
4545
Subscription o = get(index);
4646
if (o == SubscriptionHelper.CANCELLED) {
47-
resource.cancel();
47+
if (resource != null) {
48+
resource.cancel();
49+
}
4850
return false;
4951
}
5052
if (compareAndSet(index, o, resource)) {
@@ -66,7 +68,9 @@ public Subscription replaceResource(int index, Subscription resource) {
6668
for (;;) {
6769
Subscription o = get(index);
6870
if (o == SubscriptionHelper.CANCELLED) {
69-
resource.cancel();
71+
if (resource != null) {
72+
resource.cancel();
73+
}
7074
return null;
7175
}
7276
if (compareAndSet(index, o, resource)) {

src/main/java/io/reactivex/internal/subscriptions/AsyncSubscription.java

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.internal.disposables.DisposableHelper;
22-
import io.reactivex.internal.util.BackpressureHelper;
2322

2423
/**
2524
* A subscription implementation that arbitrates exactly one other Subscription and can
@@ -47,19 +46,7 @@ public AsyncSubscription(Disposable resource) {
4746

4847
@Override
4948
public void request(long n) {
50-
Subscription s = actual.get();
51-
if (s != null) {
52-
s.request(n);
53-
} else if (SubscriptionHelper.validate(n)) {
54-
BackpressureHelper.add(this, n);
55-
s = actual.get();
56-
if (s != null) {
57-
long mr = getAndSet(0L);
58-
if (mr != 0L) {
59-
s.request(mr);
60-
}
61-
}
62-
}
49+
SubscriptionHelper.deferredRequest(actual, this, n);
6350
}
6451

6552
@Override
@@ -100,27 +87,8 @@ public boolean replaceResource(Disposable r) {
10087
/**
10188
* Sets the given subscription if there isn't any subscription held.
10289
* @param s the first and only subscription to set
103-
* @return false if this AsyncSubscription has been cancelled/disposed
10490
*/
105-
public boolean setSubscription(Subscription s) {
106-
for (;;) {
107-
Subscription a = actual.get();
108-
if (a == SubscriptionHelper.CANCELLED) {
109-
s.cancel();
110-
return false;
111-
}
112-
if (a != null) {
113-
s.cancel();
114-
SubscriptionHelper.reportSubscriptionSet();
115-
return true;
116-
}
117-
if (actual.compareAndSet(null, s)) {
118-
long mr = getAndSet(0L);
119-
if (mr != 0L) {
120-
s.request(mr);
121-
}
122-
return true;
123-
}
124-
}
91+
public void setSubscription(Subscription s) {
92+
SubscriptionHelper.deferredSetOnce(actual, this, s);
12593
}
12694
}

src/main/java/io/reactivex/internal/subscriptions/DeferredScalarSubscription.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,13 @@ public final void request(long n) {
8282
if (state == NO_REQUEST_HAS_VALUE) {
8383
if (compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) {
8484
T v = value;
85-
value = null;
86-
Subscriber<? super T> a = actual;
87-
a.onNext(v);
88-
if (get() != CANCELLED) {
89-
a.onComplete();
85+
if (v != null) {
86+
value = null;
87+
Subscriber<? super T> a = actual;
88+
a.onNext(v);
89+
if (get() != CANCELLED) {
90+
a.onComplete();
91+
}
9092
}
9193
}
9294
return;

0 commit comments

Comments
 (0)