Skip to content

Commit 877152e

Browse files
Vert.x-Web router with Flowable example (vert-x3/vertx-web#826).
1 parent c528552 commit 877152e

File tree

2 files changed

+50
-0
lines changed

2 files changed

+50
-0
lines changed

rxjava-2-examples/README.adoc

+6
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ The http client json response is unmarshalled to a Java object: the `RxHelper.un
108108

109109
link:src/main/java/io/vertx/example/reactivex/http/client/unmarshalling/Client.java[RxJava 2 unmarshalling HTTP client]
110110

111+
=== Backpressure
112+
113+
The http server which uses Vert.x-Web router and defines a "drop" strategy when the server is overloaded (backpressure).
114+
115+
link:src/main/java/io/vertx/example/reactivex/web/backpressure/Server[RxJava 2 backpressure HTTP server]
116+
111117
== RxJava event bus examples
112118

113119
The event bus provides a natural fit with the Rx api.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package io.vertx.example.reactivex.web.backpressure;
2+
3+
import io.vertx.core.Future;
4+
import io.vertx.example.util.Runner;
5+
import io.vertx.reactivex.RxHelper;
6+
import io.vertx.reactivex.core.AbstractVerticle;
7+
import io.vertx.reactivex.core.http.HttpServer;
8+
import io.vertx.reactivex.core.http.HttpServerRequest;
9+
import io.vertx.reactivex.ext.web.Router;
10+
import io.vertx.reactivex.ext.web.handler.BodyHandler;
11+
12+
/**
13+
* @author tomasz.michalak
14+
*/
15+
public class Server extends AbstractVerticle {
16+
17+
// Convenience method so you can run it in your IDE
18+
public static void main(String[] args) throws InterruptedException {
19+
Runner.runExample(io.vertx.example.reactivex.web.backpressure.Server.class);
20+
}
21+
22+
@Override
23+
public void start(Future<Void> fut) throws Exception {
24+
25+
Router router = Router.router(vertx);
26+
router.route().handler(BodyHandler.create());
27+
router.route().handler(req -> req.response().putHeader("content-type", "text/html")
28+
.end("<html><body><h1>Hello from vert.x!</h1></body></html>"));
29+
30+
HttpServer server = vertx.createHttpServer();
31+
server.requestStream()
32+
.toFlowable()
33+
.map(HttpServerRequest::pause)
34+
.onBackpressureDrop(req -> req.response().setStatusCode(503).end())
35+
.observeOn(RxHelper.scheduler(vertx.getDelegate()))
36+
.subscribe(req -> {
37+
req.resume();
38+
router.accept(req);
39+
});
40+
41+
server.rxListen(8080).subscribe(res -> fut.complete(), onError -> fut.fail(onError.getCause()));
42+
}
43+
44+
}

0 commit comments

Comments
 (0)