Skip to content

Commit 2e39d99

Browse files
authored
2.x: Expand {X}Processor JavaDocs by syncing with {X}Subject docs (#6054)
* 2.x: Expand {X}Processor JavaDocs by syncing with {X}Subject docs * A-an * Fix javadoc warnings
1 parent b041e32 commit 2e39d99

File tree

8 files changed

+388
-98
lines changed

8 files changed

+388
-98
lines changed

src/main/java/io/reactivex/Maybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,9 +272,9 @@ public static <T> Flowable<T> concat(Publisher<? extends MaybeSource<? extends T
272272

273273
/**
274274
* Concatenate the single values, in a non-overlapping fashion, of the MaybeSource sources in the array.
275-
* <dl>
276275
* <p>
277276
* <img width="640" height="526" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.concatArray.png" alt="">
277+
* <dl>
278278
* <dt><b>Backpressure:</b></dt>
279279
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
280280
* <dt><b>Scheduler:</b></dt>

src/main/java/io/reactivex/Observable.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13173,7 +13173,6 @@ public final Observable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler
1317313173
* will be emitted by the resulting ObservableSource.
1317413174
* <p>
1317513175
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.png" alt="">
13176-
* <p>
1317713176
* <dl>
1317813177
* <dt><b>Scheduler:</b></dt>
1317913178
* <dd>{@code throttleWithTimeout} operates by default on the {@code computation} {@link Scheduler}.</dd>

src/main/java/io/reactivex/processors/AsyncProcessor.java

Lines changed: 95 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,90 @@
2828
* <p>
2929
* <img width="640" height="239" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/AsyncProcessor.png" alt="">
3030
* <p>
31-
* The implementation of onXXX methods are technically thread-safe but non-serialized calls
32-
* to them may lead to undefined state in the currently subscribed Subscribers.
31+
* This processor does not have a public constructor by design; a new empty instance of this
32+
* {@code AsyncProcessor} can be created via the {@link #create()} method.
33+
* <p>
34+
* Since an {@code AsyncProcessor} is a Reactive Streams {@code Processor} type,
35+
* {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>)
36+
* as parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a
37+
* {@link NullPointerException} being thrown and the processor's state is not changed.
38+
* <p>
39+
* {@code AsyncProcessor} is a {@link io.reactivex.Flowable} as well as a {@link FlowableProcessor} and supports backpressure from the downstream but
40+
* its {@link Subscriber}-side consumes items in an unbounded manner.
41+
* <p>
42+
* When this {@code AsyncProcessor} is terminated via {@link #onError(Throwable)}, the
43+
* last observed item (if any) is cleared and late {@link Subscriber}s only receive
44+
* the {@code onError} event.
45+
* <p>
46+
* The {@code AsyncProcessor} caches the latest item internally and it emits this item only when {@code onComplete} is called.
47+
* Therefore, it is not recommended to use this {@code Processor} with infinite or never-completing sources.
48+
* <p>
49+
* Even though {@code AsyncProcessor} implements the {@link Subscriber} interface, calling
50+
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
51+
* if the processor is used as a standalone source. However, calling {@code onSubscribe}
52+
* after the {@code AsyncProcessor} reached its terminal state will result in the
53+
* given {@link Subscription} being canceled immediately.
54+
* <p>
55+
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
56+
* is required to be serialized (called from the same thread or called non-overlappingly from different threads
57+
* through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s
58+
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
59+
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively).
60+
* The implementation of {@code onXXX} methods are technically thread-safe but non-serialized calls
61+
* to them may lead to undefined state in the currently subscribed {@code Subscriber}s.
62+
* <p>
63+
* This {@code AsyncProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
64+
* {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the very last observed value -
65+
* after this {@code AsyncProcessor} has been completed - in a non-blocking and thread-safe
66+
* manner via {@link #hasValue()}, {@link #getValue()}, {@link #getValues()} or {@link #getValues(Object[])}.
67+
* <dl>
68+
* <dt><b>Backpressure:</b></dt>
69+
* <dd>The {@code AsyncProcessor} honors the backpressure of the downstream {@code Subscriber}s and won't emit
70+
* its single value to a particular {@code Subscriber} until that {@code Subscriber} has requested an item.
71+
* When the {@code AsyncProcessor} is subscribed to a {@link io.reactivex.Flowable}, the processor consumes this
72+
* {@code Flowable} in an unbounded manner (requesting `Long.MAX_VALUE`) as only the very last upstream item is
73+
* retained by it.
74+
* </dd>
75+
* <dt><b>Scheduler:</b></dt>
76+
* <dd>{@code AsyncProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and
77+
* the {@code Subscriber}s get notified on the thread where the terminating {@code onError} or {@code onComplete}
78+
* methods were invoked.</dd>
79+
* <dt><b>Error handling:</b></dt>
80+
* <dd>When the {@link #onError(Throwable)} is called, the {@code AsyncProcessor} enters into a terminal state
81+
* and emits the same {@code Throwable} instance to the last set of {@code Subscriber}s. During this emission,
82+
* if one or more {@code Subscriber}s dispose their respective {@code Subscription}s, the
83+
* {@code Throwable} is delivered to the global error handler via
84+
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Subscriber}s
85+
* cancel at once).
86+
* If there were no {@code Subscriber}s subscribed to this {@code AsyncProcessor} when the {@code onError()}
87+
* was called, the global error handler is not invoked.
88+
* </dd>
89+
* </dl>
90+
* <p>
91+
* Example usage:
92+
* <pre><code>
93+
* AsyncProcessor&lt;Object&gt; processor = AsyncProcessor.create();
94+
*
95+
* TestSubscriber&lt;Object&gt; ts1 = processor.test();
96+
*
97+
* ts1.assertEmpty();
98+
*
99+
* processor.onNext(1);
100+
*
101+
* // AsyncProcessor only emits when onComplete was called.
102+
* ts1.assertEmpty();
103+
*
104+
* processor.onNext(2);
105+
* processor.onComplete();
106+
*
107+
* // onComplete triggers the emission of the last cached item and the onComplete event.
108+
* ts1.assertResult(2);
109+
*
110+
* TestSubscriber&lt;Object&gt; ts2 = processor.test();
33111
*
112+
* // late Subscribers receive the last cached item too
113+
* ts2.assertResult(2);
114+
* </code></pre>
34115
* @param <T> the value type
35116
*/
36117
public final class AsyncProcessor<T> extends FlowableProcessor<T> {
@@ -75,7 +156,7 @@ public void onSubscribe(Subscription s) {
75156
s.cancel();
76157
return;
77158
}
78-
// PublishSubject doesn't bother with request coordination.
159+
// AsyncProcessor doesn't bother with request coordination.
79160
s.request(Long.MAX_VALUE);
80161
}
81162

@@ -168,9 +249,9 @@ protected void subscribeActual(Subscriber<? super T> s) {
168249

169250
/**
170251
* Tries to add the given subscriber to the subscribers array atomically
171-
* or returns false if the subject has terminated.
252+
* or returns false if the processor has terminated.
172253
* @param ps the subscriber to add
173-
* @return true if successful, false if the subject has terminated
254+
* @return true if successful, false if the processor has terminated
174255
*/
175256
boolean add(AsyncSubscription<T> ps) {
176257
for (;;) {
@@ -192,8 +273,8 @@ boolean add(AsyncSubscription<T> ps) {
192273
}
193274

194275
/**
195-
* Atomically removes the given subscriber if it is subscribed to the subject.
196-
* @param ps the subject to remove
276+
* Atomically removes the given subscriber if it is subscribed to this processor.
277+
* @param ps the subscriber's subscription wrapper to remove
197278
*/
198279
@SuppressWarnings("unchecked")
199280
void remove(AsyncSubscription<T> ps) {
@@ -232,28 +313,28 @@ void remove(AsyncSubscription<T> ps) {
232313
}
233314

234315
/**
235-
* Returns true if the subject has any value.
316+
* Returns true if this processor has any value.
236317
* <p>The method is thread-safe.
237-
* @return true if the subject has any value
318+
* @return true if this processor has any value
238319
*/
239320
public boolean hasValue() {
240321
return subscribers.get() == TERMINATED && value != null;
241322
}
242323

243324
/**
244-
* Returns a single value the Subject currently has or null if no such value exists.
325+
* Returns a single value this processor currently has or null if no such value exists.
245326
* <p>The method is thread-safe.
246-
* @return a single value the Subject currently has or null if no such value exists
327+
* @return a single value this processor currently has or null if no such value exists
247328
*/
248329
@Nullable
249330
public T getValue() {
250331
return subscribers.get() == TERMINATED ? value : null;
251332
}
252333

253334
/**
254-
* Returns an Object array containing snapshot all values of the Subject.
335+
* Returns an Object array containing snapshot all values of this processor.
255336
* <p>The method is thread-safe.
256-
* @return the array containing the snapshot of all values of the Subject
337+
* @return the array containing the snapshot of all values of this processor
257338
* @deprecated in 2.1.14; put the result of {@link #getValue()} into an array manually, will be removed in 3.x
258339
*/
259340
@Deprecated
@@ -263,7 +344,7 @@ public Object[] getValues() {
263344
}
264345

265346
/**
266-
* Returns a typed array containing a snapshot of all values of the Subject.
347+
* Returns a typed array containing a snapshot of all values of this processor.
267348
* <p>The method follows the conventions of Collection.toArray by setting the array element
268349
* after the last value to null (if the capacity permits).
269350
* <p>The method is thread-safe.

src/main/java/io/reactivex/processors/BehaviorProcessor.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,13 @@
8585
* after the {@code BehaviorProcessor} reached its terminal state will result in the
8686
* given {@code Subscription} being cancelled immediately.
8787
* <p>
88-
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
88+
* Calling {@link #onNext(Object)}, {@link #offer(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
8989
* is required to be serialized (called from the same thread or called non-overlappingly from different threads
9090
* through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s
9191
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
9292
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively).
93+
* Note that serializing over {@link #offer(Object)} is not supported through {@code toSerialized()} because it is a method
94+
* available on the {@code PublishProcessor} and {@code BehaviorProcessor} classes only.
9395
* <p>
9496
* This {@code BehaviorProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
9597
* {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the latest observed value
@@ -127,34 +129,34 @@
127129
* Example usage:
128130
* <pre> {@code
129131
130-
// observer will receive all events.
132+
// subscriber will receive all events.
131133
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
132-
processor.subscribe(observer);
134+
processor.subscribe(subscriber);
133135
processor.onNext("one");
134136
processor.onNext("two");
135137
processor.onNext("three");
136138
137-
// observer will receive the "one", "two" and "three" events, but not "zero"
139+
// subscriber will receive the "one", "two" and "three" events, but not "zero"
138140
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
139141
processor.onNext("zero");
140142
processor.onNext("one");
141-
processor.subscribe(observer);
143+
processor.subscribe(subscriber);
142144
processor.onNext("two");
143145
processor.onNext("three");
144146
145-
// observer will receive only onComplete
147+
// subscriber will receive only onComplete
146148
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
147149
processor.onNext("zero");
148150
processor.onNext("one");
149151
processor.onComplete();
150-
processor.subscribe(observer);
152+
processor.subscribe(subscriber);
151153
152-
// observer will receive only onError
154+
// subscriber will receive only onError
153155
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
154156
processor.onNext("zero");
155157
processor.onNext("one");
156158
processor.onError(new RuntimeException("error"));
157-
processor.subscribe(observer);
159+
processor.subscribe(subscriber);
158160
} </pre>
159161
*
160162
* @param <T>

src/main/java/io/reactivex/processors/PublishProcessor.java

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,67 @@
2828
*
2929
* <p>
3030
* <img width="640" height="278" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/PublishProcessor.png" alt="">
31-
*
32-
* <p>The processor does not coordinate backpressure for its subscribers and implements a weaker onSubscribe which
33-
* calls requests Long.MAX_VALUE from the incoming Subscriptions. This makes it possible to subscribe the PublishProcessor
34-
* to multiple sources (note on serialization though) unlike the standard Subscriber contract. Child subscribers, however, are not overflown but receive an
35-
* IllegalStateException in case their requested amount is zero.
36-
*
37-
* <p>The implementation of onXXX methods are technically thread-safe but non-serialized calls
38-
* to them may lead to undefined state in the currently subscribed Subscribers.
39-
*
40-
* <p>Due to the nature Flowables are constructed, the PublishProcessor can't be instantiated through
41-
* {@code new} but must be created via the {@link #create()} method.
31+
* <p>
32+
* This processor does not have a public constructor by design; a new empty instance of this
33+
* {@code PublishProcessor} can be created via the {@link #create()} method.
34+
* <p>
35+
* Since a {@code PublishProcessor} is a Reactive Streams {@code Processor} type,
36+
* {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>) as
37+
* parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a
38+
* {@link NullPointerException} being thrown and the processor's state is not changed.
39+
* <p>
40+
* {@code PublishProcessor} is a {@link io.reactivex.Flowable} as well as a {@link FlowableProcessor},
41+
* however, it does not coordinate backpressure between different subscribers and between an
42+
* upstream source and a subscriber. If an upstream item is received via {@link #onNext(Object)}, if
43+
* a subscriber is not ready to receive an item, that subscriber is terminated via a {@link MissingBackpressureException}.
44+
* To avoid this case, use {@link #offer(Object)} and retry sometime later if it returned false.
45+
* The {@code PublishProcessor}'s {@link Subscriber}-side consumes items in an unbounded manner.
46+
* <p>
47+
* For a multicasting processor type that also coordinates between the downstream {@code Subscriber}s and the upstream
48+
* source as well, consider using {@link MulticastProcessor}.
49+
* <p>
50+
* When this {@code PublishProcessor} is terminated via {@link #onError(Throwable)} or {@link #onComplete()},
51+
* late {@link Subscriber}s only receive the respective terminal event.
52+
* <p>
53+
* Unlike a {@link BehaviorProcessor}, a {@code PublishProcessor} doesn't retain/cache items, therefore, a new
54+
* {@code Subscriber} won't receive any past items.
55+
* <p>
56+
* Even though {@code PublishProcessor} implements the {@link Subscriber} interface, calling
57+
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
58+
* if the processor is used as a standalone source. However, calling {@code onSubscribe}
59+
* after the {@code PublishProcessor} reached its terminal state will result in the
60+
* given {@link Subscription} being canceled immediately.
61+
* <p>
62+
* Calling {@link #onNext(Object)}, {@link #offer(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
63+
* is required to be serialized (called from the same thread or called non-overlappingly from different threads
64+
* through external means of serialization). The {@link #toSerialized()} method available to all {@link FlowableProcessor}s
65+
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
66+
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively).
67+
* Note that serializing over {@link #offer(Object)} is not supported through {@code toSerialized()} because it is a method
68+
* available on the {@code PublishProcessor} and {@code BehaviorProcessor} classes only.
69+
* <p>
70+
* This {@code PublishProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
71+
* {@link #getThrowable()} and {@link #hasSubscribers()}.
72+
* <dl>
73+
* <dt><b>Backpressure:</b></dt>
74+
* <dd>The processor does not coordinate backpressure for its subscribers and implements a weaker {@code onSubscribe} which
75+
* calls requests Long.MAX_VALUE from the incoming Subscriptions. This makes it possible to subscribe the {@code PublishProcessor}
76+
* to multiple sources (note on serialization though) unlike the standard {@code Subscriber} contract. Child subscribers, however, are not overflown but receive an
77+
* {@link IllegalStateException} in case their requested amount is zero.</dd>
78+
* <dt><b>Scheduler:</b></dt>
79+
* <dd>{@code PublishProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and
80+
* the {@code Subscriber}s get notified on the thread the respective {@code onXXX} methods were invoked.</dd>
81+
* <dt><b>Error handling:</b></dt>
82+
* <dd>When the {@link #onError(Throwable)} is called, the {@code PublishProcessor} enters into a terminal state
83+
* and emits the same {@code Throwable} instance to the last set of {@code Subscriber}s. During this emission,
84+
* if one or more {@code Subscriber}s cancel their respective {@code Subscription}s, the
85+
* {@code Throwable} is delivered to the global error handler via
86+
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Subscriber}s
87+
* cancel at once).
88+
* If there were no {@code Subscriber}s subscribed to this {@code PublishProcessor} when the {@code onError()}
89+
* was called, the global error handler is not invoked.
90+
* </dd>
91+
* </dl>
4292
*
4393
* Example usage:
4494
* <pre> {@code
@@ -55,6 +105,7 @@
55105
56106
} </pre>
57107
* @param <T> the value type multicasted to Subscribers.
108+
* @see MulticastProcessor
58109
*/
59110
public final class PublishProcessor<T> extends FlowableProcessor<T> {
60111
/** The terminated indicator for the subscribers array. */
@@ -113,9 +164,9 @@ protected void subscribeActual(Subscriber<? super T> t) {
113164

114165
/**
115166
* Tries to add the given subscriber to the subscribers array atomically
116-
* or returns false if the subject has terminated.
167+
* or returns false if this processor has terminated.
117168
* @param ps the subscriber to add
118-
* @return true if successful, false if the subject has terminated
169+
* @return true if successful, false if this processor has terminated
119170
*/
120171
boolean add(PublishSubscription<T> ps) {
121172
for (;;) {
@@ -137,8 +188,8 @@ boolean add(PublishSubscription<T> ps) {
137188
}
138189

139190
/**
140-
* Atomically removes the given subscriber if it is subscribed to the subject.
141-
* @param ps the subject to remove
191+
* Atomically removes the given subscriber if it is subscribed to this processor.
192+
* @param ps the subscription wrapping a subscriber to remove
142193
*/
143194
@SuppressWarnings("unchecked")
144195
void remove(PublishSubscription<T> ps) {
@@ -182,7 +233,7 @@ public void onSubscribe(Subscription s) {
182233
s.cancel();
183234
return;
184235
}
185-
// PublishSubject doesn't bother with request coordination.
236+
// PublishProcessor doesn't bother with request coordination.
186237
s.request(Long.MAX_VALUE);
187238
}
188239

@@ -288,7 +339,7 @@ static final class PublishSubscription<T> extends AtomicLong implements Subscrip
288339
private static final long serialVersionUID = 3562861878281475070L;
289340
/** The actual subscriber. */
290341
final Subscriber<? super T> actual;
291-
/** The subject state. */
342+
/** The parent processor servicing this subscriber. */
292343
final PublishProcessor<T> parent;
293344

294345
/**

0 commit comments

Comments
 (0)