Skip to content

Commit 10210e6

Browse files
committed
2.x: Cleanup after the new groupBy additions
1 parent d6345dc commit 10210e6

File tree

3 files changed

+26
-25
lines changed

3 files changed

+26
-25
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9717,25 +9717,25 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
97179717
* {@link Subscriber} during its lifetime and if this {@code Subscriber} cancels before the
97189718
* source terminates, the next emission by the source having the same key will trigger a new
97199719
* {@code GroupedPublisher} emission. The {@code evictingMapFactory} is used to create a map that will
9720-
* be used to hold the {@link GroupedFlowable}s by key. The evicting map created by this factory must
9720+
* be used to hold the {@link GroupedFlowable}s by key. The evicting map created by this factory must
97219721
* notify the provided {@code Consumer<Object>} with the entry value (not the key!) when an entry in this
97229722
* map has been evicted. The next source emission will bring about the completion of the evicted
97239723
* {@link GroupedFlowable}s and the arrival of an item with the same key as a completed {@link GroupedFlowable}
97249724
* will prompt the creation and emission of a new {@link GroupedFlowable} with that key.
97259725
*
97269726
* <p>A use case for specifying an {@code evictingMapFactory} is where the source is infinite and fast and
9727-
* over time the number of keys grows enough to be a concern in terms of the memory footprint of the
9727+
* over time the number of keys grows enough to be a concern in terms of the memory footprint of the
97289728
* internal hash map containing the {@link GroupedFlowable}s.
97299729
*
97309730
* <p>The map created by an {@code evictingMapFactory} must be thread-safe.
97319731
*
97329732
* <p>An example of an {@code evictingMapFactory} using <a href="https://google.github.io/guava/releases/24.0-jre/api/docs/com/google/common/cache/CacheBuilder.html">CacheBuilder</a> from the Guava library is below:
97339733
*
9734-
* <pre>
9735-
* Function&lt;Consumer&lt;Object&gt;, Map&lt;Integer, Object&gt;&gt; evictingMapFactory =
9734+
* <pre><code>
9735+
* Function&lt;Consumer&lt;Object&gt;, Map&lt;Integer, Object&gt;&gt; evictingMapFactory =
97369736
* notify -&gt;
97379737
* CacheBuilder
9738-
* .newBuilder()
9738+
* .newBuilder()
97399739
* .maximumSize(3)
97409740
* .removalListener(entry -&gt; {
97419741
* try {
@@ -9749,14 +9749,14 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
97499749
* .asMap();
97509750
*
97519751
* // Emit 1000 items but ensure that the
9752-
* // internal map never has more than 3 items in it
9752+
* // internal map never has more than 3 items in it
97539753
* Flowable
97549754
* .range(1, 1000)
97559755
* // note that number of keys is 10
97569756
* .groupBy(x -&gt; x % 10, x -&gt; x, true, 16, evictingMapFactory)
97579757
* .flatMap(g -&gt; g)
97589758
* .forEach(System.out::println);
9759-
* </pre>
9759+
* </code></pre>
97609760
*
97619761
* <p>
97629762
* <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupBy.png" alt="">
@@ -9786,7 +9786,7 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
97869786
* @param bufferSize
97879787
* the hint for how many {@link GroupedFlowable}s and element in each {@link GroupedFlowable} should be buffered
97889788
* @param evictingMapFactory
9789-
* The factory used to create a map that will be used by the implementation to hold the
9789+
* The factory used to create a map that will be used by the implementation to hold the
97909790
* {@link GroupedFlowable}s. The evicting map created by this factory must
97919791
* notify the provided {@code Consumer<Object>} with the entry value (not the key!) when
97929792
* an entry in this map has been evicted. The next source emission will bring about the
@@ -9808,7 +9808,7 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
98089808
@Beta
98099809
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector,
98109810
Function<? super T, ? extends V> valueSelector,
9811-
boolean delayError, int bufferSize,
9811+
boolean delayError, int bufferSize,
98129812
Function<? super Consumer<Object>, ? extends Map<K, Object>> evictingMapFactory) {
98139813
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
98149814
ObjectHelper.requireNonNull(valueSelector, "valueSelector is null");
@@ -9817,7 +9817,7 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
98179817

98189818
return RxJavaPlugins.onAssembly(new FlowableGroupBy<T, K, V>(this, keySelector, valueSelector, bufferSize, delayError, evictingMapFactory));
98199819
}
9820-
9820+
98219821
/**
98229822
* Returns a Flowable that correlates two Publishers when they overlap in time and groups the results.
98239823
* <p>

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public final class FlowableGroupBy<T, K, V> extends AbstractFlowableWithUpstream
4141
final boolean delayError;
4242
final Function<? super Consumer<Object>, ? extends Map<K, Object>> mapFactory;
4343

44-
public FlowableGroupBy(Flowable<T> source, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector,
44+
public FlowableGroupBy(Flowable<T> source, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector,
4545
int bufferSize, boolean delayError, Function<? super Consumer<Object>, ? extends Map<K, Object>> mapFactory) {
4646
super(source);
4747
this.keySelector = keySelector;
@@ -52,6 +52,7 @@ public FlowableGroupBy(Flowable<T> source, Function<? super T, ? extends K> keyS
5252
}
5353

5454
@Override
55+
@SuppressWarnings({ "unchecked", "rawtypes" })
5556
protected void subscribeActual(Subscriber<? super GroupedFlowable<K, V>> s) {
5657

5758
final Map<Object, GroupedUnicast<K, V>> groups;
@@ -63,8 +64,8 @@ protected void subscribeActual(Subscriber<? super GroupedFlowable<K, V>> s) {
6364
groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
6465
} else {
6566
evictedGroups = new ConcurrentLinkedQueue<GroupedUnicast<K, V>>();
66-
Consumer<Object> evictionAction = (Consumer<Object>)(Consumer<?>) new EvictionAction<K, V>(evictedGroups);
67-
groups = (Map<Object, GroupedUnicast<K,V>>)(Map<Object, ?>) mapFactory.apply(evictionAction);
67+
Consumer<Object> evictionAction = (Consumer) new EvictionAction<K, V>(evictedGroups);
68+
groups = (Map) mapFactory.apply(evictionAction);
6869
}
6970
} catch (Exception e) {
7071
Exceptions.throwIfFatal(e);
@@ -176,7 +177,7 @@ public void onNext(T t) {
176177
}
177178

178179
group.onNext(v);
179-
180+
180181
if (evictedGroups != null) {
181182
GroupedUnicast<K, V> evictedGroup;
182183
while ((evictedGroup = evictedGroups.poll()) != null) {

src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1848,7 +1848,7 @@ public Publisher<Integer> apply(GroupedFlowable<Integer, Integer> g) throws Exce
18481848
.test()
18491849
.assertResult(1);
18501850
}
1851-
1851+
18521852
@Test
18531853
public void mapFactoryThrows() {
18541854
final IOException ex = new IOException("boo");
@@ -1860,13 +1860,13 @@ public Map<Integer, Object> apply(final Consumer<Object> notify) throws Exceptio
18601860
throw ex;
18611861
}
18621862
};
1863-
Flowable.just(1)
1864-
.groupBy(Functions.<Integer>identity(), Functions.identity(), true, 16, evictingMapFactory)
1865-
.test()
1863+
Flowable.just(1)
1864+
.groupBy(Functions.<Integer>identity(), Functions.identity(), true, 16, evictingMapFactory)
1865+
.test()
18661866
.assertNoValues()
18671867
.assertError(ex);
18681868
}
1869-
1869+
18701870
@Test
18711871
public void mapFactoryExpiryCompletesGroupedFlowable() {
18721872
final List<Integer> completed = new CopyOnWriteArrayList<Integer>();
@@ -1887,15 +1887,15 @@ public void mapFactoryExpiryCompletesGroupedFlowable() {
18871887
ts.assertComplete();
18881888
ts.assertValueCount(3);
18891889
}
1890-
1890+
18911891
private static final Function<Integer, Integer> mod5 = new Function<Integer, Integer>() {
18921892

18931893
@Override
18941894
public Integer apply(Integer n) throws Exception {
18951895
return n % 5;
18961896
}
18971897
};
1898-
1898+
18991899
@Test
19001900
public void mapFactoryWithExpiringGuavaCacheDemonstrationCodeForUseInJavadoc() {
19011901
//javadoc will be a version of this using lambdas and without assertions
@@ -1911,9 +1911,9 @@ public void mapFactoryWithExpiringGuavaCacheDemonstrationCodeForUseInJavadoc() {
19111911
.assertComplete();
19121912
ts.assertValueCount(numValues);
19131913
//the exact eviction behaviour of the guava cache is not specified so we make some approximate tests
1914-
assertTrue(completed.size() > numValues *0.9);
1914+
assertTrue(completed.size() > numValues * 0.9);
19151915
}
1916-
1916+
19171917
@Test
19181918
public void mapFactoryEvictionQueueClearedOnErrorCoverageOnly() {
19191919
Function<Consumer<Object>, Map<Integer, Object>> evictingMapFactory = createEvictingMapFactorySynchronousOnly(1);
@@ -2045,7 +2045,7 @@ public Set<Entry<K, V>> entrySet() {
20452045

20462046
private static Function<Consumer<Object>, Map<Integer, Object>> createEvictingMapFactoryGuava(final int maxSize) {
20472047
Function<Consumer<Object>, Map<Integer, Object>> evictingMapFactory = //
2048-
new Function<Consumer<Object>, Map<Integer, Object>>(){
2048+
new Function<Consumer<Object>, Map<Integer, Object>>() {
20492049

20502050
@Override
20512051
public Map<Integer, Object> apply(final Consumer<Object> notify) throws Exception {
@@ -2068,7 +2068,7 @@ public void onRemoval(RemovalNotification<Integer,Object> notification) {
20682068

20692069
private static Function<Consumer<Object>, Map<Integer, Object>> createEvictingMapFactorySynchronousOnly(final int maxSize) {
20702070
Function<Consumer<Object>, Map<Integer, Object>> evictingMapFactory = //
2071-
new Function<Consumer<Object>, Map<Integer, Object>>(){
2071+
new Function<Consumer<Object>, Map<Integer, Object>>() {
20722072

20732073
@Override
20742074
public Map<Integer, Object> apply(final Consumer<Object> notify) throws Exception {

0 commit comments

Comments
 (0)