Skip to content

Commit a1c3ba9

Browse files
authored
2.x: Improve BehaviorProcessor JavaDoc (#5778)
* 2.x: Improve BehaviorProcessor JavaDoc * Use > in the first example
1 parent 20a72ca commit a1c3ba9

File tree

1 file changed

+90
-9
lines changed

1 file changed

+90
-9
lines changed

src/main/java/io/reactivex/processors/BehaviorProcessor.java

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,87 @@
3333
* <p>
3434
* <img width="640" height="460" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.BehaviorProcessor.png" alt="">
3535
* <p>
36+
* This processor does not have a public constructor by design; a new empty instance of this
37+
* {@code BehaviorSubject} can be created via the {@link #create()} method and
38+
* a new non-empty instance can be created via {@link #createDefault(Object)} (named as such to avoid
39+
* overload resolution conflict with {@code Flowable.create} that creates a Flowable, not a {@code BehaviorProcessor}).
40+
* <p>
41+
* In accordance with the Reactive Streams specification (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>)
42+
* {@code null}s are not allowed as default initial values in {@link #createDefault(Object)} or as parameters to {@link #onNext(Object)} and
43+
* {@link #onError(Throwable)}.
44+
* <p>
45+
* When this {@code BehaviorProcessor} is terminated via {@link #onError(Throwable)} or {@link #onComplete()}, the
46+
* last observed item (if any) is cleared and late {@link org.reactivestreams.Subscriber}s only receive
47+
* the respective terminal event.
48+
* <p>
49+
* The {@code BehaviorProcessor} does not support clearing its cached value (to appear empty again), however, the
50+
* effect can be achieved by using a special item and making sure {@code Subscriber}s subscribe through a
51+
* filter whose predicate filters out this special item:
52+
* <pre><code>
53+
* BehaviorProcessor&lt;Integer&gt; processor = BehaviorProcessor.create();
54+
*
55+
* final Integer EMPTY = Integer.MIN_VALUE;
56+
*
57+
* Flowable&lt;Integer&gt; flowable = processor.filter(v -&gt; v != EMPTY);
58+
*
59+
* TestSubscriber&lt;Integer&gt; ts1 = flowable.test();
60+
*
61+
* processor.onNext(1);
62+
* // this will "clear" the cache
63+
* processor.onNext(EMPTY);
64+
*
65+
* TestSubscriber&lt;Integer&gt; ts2 = flowable.test();
66+
*
67+
* processor.onNext(2);
68+
* processor.onComplete();
69+
*
70+
* // ts1 received both non-empty items
71+
* ts1.assertResult(1, 2);
72+
*
73+
* // ts2 received only 2 even though the current item was EMPTY
74+
* // when it got subscribed
75+
* ts2.assertResult(2);
76+
*
77+
* // Subscribers coming after the processor was terminated receive
78+
* // no items and only the onComplete event in this case.
79+
* flowable.test().assertResult();
80+
* </code></pre>
81+
* <p>
82+
* Even though {@code BehaviorProcessor} implements the {@code Subscriber} interface, calling
83+
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
84+
* if the processor is used as a standalone source. However, calling {@code onSubscribe} is
85+
* called after the {@code BehaviorProcessor} reached its terminal state will result in the
86+
* given {@code Subscription} being cancelled immediately.
87+
* <p>
88+
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
89+
* is still required to be serialized (called from the same thread or called non-overlappingly from different threads
90+
* through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s
91+
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
92+
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively.
93+
* <p>
94+
* This {@code BehaviorProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
95+
* {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the latest observed value
96+
* in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()},
97+
* {@link #getValues()} or {@link #getValues(Object[])}.
98+
* <p>
99+
* Note that this processor signals {@code MissingBackpressureException} if a particular {@code Subscriber} is not
100+
* ready to receive {@code onNext} events. To avoid this exception being signaled, use {@link #offer(Object)} to only
101+
* try to emit an item when all {@code Subscriber}s have requested item(s).
102+
* <dl>
103+
* <dt><b>Backpressure:</b></dt>
104+
* <dd>The {@code BehaviorProcessor} does not coordinate requests of its downstream {@code Subscriber}s and
105+
* expects each individual {@code Subscriber} is ready to receive {@code onNext} items when {@link #onNext(Object)}
106+
* is called. If a {@code Subscriber} is not ready, a {@code MissingBackpressureException} is signalled to it.
107+
* To avoid overflowing the current {@code Subscriber}s, the conditional {@link #offer(Object)} method is available
108+
* that returns true if any of the {@code Subscriber}s is not ready to receive {@code onNext} events. If
109+
* there are no {@code Subscriber}s to the processor, {@code offer()} always succeeds.
110+
* If the {@code BehaviorProcessor} is (optionally) subscribed to another {@code Publisher}, this upstream
111+
* {@code Publisher} is consumed in an unbounded fashion (requesting {@code Long.MAX_VALUE}).</dd>
112+
* <dt><b>Scheduler:</b></dt>
113+
* <dd>{@code BehaviorProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and
114+
* the {@code Subscriber}s get notified on the thread the respective {@code onXXX} methods were invoked.</dd>
115+
* </dl>
116+
* <p>
36117
* Example usage:
37118
* <pre> {@code
38119
@@ -94,7 +175,7 @@ public final class BehaviorProcessor<T> extends FlowableProcessor<T> {
94175
* Creates a {@link BehaviorProcessor} without a default item.
95176
*
96177
* @param <T>
97-
* the type of item the Subject will emit
178+
* the type of item the BehaviorProcessor will emit
98179
* @return the constructed {@link BehaviorProcessor}
99180
*/
100181
@CheckReturnValue
@@ -107,7 +188,7 @@ public static <T> BehaviorProcessor<T> create() {
107188
* {@link Subscriber} that subscribes to it.
108189
*
109190
* @param <T>
110-
* the type of item the Subject will emit
191+
* the type of item the BehaviorProcessor will emit
111192
* @param defaultValue
112193
* the item that will be emitted first to any {@link Subscriber} as long as the
113194
* {@link BehaviorProcessor} has not yet observed any items from its source {@code Observable}
@@ -266,9 +347,9 @@ public Throwable getThrowable() {
266347
}
267348

268349
/**
269-
* Returns a single value the Subject currently has or null if no such value exists.
350+
* Returns a single value the BehaviorProcessor currently has or null if no such value exists.
270351
* <p>The method is thread-safe.
271-
* @return a single value the Subject currently has or null if no such value exists
352+
* @return a single value the BehaviorProcessor currently has or null if no such value exists
272353
*/
273354
public T getValue() {
274355
Object o = value.get();
@@ -279,9 +360,9 @@ public T getValue() {
279360
}
280361

281362
/**
282-
* Returns an Object array containing snapshot all values of the Subject.
363+
* Returns an Object array containing snapshot all values of the BehaviorProcessor.
283364
* <p>The method is thread-safe.
284-
* @return the array containing the snapshot of all values of the Subject
365+
* @return the array containing the snapshot of all values of the BehaviorProcessor
285366
*/
286367
public Object[] getValues() {
287368
@SuppressWarnings("unchecked")
@@ -295,7 +376,7 @@ public Object[] getValues() {
295376
}
296377

297378
/**
298-
* Returns a typed array containing a snapshot of all values of the Subject.
379+
* Returns a typed array containing a snapshot of all values of the BehaviorProcessor.
299380
* <p>The method follows the conventions of Collection.toArray by setting the array element
300381
* after the last value to null (if the capacity permits).
301382
* <p>The method is thread-safe.
@@ -337,9 +418,9 @@ public boolean hasThrowable() {
337418
}
338419

339420
/**
340-
* Returns true if the subject has any value.
421+
* Returns true if the BehaviorProcessor has any value.
341422
* <p>The method is thread-safe.
342-
* @return true if the subject has any value
423+
* @return true if the BehaviorProcessor has any value
343424
*/
344425
public boolean hasValue() {
345426
Object o = value.get();

0 commit comments

Comments
 (0)