Skip to content

Commit b930295

Browse files
groupByUntil -> groupBy
This collapses groupByUntil and groupBy into a single groupBy operator. The new implementation has 2 major changes: 1) It supports reactive pull backpressure. 2) Child GroupedObservables can be unsubscribed and they will be cleaned up and then new instances for the same key can be emitted, like groupByUntil, except that now instead of passing in a special durationSelector function, the child can be composed using take/takeUntil/etc to cause an unsubscribe. If the previous non-obvious groupBy behavior is wanted, then instead of unsubscribing, it can be filtered to ignore all further data, which is what the old groupBy used to do when a child was unsubscribed.
1 parent db2ee9a commit b930295

File tree

5 files changed

+631
-1126
lines changed

5 files changed

+631
-1126
lines changed

src/main/java/rx/Observable.java

Lines changed: 0 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -4708,15 +4708,6 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
47084708
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
47094709
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
47104710
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
4711-
* <dl>
4712-
* <dt><b>Backpressure Support:</b></dt>
4713-
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
4714-
* observable" and blocking any one group would block the entire parent stream. If you need
4715-
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
4716-
* or {@link #onBackpressureBuffer}.</dd>
4717-
* <dt><b>Scheduler:</b></dt>
4718-
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
4719-
* </dl>
47204711
*
47214712
* @param keySelector
47224713
* a function that extracts the key for each item
@@ -4746,15 +4737,6 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
47464737
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
47474738
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
47484739
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
4749-
* <dl>
4750-
* <dt><b>Backpressure Support:</b></dt>
4751-
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
4752-
* observable" and blocking any one group would block the entire parent stream. If you need
4753-
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
4754-
* or {@link #onBackpressureBuffer}.</dd>
4755-
* <dt><b>Scheduler:</b></dt>
4756-
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
4757-
* </dl>
47584740
*
47594741
* @param keySelector
47604742
* a function that extracts the key for each item
@@ -4770,77 +4752,6 @@ public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super
47704752
return lift(new OperatorGroupBy<T, K, T>(keySelector));
47714753
}
47724754

4773-
/**
4774-
* Groups the items emitted by an {@code Observable} according to a specified key selector function until
4775-
* the duration {@code Observable} expires for the key.
4776-
* <p>
4777-
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupByUntil.png" alt="">
4778-
* <p>
4779-
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
4780-
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
4781-
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
4782-
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
4783-
* <dl>
4784-
* <dt><b>Backpressure Support:</b></dt>
4785-
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
4786-
* observable" and blocking any one group would block the entire parent stream. If you need
4787-
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
4788-
* or {@link #onBackpressureBuffer}.</dd>
4789-
* <dt><b>Scheduler:</b></dt>
4790-
* <dd>{@code groupByUntil} does not operate by default on a particular {@link Scheduler}.</dd>
4791-
* </dl>
4792-
*
4793-
* @param keySelector
4794-
* a function to extract the key for each item
4795-
* @param durationSelector
4796-
* a function to signal the expiration of a group
4797-
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a key
4798-
* value and each of which emits all items emitted by the source {@code Observable} during that
4799-
* key's duration that share that same key value
4800-
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupByUntil</a>
4801-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211932.aspx">MSDN: Observable.GroupByUntil</a>
4802-
*/
4803-
public final <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super GroupedObservable<TKey, T>, ? extends Observable<? extends TDuration>> durationSelector) {
4804-
return groupByUntil(keySelector, Functions.<T> identity(), durationSelector);
4805-
}
4806-
4807-
/**
4808-
* Groups the items emitted by an {@code Observable} (transformed by a selector) according to a specified
4809-
* key selector function until the duration Observable expires for the key.
4810-
* <p>
4811-
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupByUntil.png" alt="">
4812-
* <p>
4813-
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
4814-
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
4815-
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
4816-
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
4817-
* <dl>
4818-
* <dt><b>Backpressure Support:</b></dt>
4819-
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
4820-
* observable" and blocking any one group would block the entire parent stream. If you need
4821-
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
4822-
* or {@link #onBackpressureBuffer}.</dd>
4823-
* <dt><b>Scheduler:</b></dt>
4824-
* <dd>{@code groupByUntil} does not operate by default on a particular {@link Scheduler}.</dd>
4825-
* </dl>
4826-
*
4827-
* @param keySelector
4828-
* a function to extract the key for each item
4829-
* @param valueSelector
4830-
* a function to map each item emitted by the source {@code Observable} to an item emitted by one
4831-
* of the resulting {@link GroupedObservable}s
4832-
* @param durationSelector
4833-
* a function to signal the expiration of a group
4834-
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a key
4835-
* value and each of which emits all items emitted by the source {@code Observable} during that
4836-
* key's duration that share that same key value, transformed by the value selector
4837-
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupByUntil</a>
4838-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229433.aspx">MSDN: Observable.GroupByUntil</a>
4839-
*/
4840-
public final <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<? extends TDuration>> durationSelector) {
4841-
return lift(new OperatorGroupByUntil<T, TKey, TValue, TDuration>(keySelector, valueSelector, durationSelector));
4842-
}
4843-
48444755
/**
48454756
* Returns an Observable that correlates two Observables when they overlap in time and groups the results.
48464757
* <p>

0 commit comments

Comments
 (0)