Skip to content

Commit db21f88

Browse files
committed
Restore the RX Java 1 Example.
1 parent c040f7f commit db21f88

File tree

29 files changed

+245
-1
lines changed

29 files changed

+245
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.vertx.example.rxjava.database.jdbc;
2+
3+
import io.vertx.core.json.JsonObject;
4+
import io.vertx.example.util.Runner;
5+
import io.vertx.ext.sql.ResultSet;
6+
import io.vertx.rxjava.core.AbstractVerticle;
7+
import io.vertx.rxjava.ext.jdbc.JDBCClient;
8+
import rx.Single;
9+
10+
/*
11+
* @author <a href="mailto:[email protected]">Julien Viet</a>
12+
*/
13+
public class Client extends AbstractVerticle {
14+
15+
// Convenience method so you can run it in your IDE
16+
public static void main(String[] args) {
17+
Runner.runExample(Client.class);
18+
}
19+
20+
@Override
21+
public void start() throws Exception {
22+
23+
JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
24+
.put("driver_class", "org.hsqldb.jdbcDriver");
25+
26+
JDBCClient jdbc = JDBCClient.createShared(vertx, config);
27+
28+
// Connect to the database
29+
jdbc.rxGetConnection().flatMap(conn -> {
30+
31+
// Now chain some statements using flatmap composition
32+
Single<ResultSet> resa = conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))")
33+
.flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')"))
34+
.flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')"))
35+
.flatMap(result -> conn.rxQuery("SELECT * FROM test"));
36+
37+
return resa.doAfterTerminate(conn::close);
38+
39+
}).subscribe(resultSet -> {
40+
// Subscribe to the final result
41+
System.out.println("Results : " + resultSet.getRows());
42+
}, err -> {
43+
System.out.println("Database problem");
44+
err.printStackTrace();
45+
});
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.vertx.example.rxjava.database.jdbc;
2+
3+
import io.vertx.core.json.JsonObject;
4+
import io.vertx.example.util.Runner;
5+
import io.vertx.rxjava.core.AbstractVerticle;
6+
import io.vertx.rxjava.ext.jdbc.JDBCClient;
7+
8+
/*
9+
* @author <a href="mailto:[email protected]">Paulo Lopes</a>
10+
*/
11+
public class Streaming extends AbstractVerticle {
12+
13+
// Convenience method so you can run it in your IDE
14+
public static void main(String[] args) {
15+
Runner.runExample(Streaming.class);
16+
}
17+
18+
@Override
19+
public void start() throws Exception {
20+
21+
JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
22+
.put("driver_class", "org.hsqldb.jdbcDriver");
23+
24+
JDBCClient jdbc = JDBCClient.createShared(vertx, config);
25+
26+
jdbc
27+
.rxGetConnection() // Connect to the database
28+
.flatMapObservable(conn -> { // With the connection...
29+
return conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))") // ...create test table
30+
.flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')")) // ...insert a row
31+
.flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')")) // ...another one
32+
.flatMap(result -> conn.rxQueryStream("SELECT * FROM test")) // ...get values stream
33+
.flatMapObservable(sqlRowStream -> {
34+
return sqlRowStream.toObservable() // Transform the stream into an Observable...
35+
.doOnTerminate(conn::close); // ...and close the connection when the stream is fully read or an error occurs
36+
});
37+
}).subscribe(row -> System.out.println("Row : " + row.encode()));
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package io.vertx.example.rxjava.database.mongo;
2+
3+
import io.vertx.core.json.JsonObject;
4+
import io.vertx.example.util.Runner;
5+
import io.vertx.rxjava.core.AbstractVerticle;
6+
import io.vertx.rxjava.ext.mongo.MongoClient;
7+
import rx.Observable;
8+
9+
/*
10+
* @author <a href="mailto:[email protected]">Julien Viet</a>
11+
*/
12+
public class Client extends AbstractVerticle {
13+
14+
private MongoClient mongo;
15+
16+
// Convenience method so you can run it in your IDE
17+
public static void main(String[] args) {
18+
Runner.runExample(Client.class);
19+
}
20+
21+
@Override
22+
public void start() throws Exception {
23+
24+
JsonObject config = new JsonObject()
25+
.put("connection_string", "mongodb://localhost:27018")
26+
.put("db_name", "my_DB");
27+
28+
// Deploy an embedded mongo database so we can test against that
29+
vertx.deployVerticle("service:io.vertx.vertx-mongo-embedded-db", db -> {
30+
if (db.succeeded()) {
31+
32+
// Create the client
33+
mongo = MongoClient.createShared(vertx, config);
34+
35+
insertAndFind();
36+
37+
} else {
38+
System.out.println("Could not start mongo embedded");
39+
db.cause().printStackTrace();
40+
}
41+
});
42+
}
43+
44+
private void insertAndFind() {
45+
// Documents to insert
46+
Observable<JsonObject> documents = Observable.just(
47+
new JsonObject().put("username", "temporalfox").put("firstname", "Julien").put("password", "bilto"),
48+
new JsonObject().put("username", "purplefox").put("firstname", "Tim").put("password", "wibble")
49+
);
50+
51+
mongo.rxCreateCollection("users").flatMapObservable(v -> {
52+
// After collection is created we insert each document
53+
return documents.flatMap(doc -> mongo.rxInsert("users", doc).toObservable());
54+
}).doOnNext(id -> {
55+
System.out.println("Inserted document " + id);
56+
}).last().toSingle().flatMap(id -> {
57+
// Everything has been inserted now we can query mongo
58+
System.out.println("Insertions done");
59+
return mongo.rxFind("users", new JsonObject());
60+
}).subscribe(results -> {
61+
System.out.println("Results " + results);
62+
}, error -> {
63+
System.out.println("Err");
64+
error.printStackTrace();
65+
});
66+
}
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.vertx.example.rxjava.eventbus.pingpong;
2+
3+
import io.vertx.rxjava.core.AbstractVerticle;
4+
import io.vertx.rxjava.core.eventbus.EventBus;
5+
import io.vertx.example.util.Runner;
6+
7+
public class PingPong extends AbstractVerticle {
8+
9+
private static final String ADDRESS = "ping-address";
10+
11+
// Convenience method so you can run it in your IDE
12+
public static void main(String[] args) {
13+
Runner.runClusteredExample(PingPong.class);
14+
}
15+
16+
@Override
17+
public void start() throws Exception {
18+
19+
EventBus eb = vertx.eventBus();
20+
21+
eb.consumer(ADDRESS)
22+
.toObservable()
23+
.subscribe(message -> {
24+
System.out.println("Received " + message.body());
25+
message.reply("PONG");
26+
});
27+
28+
// Send a message every second
29+
vertx.setPeriodic(1000, v -> {
30+
eb.rxSend(ADDRESS, "PING")
31+
.subscribe(reply -> {
32+
System.out.println("Received reply " + reply.body());
33+
});
34+
});
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.vertx.example.rxjava.eventbus.pubsub;
2+
3+
import io.vertx.example.util.Runner;
4+
import io.vertx.rxjava.core.AbstractVerticle;
5+
import io.vertx.rxjava.core.eventbus.EventBus;
6+
7+
/*
8+
* @author <a href="http://tfox.org">Tim Fox</a>
9+
*/
10+
public class Receiver extends AbstractVerticle {
11+
12+
// Convenience method so you can run it in your IDE
13+
public static void main(String[] args) {
14+
Runner.runClusteredExample(Receiver.class);
15+
}
16+
17+
18+
@Override
19+
public void start() throws Exception {
20+
21+
EventBus eb = vertx.eventBus();
22+
23+
eb.consumer("news-feed").
24+
toObservable().
25+
subscribe(message -> System.out.println("Received news: " + message.body()));
26+
27+
System.out.println("Ready!");
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.vertx.example.rxjava.eventbus.pubsub;
2+
3+
import io.vertx.core.AbstractVerticle;
4+
import io.vertx.core.eventbus.EventBus;
5+
import io.vertx.example.util.Runner;
6+
7+
/*
8+
* @author <a href="http://tfox.org">Tim Fox</a>
9+
*/
10+
public class Sender extends AbstractVerticle {
11+
12+
// Convenience method so you can run it in your IDE
13+
public static void main(String[] args) {
14+
Runner.runClusteredExample(Sender.class);
15+
}
16+
17+
@Override
18+
public void start() throws Exception {
19+
20+
EventBus eb = vertx.eventBus();
21+
22+
// Send a message every second
23+
24+
vertx.setPeriodic(1000, v -> eb.publish("news-feed", "Some news!"));
25+
}
26+
}

rx-examples/src/main/java/io/vertx/example/reactivex/scheduler/interval/Periodic.java renamed to rx-examples/src/main/java/io/vertx/example/rxjava/scheduler/timer/Periodic.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.vertx.example.reactivex.scheduler.timer;
1+
package io.vertx.example.rxjava.scheduler.timer;
22

33
import io.vertx.example.util.Runner;
44
import io.vertx.rxjava.core.AbstractVerticle;

0 commit comments

Comments
 (0)