Skip to content

Commit 54fcf1c

Browse files
Generate and print requests to validate backpressure.
1 parent 877152e commit 54fcf1c

File tree

1 file changed

+30
-3
lines changed
  • rxjava-2-examples/src/main/java/io/vertx/example/reactivex/web/backpressure

1 file changed

+30
-3
lines changed

rxjava-2-examples/src/main/java/io/vertx/example/reactivex/web/backpressure/Server.java

+30-3
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,35 @@
11
package io.vertx.example.reactivex.web.backpressure;
22

3-
import io.vertx.core.Future;
43
import io.vertx.example.util.Runner;
54
import io.vertx.reactivex.RxHelper;
65
import io.vertx.reactivex.core.AbstractVerticle;
6+
import io.vertx.reactivex.core.http.HttpClient;
7+
import io.vertx.reactivex.core.http.HttpClientResponse;
78
import io.vertx.reactivex.core.http.HttpServer;
89
import io.vertx.reactivex.core.http.HttpServerRequest;
910
import io.vertx.reactivex.ext.web.Router;
1011
import io.vertx.reactivex.ext.web.handler.BodyHandler;
12+
import java.util.Collections;
13+
import java.util.List;
14+
import java.util.concurrent.Callable;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.atomic.AtomicInteger;
1117

1218
/**
1319
* @author tomasz.michalak
1420
*/
1521
public class Server extends AbstractVerticle {
1622

23+
private static final int NUMBER_OF_REQUESTS = 1000;
24+
private static final int N_THREADS = 100;
25+
1726
// Convenience method so you can run it in your IDE
1827
public static void main(String[] args) throws InterruptedException {
1928
Runner.runExample(io.vertx.example.reactivex.web.backpressure.Server.class);
2029
}
2130

2231
@Override
23-
public void start(Future<Void> fut) throws Exception {
32+
public void start() throws Exception {
2433

2534
Router router = Router.router(vertx);
2635
router.route().handler(BodyHandler.create());
@@ -38,7 +47,25 @@ public void start(Future<Void> fut) throws Exception {
3847
router.accept(req);
3948
});
4049

41-
server.rxListen(8080).subscribe(res -> fut.complete(), onError -> fut.fail(onError.getCause()));
50+
server.rxListen(8080).subscribe(res -> generateRequests());
51+
}
52+
53+
private void generateRequests() throws InterruptedException {
54+
AtomicInteger ok = new AtomicInteger(0);
55+
AtomicInteger fail = new AtomicInteger(0);
56+
57+
List<Callable<HttpClient>> tasks = Collections.nCopies(NUMBER_OF_REQUESTS,
58+
() -> vertx.createHttpClient()
59+
.getNow(8080, "localhost", "/", resp ->
60+
log(resp, resp.statusCode() == 200 ? ok.incrementAndGet() : ok.get(),
61+
resp.statusCode() == 503 ? fail.incrementAndGet() : fail.get())
62+
));
63+
Executors.newFixedThreadPool(N_THREADS).invokeAll(tasks);
64+
65+
}
66+
67+
private void log(HttpClientResponse response, int ok, int fail) {
68+
System.out.println(response.statusCode() + ", ok=" + ok + ", fail=" + fail);
4269
}
4370

4471
}

0 commit comments

Comments
 (0)