Skip to content

Commit 13bb25d

Browse files
groupBy with element selector
Fixes #1554
1 parent 805ddb3 commit 13bb25d

File tree

3 files changed

+96
-14
lines changed

3 files changed

+96
-14
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4931,6 +4931,44 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
49314931
subscribe(onNext, onError, onComplete);
49324932
}
49334933

4934+
/**
4935+
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
4936+
* grouped items as {@link GroupedObservable}s, one {@code GroupedObservable} per group.
4937+
* <p>
4938+
* <img width="640" height="360" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupBy.png" alt="">
4939+
* <p>
4940+
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
4941+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
4942+
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
4943+
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
4944+
* <dl>
4945+
* <dt><b>Backpressure Support:</b></dt>
4946+
* <dd>This operator does not support backpressure as splitting a stream effectively turns it into a "hot
4947+
* observable" and blocking any one group would block the entire parent stream. If you need
4948+
* backpressure on individual groups then you should use operators such as {@link #onBackpressureDrop}
4949+
* or {@link #onBackpressureBuffer}.</dd>
4950+
* <dt><b>Scheduler:</b></dt>
4951+
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
4952+
* </dl>
4953+
*
4954+
* @param keySelector
4955+
* a function that extracts the key for each item
4956+
* @param elementSelector
4957+
* a function that extracts the return element for each item
4958+
* @param <K>
4959+
* the key type
4960+
* @param <R>
4961+
* the element type
4962+
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a
4963+
* unique key value and each of which emits those items from the source Observable that share that
4964+
* key value
4965+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupBy</a>
4966+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.groupby.aspx">MSDN: Observable.GroupBy</a>
4967+
*/
4968+
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
4969+
return lift(new OperatorGroupBy<T, K, R>(keySelector, elementSelector));
4970+
}
4971+
49344972
/**
49354973
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
49364974
* grouped items as {@link GroupedObservable}s, one {@code GroupedObservable} per group.
@@ -4962,7 +5000,7 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
49625000
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.groupby.aspx">MSDN: Observable.GroupBy</a>
49635001
*/
49645002
public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
4965-
return lift(new OperatorGroupBy<K, T>(keySelector));
5003+
return lift(new OperatorGroupBy<T, K, T>(keySelector));
49665004
}
49675005

