1
1
package io .vertx .example .reactivex .http .client .reduce ;
2
2
3
+ import io .reactivex .Flowable ;
3
4
import io .vertx .core .http .HttpMethod ;
4
5
import io .vertx .example .util .Runner ;
5
6
import io .vertx .reactivex .core .AbstractVerticle ;
6
7
import io .vertx .reactivex .core .buffer .Buffer ;
7
8
import io .vertx .reactivex .core .http .HttpClient ;
8
9
import io .vertx .reactivex .core .http .HttpClientRequest ;
9
- import io .reactivex .Observable ;
10
10
11
11
/*
12
12
* @author <a href="mailto:[email protected] ">Julien Viet</a>
@@ -22,23 +22,23 @@ public static void main(String[] args) {
22
22
public void start () throws Exception {
23
23
HttpClient client = vertx .createHttpClient ();
24
24
HttpClientRequest req = client .request (HttpMethod .GET , 8080 , "localhost" , "/" );
25
- req .toObservable ().
25
+ req .toFlowable ().
26
26
27
- // Status code check and -> Observable <Buffer>
27
+ // Status code check and -> Flowable <Buffer>
28
28
flatMap (resp -> {
29
- if (resp .statusCode () != 200 ) {
30
- throw new RuntimeException ("Wrong status code " + resp .statusCode ());
31
- }
32
- return Observable .just (Buffer .buffer ()).mergeWith (resp .toObservable ());
33
- }).
29
+ if (resp .statusCode () != 200 ) {
30
+ throw new RuntimeException ("Wrong status code " + resp .statusCode ());
31
+ }
32
+ return Flowable .just (Buffer .buffer ()).mergeWith (resp .toFlowable ());
33
+ }).
34
34
35
- // Reduce all buffers in a single buffer
35
+ // Reduce all buffers in a single buffer
36
36
reduce (Buffer ::appendBuffer ).
37
37
38
- // Turn in to a string
38
+ // Turn in to a string
39
39
map (buffer -> buffer .toString ("UTF-8" )).
40
40
41
- // Get a single buffer
41
+ // Get a single buffer
42
42
subscribe (data -> System .out .println ("Server content " + data ));
43
43
44
44
// End request
0 commit comments