|
1 | 1 | package io.vertx.example.reactivex.database.jdbc;
|
2 | 2 |
|
| 3 | +import io.reactivex.Single; |
| 4 | +import io.reactivex.exceptions.CompositeException; |
3 | 5 | import io.vertx.core.json.JsonArray;
|
4 | 6 | import io.vertx.core.json.JsonObject;
|
5 | 7 | import io.vertx.example.util.Runner;
|
@@ -34,19 +36,26 @@ public void start() throws Exception {
|
34 | 36 | .rxGetConnection()
|
35 | 37 | .flatMap(conn ->
|
36 | 38 | conn
|
37 |
| - // disable auto commit to manage transaction manually |
| 39 | + // Disable auto commit to handle transaction manually |
38 | 40 | .rxSetAutoCommit(false)
|
39 |
| - // switch from Completable to default Single value |
| 41 | + // Switch from Completable to default Single value |
40 | 42 | .toSingleDefault(false)
|
| 43 | + // Create table |
41 | 44 | .flatMap(autoCommit -> conn.rxExecute(sql).toSingleDefault(true))
|
| 45 | + // Insert colors |
42 | 46 | .flatMap(executed -> conn.rxUpdateWithParams("INSERT INTO colors (name) VALUES (?)", new JsonArray().add("BLACK")))
|
43 | 47 | .flatMap(updateResult -> conn.rxUpdateWithParams("INSERT INTO colors (name) VALUES (?)", new JsonArray().add("WHITE")))
|
44 | 48 | .flatMap(updateResult -> conn.rxUpdateWithParams("INSERT INTO colors (name) VALUES (?)", new JsonArray().add("PURPLE")))
|
45 |
| - .flatMap(updateResult -> conn.rxQuery("SELECT * FROM colors")) |
46 | 49 | // commit if all succeeded
|
47 |
| - .doOnSuccess(resultSet -> conn.rxCommit().subscribe()) |
48 |
| - // rollback if any failed |
49 |
| - .doOnError(throwable -> conn.rxRollback().subscribe()) |
| 50 | + .flatMap(updateResult -> conn.rxCommit().toSingleDefault(true).map(commit -> updateResult)) |
| 51 | + // Rollback if any failed with exception propagation |
| 52 | + .onErrorResumeNext(ex -> conn.rxRollback() |
| 53 | + .toSingleDefault(true) |
| 54 | + .onErrorResumeNext(ex2 -> Single.error(new CompositeException(ex, ex2))) |
| 55 | + .flatMap(ignore -> Single.error(ex)) |
| 56 | + ) |
| 57 | + // Get colors if all succeeded |
| 58 | + .flatMap(updateResult -> conn.rxQuery("SELECT * FROM colors")) |
50 | 59 | // close the connection regardless succeeded or failed
|
51 | 60 | .doAfterTerminate(conn::close)
|
52 | 61 | ).subscribe(resultSet -> {
|
|
0 commit comments