Skip to content

FIX: connectionPool thread race conditions #179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 3.9
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public void handle(AsyncResult<Connection> connectionResult) {
createAndConnect(this) // Try to connect again using this handler
);
} else {
poolSize -= 1;
synchronized (this) {
poolSize -= 1;
}
notifyWaitersAboutAvailableConnection();
handler.handle(connectionResult);
}
Expand All @@ -111,7 +113,7 @@ private synchronized void createAndConnect(Handler<AsyncResult<Connection>> hand
.whenCompleteAsync((connection, error) -> {
try {
if (error != null) {
logger.info("failed to create connection", error);
logger.error("failed to create connection", error);
handler.handle(Future.failedFuture(error));
} else {
handler.handle(Future.succeededFuture(connection));
Expand All @@ -126,7 +128,7 @@ private synchronized void createAndConnect(Handler<AsyncResult<Connection>> hand
}
}, ConversionUtils.vertxToExecutor(vertx));
} catch (Throwable e) {
logger.info("creating a connection went wrong", e);
logger.error("creating a connection went wrong", e);
handler.handle(Future.failedFuture(e));
}
}
Expand All @@ -152,40 +154,25 @@ public synchronized void take(Handler<AsyncResult<Connection>> handler) {
if (connection.isConnected()) {
// Do connection test if connection test timeout is configured
if (connectionConfig != null && connectionConfig.getConnectionTestTimeout() > 0) {
AtomicBoolean testCompleted = new AtomicBoolean(false);
long timer = vertx.setTimer(connectionConfig.getConnectionTestTimeout(), ignored -> {
// check if the test request has completed or not, if not, try it again and drop the current connection
if (testCompleted.compareAndSet(false, true)) {
logger.info("connection test timeout");
connection.disconnect(); // drop the connection if it's still alive
synchronized (this) {
poolSize -= 1;
}

take(handler);
logger.error("connection test timeout");
connection.disconnect(); // drop the connection if it's still alive
synchronized (this) {
poolSize -= 1;
}
take(handler);
});
connection.sendQuery("SELECT 1 AS alive")
.whenCompleteAsync((ignored, error) -> {
if (error != null) {
logger.info("connection test failed", error);
connection.disconnect(); // try to close the connection
synchronized (this) {
poolSize -= 1;
}

take(handler);
} else {
// connection is good, however, need to check if the test query has timeout or not
// if timeout is not fired yet, then we will cleanup the timeout timer and return
// the connection, otherwise, we will skip this event, as timeout timer already
// drop the connection and retry
if (testCompleted.compareAndSet(false, true)) {
// cleanup the timer
if (this.connectionConfig.getConnectionTestTimeout() > 0) {
vertx.cancelTimer(timer);
if (vertx.cancelTimer(timer)) {
if (error != null) {
logger.error("connection test failed", error);
connection.disconnect(); // try to close the connection
synchronized (this) {
poolSize -= 1;
}

take(handler);
} else {
handler.handle(Future.succeededFuture(connection));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public void handle(AsyncResult<Connection> connectionResult) {
createAndConnect(this) // Try to connect again using this handler
);
} else {
poolSize -= 1;
synchronized (this) {
poolSize -= 1;
}
notifyWaitersAboutAvailableConnection();
handler.handle(connectionResult);
}
Expand All @@ -95,7 +97,7 @@ private synchronized void createAndConnect(Handler<AsyncResult<Connection>> hand
.connect()
.onComplete(ScalaUtils.toFunction1(handler), VertxEventLoopExecutionContext.create(vertx));
} catch (Throwable e) {
logger.info("creating a connection went wrong", e);
logger.error("creating a connection went wrong", e);
handler.handle(Future.failedFuture(e));
}
}
Expand Down