Skip to content

Commit 58dc3cd

Browse files
Addressed PR comments: Added an option to enable the new behavior, while still ignoring timeout for legacy clients
1 parent 635cc01 commit 58dc3cd

File tree

4 files changed

+119
-2
lines changed

4 files changed

+119
-2
lines changed

PgTimeoutTester.java

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package io.vertx.sqlclient.templates.impl;
2+
3+
import io.vertx.core.Future;
4+
import io.vertx.core.Vertx;
5+
import io.vertx.pgclient.PgConnectOptions;
6+
import io.vertx.pgclient.PgPool;
7+
import io.vertx.sqlclient.PoolOptions;
8+
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
12+
public class PgTimeoutTester {
13+
public static void main(String[] args) {
14+
Vertx vertx = Vertx.vertx();
15+
16+
PgConnectOptions dbConfig = new PgConnectOptions()
17+
.setPort(5432)
18+
.setConnectTimeout(2000)
19+
.setHost("localhost")
20+
.setDatabase("postgres")
21+
.setUser("postgres")
22+
.setPassword("postgres");
23+
24+
PoolOptions poolConfig = new PoolOptions()
25+
.setMaxSize(1) // One connection in Pool
26+
.setConnectionTimeout(2); // 2 seconds
27+
28+
PgPool pool = PgPool.pool(vertx, dbConfig, poolConfig);
29+
30+
//connectionTimeOut(pool, vertx);
31+
poolTimeOut(pool, vertx);
32+
}
33+
34+
private static void connectionTimeOut(PgPool pool, Vertx vertx) {
35+
//First query
36+
pool.getConnection()
37+
.onFailure(err -> {
38+
err.printStackTrace();
39+
vertx.close();
40+
})
41+
.compose(conn0 ->
42+
conn0.query("SELECT 1").execute()
43+
.onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0)))
44+
/*.eventually(ign -> conn0.close())*/); // Don't close connection to trigger timeout while getting one below
45+
46+
//Second query
47+
pool.getConnection()
48+
.onFailure(err -> {
49+
err.printStackTrace();
50+
vertx.close();
51+
})
52+
.compose(conn0 ->
53+
conn0.query("SELECT 2").execute()
54+
.onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0)))
55+
.eventually(ign -> conn0.close()));
56+
}
57+
58+
private static void poolTimeOut(PgPool pool, Vertx vertx) {
59+
//First query
60+
pool.getConnection()
61+
.onFailure(err -> {
62+
err.printStackTrace();
63+
vertx.close();
64+
})
65+
.compose(conn0 ->
66+
conn0.query("SELECT 1").execute()
67+
.onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0)))
68+
.eventually(ign -> conn0.close()));// Don't close connection to trigger timeout while getting one below
69+
70+
List<Future<?>> futures = new ArrayList<>();
71+
//N queries
72+
for (int i = 2; i < 10; i++) {
73+
Future<?> f = pool.query("SELECT " + i).execute()
74+
.onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0)))
75+
.onFailure(err -> {
76+
err.printStackTrace();
77+
vertx.close();
78+
});
79+
futures.add(f);
80+
}
81+
82+
Future.all(futures).onComplete(c -> vertx.close());
83+
}
84+
}

vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ private void testConnectionClosedInProvider(TestContext ctx, boolean immediately
596596

597597
@Test
598598
public void testConnectionTimeoutWhenExecutingDirectly(TestContext ctx) {
599-
PgPool pool = createPool(options, new PoolOptions().setConnectionTimeout(2).setMaxSize(2));
599+
PgPool pool = createPool(options, new PoolOptions().setConnectionTimeout(2).setMaxSize(2).setAlwaysUseTimeout(true));
600600
final Async latch = ctx.async(2);
601601
pool.getConnection(ctx.asyncAssertSuccess(conn -> {
602602
conn

vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java

+28
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ public class PoolOptions {
9494
*/
9595
public static final int DEFAULT_EVENT_LOOP_SIZE = 0;
9696

97+
/**
98+
* Default honor timeout when scheduling commands is false
99+
*/
100+
public static final boolean DEFAULT_ALWAYS_USE_TIMEOUT = false;
101+
97102
private int maxSize = DEFAULT_MAX_SIZE;
98103
private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
99104
private int idleTimeout = DEFAULT_IDLE_TIMEOUT;
@@ -106,6 +111,7 @@ public class PoolOptions {
106111
private boolean shared = DEFAULT_SHARED_POOL;
107112
private String name = DEFAULT_NAME;
108113
private int eventLoopSize = DEFAULT_EVENT_LOOP_SIZE;
114+
private boolean alwaysUseTimeout = DEFAULT_ALWAYS_USE_TIMEOUT;
109115

110116
public PoolOptions() {
111117
}
@@ -122,6 +128,7 @@ public PoolOptions(PoolOptions other) {
122128
shared= other.shared;
123129
name = other.name;
124130
eventLoopSize = other.eventLoopSize;
131+
alwaysUseTimeout = other.alwaysUseTimeout;
125132
}
126133

127134
/**
@@ -360,6 +367,27 @@ public PoolOptions setEventLoopSize(int eventLoopSize) {
360367
return this;
361368
}
362369

370+
/**
371+
* @return Whether the pool will always use timeout, even when sending commands directly to execute.
372+
*/
373+
public boolean isAlwaysUseTimeout() { return alwaysUseTimeout; }
374+
375+
/**
376+
* Sets whether always honor the pool's timeout.
377+
* <p>
378+
* This basically affects the pool's schedule method, which will submit the command regardless of whether there's
379+
* an available connection or not. This settings allows the caller to have a consistent max wait time across every
380+
* method.
381+
* </p>
382+
* The default is {@code false}.
383+
* @param alwaysUseTimeout Whether to use the configured connection timeout when scheduling commands
384+
* @return a reference to this, so the API can be used fluently
385+
*/
386+
public PoolOptions setAlwaysUseTimeout(boolean alwaysUseTimeout) {
387+
this.alwaysUseTimeout = alwaysUseTimeout;
388+
return this;
389+
}
390+
363391
public JsonObject toJson() {
364392
JsonObject json = new JsonObject();
365393
PoolOptionsConverter.toJson(this, json);

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
4949
private volatile Handler<SqlConnectionPool.PooledConnection> connectionInitializer;
5050
private long timerID;
5151
private volatile Function<Context, Future<SqlConnection>> connectionProvider;
52+
private final boolean alwaysUseTimeout;
5253

5354
public static final String PROPAGATABLE_CONNECTION = "propagatable_connection";
5455

@@ -65,6 +66,7 @@ public PoolImpl(VertxInternal vertx,
6566
this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit());
6667
this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit());
6768
this.cleanerPeriod = poolOptions.getPoolCleanerPeriod();
69+
this.alwaysUseTimeout = poolOptions.isAlwaysUseTimeout();
6870
this.timerID = -1L;
6971
this.pipelined = pipelined;
7072
this.vertx = vertx;
@@ -169,14 +171,17 @@ public Future<SqlConnection> getConnection() {
169171

170172
@Override
171173
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
174+
if (alwaysUseTimeout) {
175+
return pool.execute(context, cmd);
176+
}
172177
PromiseInternal<SqlConnectionPool.PooledConnection> promise = context.promise();
173178
//Acquires the connection honoring the pool's connection timeout
174179
acquire(context, connectionTimeout, promise);
175180
return promise.future().compose(pooled -> {
176181
//We need to 'init' the connection or close will fail.
177182
pooled.init(pooled);
178183
return pooled.schedule(context, cmd)
179-
.eventually(v -> {
184+
.eventually(() -> {
180185
Promise<Void> p = Promise.promise();
181186
pooled.close(pooled, p);
182187
return p.future();

0 commit comments

Comments
 (0)