Skip to content

Commit 6c55533

Browse files
committed
Full chapter 7
1 parent 4c7730a commit 6c55533

File tree

8 files changed

+616
-0
lines changed

8 files changed

+616
-0
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dependencies {
2121

2222
testCompile 'ch.qos.logback:logback-classic:1.1.7'
2323
testCompile 'org.slf4j:slf4j-api:1.7.21'
24+
testCompile 'io.dropwizard:dropwizard-metrics:1.0.3'
2425

2526
testCompile 'org.springframework:spring-context:4.3.4.RELEASE'
2627
testCompile 'org.springframework:spring-jms:4.3.4.RELEASE'
@@ -38,6 +39,8 @@ dependencies {
3839
// testCompile 'com.squareup.retrofit2:converter-jackson:2.0.1'
3940

4041
testCompile 'junit:junit:4.12'
42+
testCompile 'org.assertj:assertj-core:3.5.2'
43+
testCompile 'org.mockito:mockito-core:2.2.15'
4144
}
4245

4346
task wrapper(type: Wrapper) {

src/test/java/com/oreilly/rxjava/ch7/Chapter7.java

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,17 @@
55
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
77
import rx.Observable;
8+
import rx.schedulers.TimeInterval;
89

910
import java.math.BigInteger;
11+
import java.time.Duration;
12+
import java.time.Instant;
13+
import java.time.LocalDate;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.TimeoutException;
16+
17+
import static java.time.Month.*;
18+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
1019

1120
@Ignore
1221
public class Chapter7 {
@@ -157,4 +166,132 @@ private Observable<Income> guessIncome(Person person) {
157166
return Observable.just(new Income(1));
158167
}
159168

169+
@Test
170+
public void sample_161() throws Exception {
171+
Observable<Person> person = Observable.just(new Person());
172+
173+
Observable<Income> income = person
174+
.flatMap(this::determineIncome)
175+
.flatMap(
176+
Observable::just,
177+
th -> Observable.empty(),
178+
Observable::empty)
179+
.concatWith(person.flatMap(this::guessIncome))
180+
.first();
181+
}
182+
183+
@Test
184+
public void sample_175() throws Exception {
185+
Observable<Person> person = Observable.just(new Person());
186+
187+
Observable<Income> income = person
188+
.flatMap(this::determineIncome)
189+
.flatMap(
190+
Observable::just,
191+
th -> person.flatMap(this::guessIncome),
192+
Observable::empty);
193+
}
194+
195+
@Test
196+
public void sample_187() throws Exception {
197+
Observable<Person> person = Observable.just(new Person());
198+
199+
Observable<Income> income = person
200+
.flatMap(this::determineIncome)
201+
.onErrorResumeNext(th -> {
202+
if (th instanceof NullPointerException) {
203+
return Observable.error(th);
204+
} else {
205+
return person.flatMap(this::guessIncome);
206+
}
207+
});
208+
}
209+
210+
Observable<Confirmation> confirmation() {
211+
Observable<Confirmation> delayBeforeCompletion =
212+
Observable
213+
.<Confirmation>empty()
214+
.delay(200, MILLISECONDS);
215+
return Observable
216+
.just(new Confirmation())
217+
.delay(100, MILLISECONDS)
218+
.concatWith(delayBeforeCompletion);
219+
}
220+
221+
@Test
222+
public void sample_215() throws Exception {
223+
confirmation()
224+
.timeout(210, MILLISECONDS)
225+
.forEach(
226+
System.out::println,
227+
th -> {
228+
if ((th instanceof TimeoutException)) {
229+
System.out.println("Too long");
230+
} else {
231+
th.printStackTrace();
232+
}
233+
}
234+
);
235+
}
236+
237+
Observable<LocalDate> nextSolarEclipse(LocalDate after) {
238+
return Observable
239+
.just(
240+
LocalDate.of(2016, MARCH, 9),
241+
LocalDate.of(2016, SEPTEMBER, 1),
242+
LocalDate.of(2017, FEBRUARY, 26),
243+
LocalDate.of(2017, AUGUST, 21),
244+
LocalDate.of(2018, FEBRUARY, 15),
245+
LocalDate.of(2018, JULY, 13),
246+
LocalDate.of(2018, AUGUST, 11),
247+
LocalDate.of(2019, JANUARY, 6),
248+
LocalDate.of(2019, JULY, 2),
249+
LocalDate.of(2019, DECEMBER, 26))
250+
.skipWhile(date -> !date.isAfter(after))
251+
.zipWith(
252+
Observable.interval(500, 50, MILLISECONDS),
253+
(date, x) -> date);
254+
}
255+
256+
@Test
257+
public void sample_253() throws Exception {
258+
nextSolarEclipse(LocalDate.of(2016, SEPTEMBER, 1))
259+
.timeout(
260+
() -> Observable.timer(1000, TimeUnit.MILLISECONDS),
261+
date -> Observable.timer(100, MILLISECONDS));
262+
}
263+
264+
@Test
265+
public void sample_262() throws Exception {
266+
Observable<TimeInterval<LocalDate>> intervals =
267+
nextSolarEclipse(LocalDate.of(2016, JANUARY, 1))
268+
.timeInterval();
269+
}
270+
271+
@Test
272+
public void sample_271() throws Exception {
273+
Observable<Instant> timestamps = Observable
274+
.fromCallable(() -> dbQuery())
275+
.doOnSubscribe(() -> log.info("subscribe()"))
276+
.doOnRequest(c -> log.info("Requested {}", c))
277+
.doOnNext(instant -> log.info("Got: {}", instant));
278+
279+
timestamps
280+
.zipWith(timestamps.skip(1), Duration::between)
281+
.map(Object::toString)
282+
.subscribe(log::info);
283+
}
284+
285+
private Instant dbQuery() {
286+
return Instant.now();
287+
}
288+
289+
@Test
290+
public void sample_291() throws Exception {
291+
Observable<String> obs = Observable
292+
.<String>error(new RuntimeException("Swallowed"))
293+
.doOnError(th -> log.warn("onError", th))
294+
.onErrorReturn(th -> "Fallback");
295+
}
296+
160297
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package com.oreilly.rxjava.ch7;
2+
3+
class Confirmation {
4+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.oreilly.rxjava.ch7;
2+
3+
import com.codahale.metrics.Counter;
4+
import com.codahale.metrics.MetricRegistry;
5+
import com.codahale.metrics.Slf4jReporter;
6+
import com.codahale.metrics.Timer;
7+
import org.junit.Ignore;
8+
import org.junit.Test;
9+
import org.slf4j.LoggerFactory;
10+
import rx.Observable;
11+
12+
import java.util.concurrent.TimeUnit;
13+
14+
@Ignore
15+
public class Monitoring {
16+
17+
private MetricRegistry metricRegistry;
18+
19+
@Test
20+
public void sample_9() throws Exception {
21+
metricRegistry = new MetricRegistry();
22+
Slf4jReporter reporter = Slf4jReporter
23+
.forRegistry(metricRegistry)
24+
.outputTo(LoggerFactory.getLogger(Monitoring.class))
25+
.build();
26+
reporter.start(1, TimeUnit.SECONDS);
27+
}
28+
29+
@Test
30+
public void sample_26() throws Exception {
31+
final Observable<Integer> observable = Observable.range(1, 100);
32+
final Counter items = metricRegistry.counter("items");
33+
observable
34+
.doOnNext(x -> items.inc())
35+
.subscribe(/* ... */);
36+
}
37+
38+
Observable<Long> makeNetworkCall(long x) {
39+
//...
40+
return Observable.just(10L);
41+
}
42+
43+
@Test
44+
public void sample_38() throws Exception {
45+
final Observable<Integer> observable = Observable.range(1, 100);
46+
47+
Counter counter = metricRegistry.counter("counter");
48+
observable
49+
.doOnNext(x -> counter.inc())
50+
.flatMap(this::makeNetworkCall)
51+
.doOnNext(x -> counter.dec())
52+
.subscribe(/* ... */);
53+
}
54+
55+
@Test
56+
public void sample_55() throws Exception {
57+
final Observable<Integer> observable = Observable.range(1, 100);
58+
Counter counter = metricRegistry.counter("counter");
59+
60+
observable
61+
.flatMap(x ->
62+
makeNetworkCall(x)
63+
.doOnSubscribe(counter::inc)
64+
.doOnTerminate(counter::dec)
65+
)
66+
.subscribe(/* ... */);
67+
}
68+
69+
@Test
70+
public void sample_69() throws Exception {
71+
Timer timer = metricRegistry.timer("timer");
72+
Timer.Context ctx = timer.time();
73+
//some lengthy operation...
74+
ctx.stop();
75+
}
76+
77+
@Test
78+
public void sample_78() throws Exception {
79+
Observable<Long> external = Observable.just(42L);
80+
81+
Timer timer = metricRegistry.timer("timer");
82+
83+
Observable<Long> externalWithTimer = Observable
84+
.defer(() -> Observable.just(timer.time()))
85+
.flatMap(timerCtx ->
86+
external.doOnCompleted(timerCtx::stop));
87+
}
88+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.oreilly.rxjava.ch7;
2+
3+
import rx.Observable;
4+
5+
import java.time.LocalDate;
6+
7+
interface MyService {
8+
Observable<LocalDate> externalCall();
9+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.oreilly.rxjava.ch7;
2+
3+
import rx.Observable;
4+
import rx.Scheduler;
5+
6+
import java.time.LocalDate;
7+
import java.util.concurrent.TimeUnit;
8+
9+
class MyServiceWithTimeout implements MyService {
10+
11+
private final MyService delegate;
12+
private final Scheduler scheduler;
13+
14+
MyServiceWithTimeout(MyService d, Scheduler s) {
15+
this.delegate = d;
16+
this.scheduler = s;
17+
}
18+
19+
@Override
20+
public Observable<LocalDate> externalCall() {
21+
return delegate
22+
.externalCall()
23+
.timeout(1, TimeUnit.SECONDS,
24+
Observable.empty(),
25+
scheduler);
26+
}
27+
}

0 commit comments

Comments
 (0)