Skip to content

Commit b87ae8a

Browse files
authored
Merge pull request vert-x3#268 from tomaszmichalak/master
Small refactor, a source code url fix.
2 parents 71dd09b + 9566210 commit b87ae8a

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

rxjava-2-examples/README.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ link:src/main/java/io/vertx/example/reactivex/http/client/unmarshalling/Client.j
112112

113113
The http server which uses Vert.x-Web router and defines a "drop" strategy when the server is overloaded (backpressure).
114114

115-
link:src/main/java/io/vertx/example/reactivex/web/backpressure/Server[RxJava 2 backpressure HTTP server]
115+
link:src/main/java/io/vertx/example/reactivex/web/backpressure/Server.java[RxJava 2 backpressure HTTP server]
116116

117117
== RxJava event bus examples
118118

rxjava-2-examples/src/main/java/io/vertx/example/reactivex/web/backpressure/Server.java

+20-9
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ public class Server extends AbstractVerticle {
2222

2323
private static final int NUMBER_OF_REQUESTS = 1000;
2424
private static final int N_THREADS = 100;
25+
private static final int PORT = 8080;
26+
27+
private static AtomicInteger allCounter = new AtomicInteger();
28+
private static AtomicInteger successCounter = new AtomicInteger();
29+
private static AtomicInteger failureCounter = new AtomicInteger();
2530

2631
// Convenience method so you can run it in your IDE
2732
public static void main(String[] args) throws InterruptedException {
@@ -47,25 +52,31 @@ public void start() throws Exception {
4752
router.accept(req);
4853
});
4954

50-
server.rxListen(8080).subscribe(res -> generateRequests());
55+
server.rxListen(PORT).subscribe(res -> generateRequests());
5156
}
5257

5358
private void generateRequests() throws InterruptedException {
54-
AtomicInteger ok = new AtomicInteger(0);
55-
AtomicInteger fail = new AtomicInteger(0);
56-
5759
List<Callable<HttpClient>> tasks = Collections.nCopies(NUMBER_OF_REQUESTS,
5860
() -> vertx.createHttpClient()
59-
.getNow(8080, "localhost", "/", resp ->
60-
log(resp, resp.statusCode() == 200 ? ok.incrementAndGet() : ok.get(),
61-
resp.statusCode() == 503 ? fail.incrementAndGet() : fail.get())
61+
.getNow(PORT, "localhost", "/", resp ->
62+
log(resp, successCounter(resp), failureCounter(resp))
6263
));
6364
Executors.newFixedThreadPool(N_THREADS).invokeAll(tasks);
65+
}
66+
67+
private int failureCounter(HttpClientResponse resp) {
68+
return resp.statusCode() == 503 ? failureCounter.incrementAndGet() : failureCounter.get();
69+
}
6470

71+
private int successCounter(HttpClientResponse resp) {
72+
return resp.statusCode() == 200 ? successCounter.incrementAndGet() : successCounter.get();
6573
}
6674

67-
private void log(HttpClientResponse response, int ok, int fail) {
68-
System.out.println(response.statusCode() + ", ok=" + ok + ", fail=" + fail);
75+
private void log(HttpClientResponse response, int success, int failure) {
76+
System.out.println(String
77+
.format(
78+
"Response status code: [%s] , number of all responses [%s], number of correct responses [%s], number of dropped responses [%s]",
79+
response.statusCode(), allCounter.incrementAndGet(), success, failure));
6980
}
7081

7182
}

0 commit comments

Comments
 (0)