Skip to content

Commit 71dd09b

Browse files
authored
Merge pull request vert-x3#267 from tomaszmichalak/master
Vert.x-Web router with Flowable example (https://github.com/vert-x3/v…
2 parents 4105f47 + 54fcf1c commit 71dd09b

File tree

2 files changed

+77
-0
lines changed

2 files changed

+77
-0
lines changed

rxjava-2-examples/README.adoc

+6
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ The http client json response is unmarshalled to a Java object: the `RxHelper.un
108108

109109
link:src/main/java/io/vertx/example/reactivex/http/client/unmarshalling/Client.java[RxJava 2 unmarshalling HTTP client]
110110

111+
=== Backpressure
112+
113+
The http server which uses Vert.x-Web router and defines a "drop" strategy when the server is overloaded (backpressure).
114+
115+
link:src/main/java/io/vertx/example/reactivex/web/backpressure/Server[RxJava 2 backpressure HTTP server]
116+
111117
== RxJava event bus examples
112118

113119
The event bus provides a natural fit with the Rx api.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package io.vertx.example.reactivex.web.backpressure;
2+
3+
import io.vertx.example.util.Runner;
4+
import io.vertx.reactivex.RxHelper;
5+
import io.vertx.reactivex.core.AbstractVerticle;
6+
import io.vertx.reactivex.core.http.HttpClient;
7+
import io.vertx.reactivex.core.http.HttpClientResponse;
8+
import io.vertx.reactivex.core.http.HttpServer;
9+
import io.vertx.reactivex.core.http.HttpServerRequest;
10+
import io.vertx.reactivex.ext.web.Router;
11+
import io.vertx.reactivex.ext.web.handler.BodyHandler;
12+
import java.util.Collections;
13+
import java.util.List;
14+
import java.util.concurrent.Callable;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
/**
19+
* @author tomasz.michalak
20+
*/
21+
public class Server extends AbstractVerticle {
22+
23+
private static final int NUMBER_OF_REQUESTS = 1000;
24+
private static final int N_THREADS = 100;
25+
26+
// Convenience method so you can run it in your IDE
27+
public static void main(String[] args) throws InterruptedException {
28+
Runner.runExample(io.vertx.example.reactivex.web.backpressure.Server.class);
29+
}
30+
31+
@Override
32+
public void start() throws Exception {
33+
34+
Router router = Router.router(vertx);
35+
router.route().handler(BodyHandler.create());
36+
router.route().handler(req -> req.response().putHeader("content-type", "text/html")
37+
.end("<html><body><h1>Hello from vert.x!</h1></body></html>"));
38+
39+
HttpServer server = vertx.createHttpServer();
40+
server.requestStream()
41+
.toFlowable()
42+
.map(HttpServerRequest::pause)
43+
.onBackpressureDrop(req -> req.response().setStatusCode(503).end())
44+
.observeOn(RxHelper.scheduler(vertx.getDelegate()))
45+
.subscribe(req -> {
46+
req.resume();
47+
router.accept(req);
48+
});
49+
50+
server.rxListen(8080).subscribe(res -> generateRequests());
51+
}
52+
53+
private void generateRequests() throws InterruptedException {
54+
AtomicInteger ok = new AtomicInteger(0);
55+
AtomicInteger fail = new AtomicInteger(0);
56+
57+
List<Callable<HttpClient>> tasks = Collections.nCopies(NUMBER_OF_REQUESTS,
58+
() -> vertx.createHttpClient()
59+
.getNow(8080, "localhost", "/", resp ->
60+
log(resp, resp.statusCode() == 200 ? ok.incrementAndGet() : ok.get(),
61+
resp.statusCode() == 503 ? fail.incrementAndGet() : fail.get())
62+
));
63+
Executors.newFixedThreadPool(N_THREADS).invokeAll(tasks);
64+
65+
}
66+
67+
private void log(HttpClientResponse response, int ok, int fail) {
68+
System.out.println(response.statusCode() + ", ok=" + ok + ", fail=" + fail);
69+
}
70+
71+
}

0 commit comments

Comments
 (0)