Skip to content

Commit 5590727

Browse files
Version 1.0.0-rc.5
1 parent abe9f04 commit 5590727

File tree

1 file changed

+137
-0
lines changed

1 file changed

+137
-0
lines changed

CHANGES.md

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,142 @@
11
# RxJava Releases #
22

3+
### Version 1.0.0-rc.5 – October 6th 2014 ([Maven Central](http://search.maven.org/#artifactdetails%7Cio.reactivex%7Crxjava%7C1.0.0-rc.5%7C)) ###
4+
5+
* [Pull 1729] (https://github.com/ReactiveX/RxJava/pull/1729) CombineLatest: Request Up When Dropping Values
6+
* [Pull 1728] (https://github.com/ReactiveX/RxJava/pull/1728) ObserveOn Error Propagation
7+
* [Pull 1727] (https://github.com/ReactiveX/RxJava/pull/1727) Proposed groupBy/groupByUntil Changes
8+
* [Pull 1726] (https://github.com/ReactiveX/RxJava/pull/1726) Fix Merge: backpressure + scalarValueQueue don't play nicely
9+
* [Pull 1720] (https://github.com/ReactiveX/RxJava/pull/1720) Change repeatWhen and retryWhen signatures.
10+
* [Pull 1719] (https://github.com/ReactiveX/RxJava/pull/1719) Fix Bug in the onBackpressure operators
11+
12+
### groupBy/groupByUntil
13+
14+
The `groupByUntil` operator was removed by collapsing its behavior into `groupBy`. Previously on `groupBy` when a child `GroupedObservable` was unsubscribed it would internally retain the state and ignore all future `onNext` for that key.
15+
16+
This matched behavior in Rx.Net but was found to be non-obvious and almost everyone using `groupBy` on long-lived streams actually wanted the behavior of `groupByUntil` where an unsubscribed `GroupedObservable` would clean up the resources and then if `onNext` for that key arrived again a new `GroupedObservable` would be emitted.
17+
18+
Adding backpressure (reactive pull) to `groupByUntil` was found to not work easily with its signatures so before 1.0 Final it was decided to collapse `groupBy` and `groupByUntil`. Further details on this can be found in [Pull Request 1727](https://github.com/ReactiveX/RxJava/pull/1727).
19+
20+
Here is an example of how `groupBy` now behaves when a child `GroupedObservable` is unsubscribed (using `take` here):
21+
22+
```java
23+
// odd/even into lists of 10
24+
Observable.range(1, 100)
25+
.groupBy(n -> n % 2 == 0)
26+
.flatMap(g -> {
27+
return g.take(10).toList();
28+
}).forEach(System.out::println);
29+
```
30+
31+
```
32+
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
33+
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
34+
[21, 23, 25, 27, 29, 31, 33, 35, 37, 39]
35+
[22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
36+
[41, 43, 45, 47, 49, 51, 53, 55, 57, 59]
37+
[42, 44, 46, 48, 50, 52, 54, 56, 58, 60]
38+
[61, 63, 65, 67, 69, 71, 73, 75, 77, 79]
39+
[62, 64, 66, 68, 70, 72, 74, 76, 78, 80]
40+
[81, 83, 85, 87, 89, 91, 93, 95, 97, 99]
41+
[82, 84, 86, 88, 90, 92, 94, 96, 98, 100]
42+
```
43+
44+
Previously this would have only emitted 2 groups and ignored all subsequent values:
45+
46+
```
47+
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
48+
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
49+
```
50+
51+
On a finite stream, similar behavior of the previous `groupBy` implementation that would filter can be achieved like this:
52+
53+
```java
54+
//odd/even into lists of 10
55+
Observable.range(1, 100)
56+
.groupBy(n -> n % 2 == 0)
57+
.flatMap(g -> {
58+
return g.filter(i -> i <= 20).toList();
59+
}).forEach(System.out::println);
60+
```
61+
62+
```
63+
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
64+
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
65+
```
66+
67+
That however does allow the stream to complete (which may not be wanted).
68+
69+
To unsubscribe here are some choices that get the same output but efficiently unsubscribe up so the source only emits 40 values:
70+
71+
```java
72+
Observable.timer(0, 1, TimeUnit.MILLISECONDS)
73+
.groupBy(n -> n % 2 == 0)
74+
.flatMap(g -> {
75+
return g.take(10).toList();
76+
}).take(2).toBlocking().forEach(System.out::println);
77+
```
78+
79+
or
80+
81+
```java
82+
Observable.timer(0, 1, TimeUnit.MILLISECONDS)
83+
.take(20)
84+
.groupBy(n -> n % 2 == 0)
85+
.flatMap(g -> {
86+
return g.toList();
87+
}).toBlocking().forEach(System.out::println);
88+
```
89+
90+
These show that now `groupBy` composes like any other operator without the nuanced and hidden behavior of ignoring values after a child `GroupedObservable` is unsubscribed.
91+
92+
Uses of `groupByUntil` can now all be done by just using operators like `take`, `takeWhile` and `takeUntil` on the `GroupedObservable` directly, such as this:
93+
94+
```java
95+
Observable.from(Arrays.asList("a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c"))
96+
.groupBy(n -> n)
97+
.flatMap(g -> {
98+
return g.take(3).reduce((s, s2) -> s + s2);
99+
}).forEach(System.out::println);
100+
```
101+
```
102+
aaa
103+
bbb
104+
ccc
105+
aaa
106+
bbb
107+
ccc
108+
```
109+
110+
111+
112+
### retryWhen/repeatWhen
113+
114+
The `retryWhen` and `repeatWhen` method signatures both emitted a `Observable<Notification>` type which could be queried to represent either `onError` in the `retryWhen` case or `onCompleted` in the `repeatWhen` case. This was found to be confusing and unnecessary. The signatures were changed to emit `Observable<Throwable>` for `retryWhen` and `Observable<Void>` for `repeatWhen` to better signal the type of notification they are emitting without the need to then query the `Notification`.
115+
116+
The following contrived examples shows how the `Observable<Throwable>` is used to get the error that occurred when deciding to retry:
117+
118+
```java
119+
AtomicInteger count = new AtomicInteger();
120+
Observable.create((Subscriber<? super String> s) -> {
121+
if (count.getAndIncrement() == 0) {
122+
s.onError(new RuntimeException("always fails"));
123+
} else {
124+
s.onError(new IllegalArgumentException("user error"));
125+
}
126+
}).retryWhen(attempts -> {
127+
return attempts.flatMap(throwable -> {
128+
if (throwable instanceof IllegalArgumentException) {
129+
System.out.println("don't retry on IllegalArgumentException... allow failure");
130+
return Observable.error(throwable);
131+
} else {
132+
System.out.println(throwable + " => retry after 1 second");
133+
return Observable.timer(1, TimeUnit.SECONDS);
134+
}
135+
});
136+
})
137+
.toBlocking().forEach(System.out::println);
138+
```
139+
3140
### Version 1.0.0-rc.4 – October 2nd 2014 ([Maven Central](http://search.maven.org/#artifactdetails%7Cio.reactivex%7Crxjava%7C1.0.0-rc.4%7C)) ###
4141

5142
* [Pull 1687] (https://github.com/ReactiveX/RxJava/pull/1687) Don't allocate an empty ArrayList for each Observable.empty call

0 commit comments

Comments
 (0)