49685006
/**

rxjava-core/src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,26 +37,35 @@
3737
* @param <K> the key type
3838
* @param <T> the source and group value type
3939
*/
40-
public final class OperatorGroupBy<K, T> implements Operator<GroupedObservable<K, T>, T> {
40+
public final class OperatorGroupBy<T, K, R> implements Operator<GroupedObservable<K, R>, T> {
4141

4242
final Func1<? super T, ? extends K> keySelector;
43+
final Func1<? super T, ? extends R> elementSelector;
4344

45+
@SuppressWarnings("unchecked")
4446
public OperatorGroupBy(final Func1<? super T, ? extends K> keySelector) {
47+
this(keySelector, (Func1<T, R>)IDENTITY);
48+
}
49+
50+
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends R> elementSelector) {
4551
this.keySelector = keySelector;
52+
this.elementSelector = elementSelector;
4653
}
4754

4855
@Override
49-
public Subscriber<? super T> call(final Subscriber<? super GroupedObservable<K, T>> child) {
50-
return new GroupBySubscriber<K, T>(keySelector, child);
56+
public Subscriber<? super T> call(final Subscriber<? super GroupedObservable<K, R>> child) {
57+
return new GroupBySubscriber<K, T, R>(keySelector, elementSelector, child);
5158
}
52-
static final class GroupBySubscriber<K, T> extends Subscriber<T> {
59+
static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
5360
final Func1<? super T, ? extends K> keySelector;
54-
final Subscriber<? super GroupedObservable<K, T>> child;
55-
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<? super GroupedObservable<K, T>> child) {
61+
final Func1<? super T, ? extends R> elementSelector;
62+
final Subscriber<? super GroupedObservable<K, R>> child;
63+
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends R> elementSelector, Subscriber<? super GroupedObservable<K, R>> child) {
5664
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
5765
// and will unsubscribe on this parent if they are all unsubscribed
5866
super();
5967
this.keySelector = keySelector;
68+
this.elementSelector = elementSelector;
6069
this.child = child;
6170
}
6271
private final Map<K, BufferUntilSubscriber<T>> groups = new HashMap<K, BufferUntilSubscriber<T>>();
@@ -124,10 +133,10 @@ public void onNext(T t) {
124133
group = BufferUntilSubscriber.create();
125134
final BufferUntilSubscriber<T> _group = group;
126135

127-
GroupedObservable<K, T> go = new GroupedObservable<K, T>(key, new OnSubscribe<T>() {
136+
GroupedObservable<K, R> go = new GroupedObservable<K, R>(key, new OnSubscribe<R>() {
128137

129138
@Override
130-
public void call(final Subscriber<? super T> o) {
139+
public void call(final Subscriber<? super R> o) {
131140
// number of children we have running
132141
COUNTER_UPDATER.incrementAndGet(GroupBySubscriber.this);
133142
o.add(Subscriptions.create(new Action0() {
@@ -153,7 +162,7 @@ public void onError(Throwable e) {
153162

154163
@Override
155164
public void onNext(T t) {
156-
o.onNext(t);
165+
o.onNext(elementSelector.call(t));
157166
}
158167

159168
});
@@ -185,4 +194,13 @@ private void completeInner() {
185194
}
186195

187196
}
197+
198+
private final static Func1<Object, Object> IDENTITY = new Func1<Object, Object>() {
199+
200+
@Override
201+
public Object call(Object t) {
202+
return t;
203+
}
204+
205+
};
188206
}

rxjava-core/src/test/java/rx/internal/operators/OperatorGroupByTest.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ public Integer call(String s) {
5959
return s.length();
6060
}
6161
};
62-
62+
6363
@Test
6464
public void testGroupBy() {
6565
Observable<String> source = Observable.from("one", "two", "three", "four", "five", "six");
66-
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<Integer, String>(length));
66+
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));
6767

6868
Map<Integer, Collection<String>> map = toMap(grouped);
6969

@@ -72,11 +72,37 @@ public void testGroupBy() {
7272
assertArrayEquals(Arrays.asList("four", "five").toArray(), map.get(4).toArray());
7373
assertArrayEquals(Arrays.asList("three").toArray(), map.get(5).toArray());
7474
}
75+
76+
@Test
77+
public void testGroupByWithElementSelector() {
78+
Observable<String> source = Observable.from("one", "two", "three", "four", "five", "six");
79+
Observable<GroupedObservable<Integer, Integer>> grouped = source.lift(new OperatorGroupBy<String, Integer, Integer>(length, length));
80+
81+
Map<Integer, Collection<Integer>> map = toMap(grouped);
82+
83+
assertEquals(3, map.size());
84+
assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
85+
assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
86+
assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
87+
}
88+
89+
@Test
90+
public void testGroupByWithElementSelector2() {
91+
Observable<String> source = Observable.from("one", "two", "three", "four", "five", "six");
92+
Observable<GroupedObservable<Integer, Integer>> grouped = source.groupBy(length, length);
93+
94+
Map<Integer, Collection<Integer>> map = toMap(grouped);
95+
96+
assertEquals(3, map.size());
97+
assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
98+
assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
99+
assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
100+
}
75101

76102
@Test
77103
public void testEmpty() {
78104
Observable<String> source = Observable.empty();
79-
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<Integer, String>(length));
105+
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));
80106

81107
Map<Integer, Collection<String>> map = toMap(grouped);
82108

@@ -89,7 +115,7 @@ public void testError() {
89115
Observable<String> errorSource = Observable.error(new RuntimeException("forced failure"));
90116
Observable<String> source = Observable.concat(sourceStrings, errorSource);
91117

92-
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<Integer, String>(length));
118+
Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));
93119

94120
final AtomicInteger groupCounter = new AtomicInteger();
95121
final AtomicInteger eventCounter = new AtomicInteger();

0 commit comments

Comments
 (0)