Skip to content

Commit 7ba9a3e

Browse files
authored
2.x: Subject NPE fixes, add UnicastProcessor TCK (#5760)
* 2.x: add Subject/Processor refCount(), Subject NPE fixes * Fix wording * Move RefCountProcessor into tests * Improve style
1 parent 39e159f commit 7ba9a3e

30 files changed

+464
-473
lines changed

src/main/java/io/reactivex/processors/AsyncProcessor.java

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
import java.util.Arrays;
1616
import java.util.concurrent.atomic.AtomicReference;
1717

18+
import org.reactivestreams.*;
19+
1820
import io.reactivex.annotations.*;
21+
import io.reactivex.internal.functions.ObjectHelper;
1922
import io.reactivex.internal.subscriptions.DeferredScalarSubscription;
2023
import io.reactivex.plugins.RxJavaPlugins;
21-
import org.reactivestreams.*;
2224

2325
/**
2426
* Processor that emits the very last value followed by a completion event or the received error
@@ -77,32 +79,17 @@ public void onSubscribe(Subscription s) {
7779

7880
@Override
7981
public void onNext(T t) {
82+
ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
8083
if (subscribers.get() == TERMINATED) {
8184
return;
8285
}
83-
if (t == null) {
84-
nullOnNext();
85-
return;
86-
}
8786
value = t;
8887
}
8988

90-
@SuppressWarnings("unchecked")
91-
void nullOnNext() {
92-
value = null;
93-
Throwable ex = new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
94-
error = ex;
95-
for (AsyncSubscription<T> as : subscribers.getAndSet(TERMINATED)) {
96-
as.onError(ex);
97-
}
98-
}
99-
10089
@SuppressWarnings("unchecked")
10190
@Override
10291
public void onError(Throwable t) {
103-
if (t == null) {
104-
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
105-
}
92+
ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
10693
if (subscribers.get() == TERMINATED) {
10794
RxJavaPlugins.onError(t);
10895
return;

src/main/java/io/reactivex/processors/BehaviorProcessor.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,8 @@ public void onSubscribe(Subscription s) {
175175

176176
@Override
177177
public void onNext(T t) {
178-
if (t == null) {
179-
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
180-
return;
181-
}
178+
ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
179+
182180
if (terminalEvent.get() != null) {
183181
return;
184182
}
@@ -191,9 +189,7 @@ public void onNext(T t) {
191189

192190
@Override
193191
public void onError(Throwable t) {
194-
if (t == null) {
195-
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
196-
}
192+
ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
197193
if (!terminalEvent.compareAndSet(null, t)) {
198194
RxJavaPlugins.onError(t);
199195
return;

src/main/java/io/reactivex/processors/FlowableProcessor.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313

1414
package io.reactivex.processors;
1515

16-
import io.reactivex.*;
17-
import io.reactivex.annotations.NonNull;
1816
import org.reactivestreams.Processor;
1917

18+
import io.reactivex.*;
19+
import io.reactivex.annotations.*;
20+
2021
/**
2122
* Represents a Subscriber and a Flowable (Publisher) at the same time, allowing
2223
* multicasting events from a single source to multiple child Subscribers.
@@ -28,45 +29,47 @@
2829
public abstract class FlowableProcessor<T> extends Flowable<T> implements Processor<T, T>, FlowableSubscriber<T> {
2930

3031
/**
31-
* Returns true if the subject has subscribers.
32+
* Returns true if the FlowableProcessor has subscribers.
3233
* <p>The method is thread-safe.
33-
* @return true if the subject has subscribers
34+
* @return true if the FlowableProcessor has subscribers
3435
*/
3536
public abstract boolean hasSubscribers();
3637

3738
/**
38-
* Returns true if the subject has reached a terminal state through an error event.
39+
* Returns true if the FlowableProcessor has reached a terminal state through an error event.
3940
* <p>The method is thread-safe.
40-
* @return true if the subject has reached a terminal state through an error event
41+
* @return true if the FlowableProcessor has reached a terminal state through an error event
4142
* @see #getThrowable()
4243
* @see #hasComplete()
4344
*/
4445
public abstract boolean hasThrowable();
4546

4647
/**
47-
* Returns true if the subject has reached a terminal state through a complete event.
48+
* Returns true if the FlowableProcessor has reached a terminal state through a complete event.
4849
* <p>The method is thread-safe.
49-
* @return true if the subject has reached a terminal state through a complete event
50+
* @return true if the FlowableProcessor has reached a terminal state through a complete event
5051
* @see #hasThrowable()
5152
*/
5253
public abstract boolean hasComplete();
5354

5455
/**
55-
* Returns the error that caused the Subject to terminate or null if the Subject
56+
* Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor
5657
* hasn't terminated yet.
5758
* <p>The method is thread-safe.
58-
* @return the error that caused the Subject to terminate or null if the Subject
59+
* @return the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor
5960
* hasn't terminated yet
6061
*/
62+
@Nullable
6163
public abstract Throwable getThrowable();
6264

6365
/**
64-
* Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
66+
* Wraps this FlowableProcessor and serializes the calls to the onSubscribe, onNext, onError and
6567
* onComplete methods, making them thread-safe.
6668
* <p>The method is thread-safe.
67-
* @return the wrapped and serialized subject
69+
* @return the wrapped and serialized FlowableProcessor
6870
*/
6971
@NonNull
72+
@CheckReturnValue
7073
public final FlowableProcessor<T> toSerialized() {
7174
if (this instanceof SerializedProcessor) {
7275
return this;

src/main/java/io/reactivex/processors/PublishProcessor.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.reactivex.annotations.*;
2020
import io.reactivex.exceptions.MissingBackpressureException;
21+
import io.reactivex.internal.functions.ObjectHelper;
2122
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2223
import io.reactivex.internal.util.BackpressureHelper;
2324
import io.reactivex.plugins.RxJavaPlugins;
@@ -186,13 +187,10 @@ public void onSubscribe(Subscription s) {
186187

187188
@Override
188189
public void onNext(T t) {
190+
ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
189191
if (subscribers.get() == TERMINATED) {
190192
return;
191193
}
192-
if (t == null) {
193-
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
194-
return;
195-
}
196194
for (PublishSubscription<T> s : subscribers.get()) {
197195
s.onNext(t);
198196
}
@@ -201,13 +199,11 @@ public void onNext(T t) {
201199
@SuppressWarnings("unchecked")
202200
@Override
203201
public void onError(Throwable t) {
202+
ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
204203
if (subscribers.get() == TERMINATED) {
205204
RxJavaPlugins.onError(t);
206205
return;
207206
}
208-
if (t == null) {
209-
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
210-
}
211207
error = t;
212208

213209
for (PublishSubscription<T> s : subscribers.getAndSet(TERMINATED)) {

src/main/java/io/reactivex/processors/ReplayProcessor.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -267,10 +267,8 @@ public void onSubscribe(Subscription s) {
267267

268268
@Override
269269
public void onNext(T t) {
270-
if (t == null) {
271-
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
272-
return;
273-
}
270+
ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
271+
274272
if (done) {
275273
return;
276274
}
@@ -286,9 +284,8 @@ public void onNext(T t) {
286284
@SuppressWarnings("unchecked")
287285
@Override
288286
public void onError(Throwable t) {
289-
if (t == null) {
290-
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
291-
}
287+
ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
288+
292289
if (done) {
293290
RxJavaPlugins.onError(t);
294291
return;

src/main/java/io/reactivex/processors/UnicastProcessor.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -339,12 +339,9 @@ public void onSubscribe(Subscription s) {
339339

340340
@Override
341341
public void onNext(T t) {
342-
if (done || cancelled) {
343-
return;
344-
}
342+
ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
345343

346-
if (t == null) {
347-
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
344+
if (done || cancelled) {
348345
return;
349346
}
350347

@@ -354,15 +351,13 @@ public void onNext(T t) {
354351

355352
@Override
356353
public void onError(Throwable t) {
354+
ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
355+
357356
if (done || cancelled) {
358357
RxJavaPlugins.onError(t);
359358
return;
360359
}
361360

362-
if (t == null) {
363-
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
364-
}
365-
366361
error = t;
367362
done = true;
368363

src/main/java/io/reactivex/subjects/AsyncSubject.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@
1313

1414
package io.reactivex.subjects;
1515

16-
import io.reactivex.annotations.CheckReturnValue;
1716
import java.util.Arrays;
1817
import java.util.concurrent.atomic.AtomicReference;
1918

2019
import io.reactivex.Observer;
20+
import io.reactivex.annotations.CheckReturnValue;
2121
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.internal.functions.ObjectHelper;
2223
import io.reactivex.internal.observers.DeferredScalarDisposable;
2324
import io.reactivex.plugins.RxJavaPlugins;
2425

@@ -75,32 +76,17 @@ public void onSubscribe(Disposable s) {
7576

7677
@Override
7778
public void onNext(T t) {
79+
ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
7880
if (subscribers.get() == TERMINATED) {
7981
return;
8082
}
81-
if (t == null) {
82-
nullOnNext();
83-
return;
84-
}
8583
value = t;
8684
}
8785

88-
@SuppressWarnings("unchecked")
89-
void nullOnNext() {
90-
value = null;
91-
Throwable ex = new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
92-
error = ex;
93-
for (AsyncDisposable<T> as : subscribers.getAndSet(TERMINATED)) {
94-
as.onError(ex);
95-
}
96-
}
97-
9886
@SuppressWarnings("unchecked")
9987
@Override
10088
public void onError(Throwable t) {
101-
if (t == null) {
102-
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
103-
}
89+
ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
10490
if (subscribers.get() == TERMINATED) {
10591
RxJavaPlugins.onError(t);
10692
return;

src/main/java/io/reactivex/subjects/BehaviorSubject.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,8 @@ public void onSubscribe(Disposable s) {
171171

172172
@Override
173173
public void onNext(T t) {
174-
if (t == null) {
175-
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
176-
return;
177-
}
174+
ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
175+
178176
if (terminalEvent.get() != null) {
179177
return;
180178
}
@@ -187,9 +185,7 @@ public void onNext(T t) {
187185

188186
@Override
189187
public void onError(Throwable t) {
190-
if (t == null) {
191-
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
192-
}
188+
ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
193189
if (!terminalEvent.compareAndSet(null, t)) {
194190
RxJavaPlugins.onError(t);
195191
return;

src/main/java/io/reactivex/subjects/CompletableSubject.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616
import java.util.concurrent.atomic.*;
1717

1818
import io.reactivex.*;
19-
import io.reactivex.annotations.*;
19+
import io.reactivex.annotations.CheckReturnValue;
2020
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.internal.functions.ObjectHelper;
2122
import io.reactivex.plugins.RxJavaPlugins;
2223

2324
/**
@@ -66,9 +67,7 @@ public void onSubscribe(Disposable d) {
6667

6768
@Override
6869
public void onError(Throwable e) {
69-
if (e == null) {
70-
e = new NullPointerException("Null errors are not allowed in 2.x");
71-
}
70+
ObjectHelper.requireNonNull(e, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
7271
if (once.compareAndSet(false, true)) {
7372
this.error = e;
7473
for (CompletableDisposable md : observers.getAndSet(TERMINATED)) {

src/main/java/io/reactivex/subjects/MaybeSubject.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.reactivex.*;
1919
import io.reactivex.annotations.*;
2020
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.internal.functions.ObjectHelper;
2122
import io.reactivex.plugins.RxJavaPlugins;
2223

2324
/**
@@ -73,10 +74,7 @@ public void onSubscribe(Disposable d) {
7374
@SuppressWarnings("unchecked")
7475
@Override
7576
public void onSuccess(T value) {
76-
if (value == null) {
77-
onError(new NullPointerException("Null values are not allowed in 2.x"));
78-
return;
79-
}
77+
ObjectHelper.requireNonNull(value, "onSuccess called with null. Null values are generally not allowed in 2.x operators and sources.");
8078
if (once.compareAndSet(false, true)) {
8179
this.value = value;
8280
for (MaybeDisposable<T> md : observers.getAndSet(TERMINATED)) {
@@ -88,9 +86,7 @@ public void onSuccess(T value) {
8886
@SuppressWarnings("unchecked")
8987
@Override
9088
public void onError(Throwable e) {
91-
if (e == null) {
92-
e = new NullPointerException("Null errors are not allowed in 2.x");
93-
}
89+
ObjectHelper.requireNonNull(e, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
9490
if (once.compareAndSet(false, true)) {
9591
this.error = e;
9692
for (MaybeDisposable<T> md : observers.getAndSet(TERMINATED)) {

0 commit comments

Comments
 (0)