Skip to content

Commit 06ab6db

Browse files
committed
Merge branch 'max-lifetime' of github.com:kdubb/vertx-sql-client into kdubb-max-lifetime
2 parents 76afd3d + d839f24 commit 06ab6db

File tree

5 files changed

+152
-20
lines changed

5 files changed

+152
-20
lines changed

vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java

+50-6
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ public void testPipeliningDistribution(TestContext ctx) {
384384
public void testPoolIdleTimeout(TestContext ctx) {
385385
ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost());
386386
AtomicReference<ProxyServer.Connection> proxyConn = new AtomicReference<>();
387-
int pooleCleanerPeriod = 100;
387+
int poolCleanerPeriod = 100;
388388
int idleTimeout = 3000;
389389
Async latch = ctx.async();
390390
proxy.proxyHandler(conn -> {
@@ -393,8 +393,8 @@ public void testPoolIdleTimeout(TestContext ctx) {
393393
conn.clientCloseHandler(v -> {
394394
long lifetime = System.currentTimeMillis() - now;
395395
int delta = 500;
396-
int lowerBound = idleTimeout - pooleCleanerPeriod - delta;
397-
int upperBound = idleTimeout + pooleCleanerPeriod + delta;
396+
int lowerBound = idleTimeout - poolCleanerPeriod - delta;
397+
int upperBound = idleTimeout + poolCleanerPeriod + delta;
398398
ctx.assertTrue(lifetime >= lowerBound, "Was expecting connection to be closed in more than " + lowerBound + ": " + lifetime);
399399
ctx.assertTrue(lifetime <= upperBound, "Was expecting connection to be closed in less than " + upperBound + ": "+ lifetime);
400400
latch.complete();
@@ -408,7 +408,8 @@ public void testPoolIdleTimeout(TestContext ctx) {
408408
listenLatch.awaitSuccess(20_000);
409409

410410
poolOptions
411-
.setPoolCleanerPeriod(pooleCleanerPeriod)
411+
.setPoolCleanerPeriod(poolCleanerPeriod)
412+
.setMaxLifetime(0)
412413
.setIdleTimeout(idleTimeout)
413414
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS);
414415
options.setPort(8080);
@@ -422,6 +423,49 @@ public void testPoolIdleTimeout(TestContext ctx) {
422423
.onComplete(ctx.asyncAssertSuccess());
423424
}
424425

426+
@Test
427+
public void testPoolMaxLifetime(TestContext ctx) {
428+
ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost());
429+
AtomicReference<ProxyServer.Connection> proxyConn = new AtomicReference<>();
430+
int poolCleanerPeriod = 100;
431+
int maxLifetime = 3000;
432+
Async latch = ctx.async();
433+
proxy.proxyHandler(conn -> {
434+
proxyConn.set(conn);
435+
long now = System.currentTimeMillis();
436+
conn.clientCloseHandler(v -> {
437+
long lifetime = System.currentTimeMillis() - now;
438+
int delta = 500;
439+
int lowerBound = maxLifetime - poolCleanerPeriod - delta;
440+
int upperBound = maxLifetime + poolCleanerPeriod + delta;
441+
ctx.assertTrue(lifetime >= lowerBound, "Was expecting connection to be closed in more than " + lowerBound + ": " + lifetime);
442+
ctx.assertTrue(lifetime <= upperBound, "Was expecting connection to be closed in less than " + upperBound + ": "+ lifetime);
443+
latch.complete();
444+
});
445+
conn.connect();
446+
});
447+
448+
// Start proxy
449+
Async listenLatch = ctx.async();
450+
proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete()));
451+
listenLatch.awaitSuccess(20_000);
452+
453+
poolOptions
454+
.setPoolCleanerPeriod(poolCleanerPeriod)
455+
.setIdleTimeout(0)
456+
.setMaxLifetime(maxLifetime)
457+
.setMaxLifetimeUnit(TimeUnit.MILLISECONDS);
458+
options.setPort(8080);
459+
options.setHost("localhost");
460+
PgPool pool = createPool(options, poolOptions);
461+
462+
// Create a connection that remains in the pool
463+
pool
464+
.getConnection()
465+
.flatMap(SqlClient::close)
466+
.onComplete(ctx.asyncAssertSuccess());
467+
}
468+
425469
@Test
426470
public void testPoolConnectTimeout(TestContext ctx) {
427471
Async async = ctx.async(2);
@@ -463,9 +507,9 @@ public void testPoolConnectTimeout(TestContext ctx) {
463507
public void testNoConnectionLeaks(TestContext ctx) {
464508
Async killConnections = ctx.async();
465509
PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> {
466-
Collector<Row, ?, List<Integer>> collector = mapping(row -> row.getInteger(0), toList());
510+
Collector<Row, ?, List<Boolean>> collector = mapping(row -> row.getBoolean(0), toList());
467511
String sql = "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND datname = $1";
468-
PreparedQuery<SqlResult<List<Integer>>> preparedQuery = conn.preparedQuery(sql).collecting(collector);
512+
PreparedQuery<SqlResult<List<Boolean>>> preparedQuery = conn.preparedQuery(sql).collecting(collector);
469513
Tuple params = Tuple.of(options.getDatabase());
470514
preparedQuery.execute(params).compose(cf -> conn.close()).onComplete(ctx.asyncAssertSuccess(v -> killConnections.complete()));
471515
}));

vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java

+14
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
4545
obj.setIdleTimeoutUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
4646
}
4747
break;
48+
case "maxLifetime":
49+
if (member.getValue() instanceof Number) {
50+
obj.setMaxLifetime(((Number)member.getValue()).intValue());
51+
}
52+
break;
53+
case "maxLifetimeUnit":
54+
if (member.getValue() instanceof String) {
55+
obj.setMaxLifetimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
56+
}
57+
break;
4858
case "maxSize":
4959
if (member.getValue() instanceof Number) {
5060
obj.setMaxSize(((Number)member.getValue()).intValue());
@@ -88,6 +98,10 @@ public static void toJson(PoolOptions obj, java.util.Map<String, Object> json) {
8898
if (obj.getIdleTimeoutUnit() != null) {
8999
json.put("idleTimeoutUnit", obj.getIdleTimeoutUnit().name());
90100
}
101+
json.put("maxLifetime", obj.getMaxLifetime());
102+
if (obj.getMaxLifetimeUnit() != null) {
103+
json.put("maxLifetimeUnit", obj.getMaxLifetimeUnit().name());
104+
}
91105
json.put("maxSize", obj.getMaxSize());
92106
json.put("maxWaitQueueSize", obj.getMaxWaitQueueSize());
93107
if (obj.getName() != null) {

vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java

+52-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package io.vertx.sqlclient;
1919

2020
import io.vertx.codegen.annotations.DataObject;
21-
import io.vertx.core.http.HttpClientOptions;
2221
import io.vertx.core.impl.Arguments;
2322
import io.vertx.core.json.JsonObject;
2423

@@ -48,11 +47,21 @@ public class PoolOptions {
4847
*/
4948
public static final int DEFAULT_IDLE_TIMEOUT = 0;
5049

50+
/**
51+
* Default maximum pooled connection lifetime = 0 (no maximum)
52+
*/
53+
public static final int DEFAULT_MAXIMUM_LIFETIME = 0;
54+
5155
/**
5256
* Default connection idle time unit in the pool = seconds
5357
*/
5458
public static final TimeUnit DEFAULT_IDLE_TIMEOUT_TIME_UNIT = TimeUnit.SECONDS;
5559

60+
/**
61+
* Default maximum pooled connection lifetime unit = seconds
62+
*/
63+
public static final TimeUnit DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT = TimeUnit.SECONDS;
64+
5665
/**
5766
* Default pool cleaner period = 1000 ms (1 second)
5867
*/
@@ -87,6 +96,8 @@ public class PoolOptions {
8796
private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
8897
private int idleTimeout = DEFAULT_IDLE_TIMEOUT;
8998
private TimeUnit idleTimeoutUnit = DEFAULT_IDLE_TIMEOUT_TIME_UNIT;
99+
private int maxLifetime = DEFAULT_MAXIMUM_LIFETIME;
100+
private TimeUnit maxLifetimeUnit = DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT;
90101
private int poolCleanerPeriod = DEFAULT_POOL_CLEANER_PERIOD;
91102
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
92103
private TimeUnit connectionTimeoutUnit = DEFAULT_CONNECTION_TIMEOUT_TIME_UNIT;
@@ -177,16 +188,54 @@ public int getIdleTimeout() {
177188
}
178189

179190
/**
180-
* Establish an idle timeout for pooled connections.
191+
* Establish an idle timeout for pooled connections, a value of zero disables the idle timeout.
181192
*
182-
* @param idleTimeout the pool connection idle time unitq
193+
* @param idleTimeout the pool connection idle timeout
183194
* @return a reference to this, so the API can be used fluently
184195
*/
185196
public PoolOptions setIdleTimeout(int idleTimeout) {
197+
Arguments.require(idleTimeout >= 0, "idleTimeout must be >= 0");
186198
this.idleTimeout = idleTimeout;
187199
return this;
188200
}
189201

202+
/**
203+
* @return the pooled connection max lifetime unit
204+
*/
205+
public TimeUnit getMaxLifetimeUnit() {
206+
return maxLifetimeUnit;
207+
}
208+
209+
/**
210+
* Establish a max lifetime unit for pooled connections.
211+
*
212+
* @param maxLifetimeUnit pooled connection max lifetime unit
213+
* @return a reference to this, so the API can be used fluently
214+
*/
215+
public PoolOptions setMaxLifetimeUnit(TimeUnit maxLifetimeUnit) {
216+
this.maxLifetimeUnit = maxLifetimeUnit;
217+
return this;
218+
}
219+
220+
/**
221+
* @return pooled connection max lifetime
222+
*/
223+
public int getMaxLifetime() {
224+
return maxLifetime;
225+
}
226+
227+
/**
228+
* Establish a max lifetime for pooled connections, a value of zero disables the maximum lifetime.
229+
*
230+
* @param maxLifetime the pool connection max lifetime
231+
* @return a reference to this, so the API can be used fluently
232+
*/
233+
public PoolOptions setMaxLifetime(int maxLifetime) {
234+
Arguments.require(maxLifetime >= 0, "maxLifetime must be >= 0");
235+
this.maxLifetime = maxLifetime;
236+
return this;
237+
}
238+
190239
/**
191240
* @return the connection pool cleaner period in ms.
192241
*/

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
4343
private final CloseFuture closeFuture;
4444
private final long idleTimeout;
4545
private final long connectionTimeout;
46+
private final long maxLifetime;
4647
private final long cleanerPeriod;
4748
private final boolean pipelined;
4849
private volatile Handler<SqlConnectionPool.PooledConnection> connectionInitializer;
@@ -62,20 +63,21 @@ public PoolImpl(VertxInternal vertx,
6263

6364
this.idleTimeout = MILLISECONDS.convert(poolOptions.getIdleTimeout(), poolOptions.getIdleTimeoutUnit());
6465
this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit());
66+
this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit());
6567
this.cleanerPeriod = poolOptions.getPoolCleanerPeriod();
6668
this.timerID = -1L;
6769
this.pipelined = pipelined;
6870
this.vertx = vertx;
69-
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, afterAcquire, beforeRecycle, vertx, idleTimeout, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
71+
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
7072
this.closeFuture = closeFuture;
7173
}
7274

7375
public Pool init() {
7476
closeFuture.add(this);
75-
if (idleTimeout > 0 && cleanerPeriod > 0) {
77+
if ((idleTimeout > 0 || maxLifetime > 0) && cleanerPeriod > 0) {
7678
synchronized (this) {
7779
timerID = vertx.setTimer(cleanerPeriod, id -> {
78-
checkExpired();
80+
runEviction();
7981
});
8082
}
8183
}
@@ -90,17 +92,17 @@ public Pool connectionProvider(Function<Context, Future<SqlConnection>> connecti
9092
return this;
9193
}
9294

93-
private void checkExpired() {
95+
private void runEviction() {
9496
synchronized (this) {
9597
if (timerID == -1) {
9698
// Cancelled
9799
return;
98100
}
99101
timerID = vertx.setTimer(cleanerPeriod, id -> {
100-
checkExpired();
102+
runEviction();
101103
});
102104
}
103-
pool.checkExpired();
105+
pool.evict();
104106
}
105107

106108
@Override

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

+28-5
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class SqlConnectionPool {
5353
private final Function<Connection, Future<Void>> beforeRecycle;
5454
private final boolean pipelined;
5555
private final long idleTimeout;
56+
private final long maxLifetime;
5657
private final int maxSize;
5758

5859
public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProvider,
@@ -61,6 +62,7 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
6162
Function<Connection, Future<Void>> beforeRecycle,
6263
VertxInternal vertx,
6364
long idleTimeout,
65+
long maxLifetime,
6466
int maxSize,
6567
boolean pipelined,
6668
int maxWaitQueueSize,
@@ -75,6 +77,7 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
7577
this.vertx = vertx;
7678
this.pipelined = pipelined;
7779
this.idleTimeout = idleTimeout;
80+
this.maxLifetime = maxLifetime;
7881
this.maxSize = maxSize;
7982
this.hook = hook;
8083
this.connectionProvider = connectionProvider;
@@ -142,9 +145,9 @@ public int size() {
142145
return pool.size();
143146
}
144147

145-
public void checkExpired() {
148+
public void evict() {
146149
long now = System.currentTimeMillis();
147-
pool.evict(conn -> conn.expirationTimestamp < now, ar -> {
150+
pool.evict(conn -> conn.shouldEvict(now), ar -> {
148151
if (ar.succeeded()) {
149152
List<PooledConnection> res = ar.result();
150153
for (PooledConnection conn : res) {
@@ -169,7 +172,7 @@ public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
169172
future = pooled.schedule(context, cmd);
170173
}
171174
return future.andThen(ar -> {
172-
pooled.expirationTimestamp = System.currentTimeMillis() + idleTimeout;
175+
pooled.refresh();
173176
lease.recycle();
174177
});
175178
});
@@ -264,12 +267,15 @@ public class PooledConnection implements Connection, Connection.Holder {
264267
private Holder holder;
265268
private Promise<ConnectResult<PooledConnection>> poolCallback;
266269
private Lease<PooledConnection> lease;
267-
public long expirationTimestamp;
270+
public long idleEvictionTimestamp;
271+
public long lifetimeEvictionTimestamp;
268272

269273
PooledConnection(ConnectionFactory factory, Connection conn, PoolConnector.Listener listener) {
270274
this.factory = factory;
271275
this.conn = conn;
272276
this.listener = listener;
277+
this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime : Long.MAX_VALUE;
278+
refresh();
273279
}
274280

275281
@Override
@@ -346,6 +352,10 @@ private void close(Promise<Void> promise) {
346352
conn.close(this, promise);
347353
}
348354

355+
private void refresh() {
356+
this.idleEvictionTimestamp = idleTimeout > 0 ? System.currentTimeMillis() + idleTimeout : Long.MAX_VALUE;
357+
}
358+
349359
@Override
350360
public void init(Holder holder) {
351361
if (this.holder != null) {
@@ -389,7 +399,7 @@ private void doClose(Holder holder, Promise<Void> promise) {
389399
private void cleanup(Promise<Void> promise) {
390400
Lease<PooledConnection> l = this.lease;
391401
this.lease = null;
392-
this.expirationTimestamp = System.currentTimeMillis() + idleTimeout;
402+
refresh();
393403
l.recycle();
394404
promise.complete();
395405
}
@@ -435,5 +445,18 @@ public int getSecretKey() {
435445
public Connection unwrap() {
436446
return conn;
437447
}
448+
449+
private boolean hasIdleExpired(long now) {
450+
return idleEvictionTimestamp < now;
451+
}
452+
453+
private boolean hasLifetimeExpired(long now) {
454+
return lifetimeEvictionTimestamp < now;
455+
}
456+
457+
private boolean shouldEvict(long now) {
458+
return hasIdleExpired(now) || hasLifetimeExpired(now);
459+
}
460+
438461
}
439462
}

0 commit comments

Comments
 (0)