Skip to content

Commit 4a50f17

Browse files
fix: LimitlessRouterMonitor to close connection properly (#1369)
1 parent f4fb2b5 commit 4a50f17

File tree

3 files changed

+70
-41
lines changed

3 files changed

+70
-41
lines changed

wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java

Lines changed: 64 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.checkerframework.checker.nullness.qual.NonNull;
3030
import software.amazon.jdbc.HostSpec;
3131
import software.amazon.jdbc.PluginService;
32-
import software.amazon.jdbc.RoundRobinHostSelector;
3332
import software.amazon.jdbc.util.Messages;
3433
import software.amazon.jdbc.util.PropertyUtils;
3534
import software.amazon.jdbc.util.SlidingExpirationCacheWithCleanupThread;
@@ -104,6 +103,15 @@ public AtomicBoolean isStopped() {
104103
@Override
105104
public void close() throws Exception {
106105
this.stopped.set(true);
106+
try {
107+
if (this.monitoringConn != null && !this.monitoringConn.isClosed()) {
108+
this.monitoringConn.close();
109+
}
110+
} catch (final SQLException ex) {
111+
// ignore
112+
}
113+
114+
this.monitoringConn = null;
107115

108116
// Waiting for 5s gives a thread enough time to exit monitoring loop and close database connection.
109117
if (!this.threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -120,46 +128,65 @@ public void run() {
120128
"LimitlessRouterMonitor.running",
121129
new Object[] {this.hostSpec.getHost()}));
122130

123-
while (!this.stopped.get()) {
124-
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext(
125-
"limitless router monitor thread", TelemetryTraceLevel.TOP_LEVEL);
126-
telemetryContext.setAttribute("url", hostSpec.getUrl());
127-
try {
128-
this.openConnection();
129-
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
130-
continue;
131-
}
132-
List<HostSpec> newLimitlessRouters = queryHelper.queryForLimitlessRouters(this.monitoringConn,
133-
this.hostSpec.getPort());
134-
135-
limitlessRouterCache.put(
136-
this.limitlessRouterCacheKey,
137-
newLimitlessRouters,
138-
TimeUnit.MILLISECONDS.toNanos(LimitlessRouterServiceImpl.MONITOR_DISPOSAL_TIME_MS.getLong(props)));
139-
140-
LOGGER.finest(Utils.logTopology(newLimitlessRouters, "[limitlessRouterMonitor] Topology:"));
141-
TimeUnit.MILLISECONDS.sleep(this.intervalMs); // do not include this in the telemetry
142-
} catch (final InterruptedException exception) {
143-
LOGGER.finest(
144-
() -> Messages.get(
145-
"LimitlessRouterMonitor.interruptedExceptionDuringMonitoring",
146-
new Object[] {this.hostSpec.getHost()}));
147-
} catch (final Exception ex) {
148-
// this should not be reached; log and exit thread
149-
if (LOGGER.isLoggable(Level.FINEST)) {
150-
LOGGER.log(
151-
Level.FINEST,
152-
Messages.get(
153-
"LimitlessRouterMonitor.exceptionDuringMonitoringStop",
154-
new Object[] {this.hostSpec.getHost()}),
155-
ex); // We want to print full trace stack of the exception.
131+
try {
132+
while (!this.stopped.get()) {
133+
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext(
134+
"limitless router monitor thread", TelemetryTraceLevel.TOP_LEVEL);
135+
telemetryContext.setAttribute("url", hostSpec.getUrl());
136+
try {
137+
this.openConnection();
138+
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
139+
continue;
140+
}
141+
List<HostSpec> newLimitlessRouters = queryHelper.queryForLimitlessRouters(this.monitoringConn,
142+
this.hostSpec.getPort());
143+
144+
limitlessRouterCache.put(
145+
this.limitlessRouterCacheKey,
146+
newLimitlessRouters,
147+
TimeUnit.MILLISECONDS.toNanos(LimitlessRouterServiceImpl.MONITOR_DISPOSAL_TIME_MS.getLong(props)));
148+
149+
LOGGER.finest(Utils.logTopology(newLimitlessRouters, "[limitlessRouterMonitor] Topology:"));
150+
TimeUnit.MILLISECONDS.sleep(this.intervalMs); // do not include this in the telemetry
151+
} catch (final Exception ex) {
152+
if (telemetryContext != null) {
153+
telemetryContext.setException(ex);
154+
telemetryContext.setSuccess(false);
155+
}
156+
throw ex;
157+
} finally {
158+
if (telemetryContext != null) {
159+
telemetryContext.closeContext();
160+
}
156161
}
157-
} finally {
158-
if (telemetryContext != null) {
159-
telemetryContext.closeContext();
162+
}
163+
} catch (final InterruptedException exception) {
164+
LOGGER.finest(
165+
() -> Messages.get(
166+
"LimitlessRouterMonitor.interruptedExceptionDuringMonitoring",
167+
new Object[] {this.hostSpec.getHost()}));
168+
} catch (final Exception ex) {
169+
// this should not be reached; log and exit thread
170+
if (LOGGER.isLoggable(Level.FINEST)) {
171+
LOGGER.log(
172+
Level.FINEST,
173+
Messages.get(
174+
"LimitlessRouterMonitor.exceptionDuringMonitoringStop",
175+
new Object[] {this.hostSpec.getHost()}),
176+
ex); // We want to print full trace stack of the exception.
177+
}
178+
} finally {
179+
this.stopped.set(true);
180+
try {
181+
if (this.monitoringConn != null && !this.monitoringConn.isClosed()) {
182+
this.monitoringConn.close();
160183
}
184+
} catch (final SQLException ex) {
185+
// ignore
161186
}
187+
this.monitoringConn = null;
162188
}
189+
163190
}
164191

165192
private void openConnection() throws SQLException {

wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public class LimitlessRouterServiceImpl implements LimitlessRouterService {
5757
try {
5858
limitlessRouterMonitor.close();
5959
} catch (Exception e) {
60-
// ignore
60+
LOGGER.warning(Messages.get("LimitlessRouterServiceImpl.errorClosingMonitor",
61+
new Object[]{e.getMessage()}));
6162
}
6263
},
6364
CACHE_CLEANUP_NANO

wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,19 +239,20 @@ LimitlessConnectionPlugin.unsupportedDialectOrDatabase=Unsupported dialect ''{0}
239239
LimitlessQueryHelper.unsupportedDialectOrDatabase=Unsupported dialect ''{0}'' encountered. Please ensure JDBC connection parameters are correct, and refer to the documentation to ensure that the connecting database is compatible with the Limitless Connection Plugin.
240240

241241
# Limitless Router Monitor
242-
LimitlessRouterMonitor.exceptionDuringMonitoringStop=Unhandled exception was thrown in Limitless Router Monitoring thread for node {0}.
242+
LimitlessRouterMonitor.exceptionDuringMonitoringStop=Stopping monitoring after unhandled exception was thrown in Limitless Router Monitoring thread for node {0}.
243243
LimitlessRouterMonitor.interruptedExceptionDuringMonitoring=Limitless Router Monitoring thread for node {0} was interrupted.
244244
LimitlessRouterMonitor.invalidQuery=Limitless Connection Plugin has encountered an error obtaining Limitless Router endpoints. Please ensure that you are connecting to an Aurora Limitless Database Shard Group Endpoint URL.
245245
LimitlessRouterMonitor.invalidRouterLoad=Invalid load metric value of ''{1}''from the transaction router query aurora_limitless_router_endpoints() for transaction router ''{0}''. The load metric value must be a decimal value between 0 and 1. Host weight be assigned a default weight of 1.
246246
LimitlessRouterMonitor.getNetworkTimeoutError=An error occurred while getting the connection network timeout: {0}
247247
LimitlessRouterMonitor.openingConnection=Opening Limitless Router Monitor connection to ''{0}''.
248248
LimitlessRouterMonitor.openedConnection=Opened Limitless Router Monitor connection: {0}.
249249
LimitlessRouterMonitor.running=Limitless Router Monitor thread running on node {0}.
250-
LimitlessRouterMonitor.stopped=Limitless Router Monitor thread stopped on node {0].
250+
LimitlessRouterMonitor.stopped=Limitless Router Monitor thread stopped on node {0}.
251251

252252
# Limitless Router Service
253253
LimitlessRouterServiceImpl.connectWithHost=Connecting to host {0}.
254-
LimitlessRouterServiceImpl.errorStartingMonitor=An error occurred while starting Limitless Router Monitor. {0}
254+
LimitlessRouterServiceImpl.errorClosingMonitor=An error occurred while closing Limitless Router Monitor: {0}
255+
LimitlessRouterServiceImpl.errorStartingMonitor=An error occurred while starting Limitless Router Monitor: {0}
255256
LimitlessRouterServiceImpl.failedToConnectToHost=Failed to connect to host {0}.
256257
LimitlessRouterServiceImpl.fetchedEmptyRouterList=Empty router list was fetched.
257258
LimitlessRouterServiceImpl.getLimitlessRoutersException=Exception encountered getting Limitless Routers. {0}

0 commit comments

Comments
 (0)