Skip to content

Commit d8edb18

Browse files
committed
fix: add expiration time for cached reader connections so they are not reused forever
1 parent 4136671 commit d8edb18

File tree

5 files changed

+156
-65
lines changed

5 files changed

+156
-65
lines changed

docs/using-the-jdbc-driver/using-plugins/UsingTheReadWriteSplittingPlugin.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ The read/write splitting plugin is not loaded by default. To load the plugin, in
88

99
```
1010
final Properties properties = new Properties();
11-
properties.setProperty(PropertyDefinition.PLUGINS.name, "readWriteSplitting,failover,efm");
11+
properties.setProperty(PropertyDefinition.PLUGINS.name, "readWriteSplitting,failover2,efm2");
1212
```
1313

1414
If you would like to use the read/write splitting plugin without the failover plugin, make sure you have the `readWriteSplitting` plugin in the `wrapperPlugins` property, and that the failover plugin is not part of it.
@@ -30,7 +30,10 @@ The read/write splitting plugin is not currently supported for non-Aurora cluste
3030
> [!WARNING]\
3131
> If internal connection pools are enabled, database passwords may not be verified with every connection request. The initial connection request for each database instance in the cluster will verify the password, but subsequent requests may return a cached pool connection without re-verifying the password. This behavior is inherent to the nature of connection pools in general and not a bug with the driver. `ConnectionProviderManager.releaseResources` can be called to close all pools and remove all cached pool connections. See [InternalConnectionPoolPasswordWarning.java](../../../examples/AWSDriverExample/src/main/java/software/amazon/InternalConnectionPoolPasswordWarning.java) for more details.
3232
33-
Whenever `setReadOnly(true)` is first called on a `Connection` object, the read/write plugin will internally open a new physical connection to a reader. After this first call, the physical reader connection will be cached for the given `Connection`. Future calls to `setReadOnly `on the same `Connection` object will not require opening a new physical connection. However, calling `setReadOnly(true)` for the first time on a new `Connection` object will require the plugin to establish another new physical connection to a reader. If your application frequently calls `setReadOnly`, you can enable internal connection pooling to improve performance. When enabled, the wrapper driver will maintain an internal connection pool for each instance in the cluster. This allows the read/write plugin to reuse connections that were established by `setReadOnly` calls on previous `Connection` objects.
33+
Whenever `setReadOnly(true)` is first called on a `Connection` object, the read/write plugin will internally open a new physical connection to a reader. After this first call, the physical reader connection will be cached for the given `Connection`. By default, this cached connection will never expire, meaning all subsequent `setReadOnly(true)` calls on the same `Connection` object will keep reusing the same reader connection.
34+
If your application frequently calls `setReadOnly`, this may have a performance impact. There are two ways to improve performance:
35+
1. You can enable internal connection pooling to improve performance. When enabled, the wrapper driver will maintain an internal connection pool for each instance in the cluster. This allows the Read/Write Splitting plugin to reuse connections that were established by `setReadOnly` calls on previous `Connection` objects.
36+
2. You can also use the [`cachedReaderKeepAliveTimeoutMs` connection parameter](#reader-keep-alive-timeout). This sets an expiration time on the reader connection. When `setReadOnly(true)` is called and the reader connection has expired, the plugin will create a new reader connection using the specified [reader selection strategy](#reader-selection).
3437

3538
> [!NOTE]\
3639
> Initial connections to a cluster URL will not be pooled. The driver does not pool cluster URLs because it can be problematic to pool a URL that resolves to different instances over time. The main benefit of internal connection pools is when setReadOnly is called. When setReadOnly is called (regardless of the initial connection URL), an internal pool will be created for the writer/reader that the plugin switches to and connections for that instance can be reused in the future.
@@ -87,6 +90,16 @@ To indicate which selection strategy to use, the `readerHostSelectorStrategy` co
8790
props.setProperty(ReadWriteSplittingPlugin.READER_HOST_SELECTOR_STRATEGY.name, "leastConnections");
8891
```
8992

93+
## Reader keep-alive timeout
94+
If no connection pool is used, reader connections created by calls to `setReadOnly(true)` will be cached for the entire lifetime of the `Connection` object. This may have a negative performance impact if your application makes frequent calls to `setReadOnly(true)`, as all read traffic is directed to a single reader instance.
95+
To improve performance, you can specify a timeout for the cached reader connection using `cachedReaderKeepAliveTimeoutMs`. Once the reader has expired, the next call to `setReadOnly(true)` will create a new reader connection determined by the reader host selection strategy.
96+
97+
```java
98+
final Properties properties = new Properties();
99+
properties.setProperty("cachedReaderKeepAliveTimeoutMs", "600000");
100+
```
101+
> [!NOTE]\
102+
> If a connection pool is used, this setting is ignored and the lifespan of this cached connection object will be handled by the connection pool instead.
90103
91104
## Limitations
92105

wrapper/src/main/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPlugin.java

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,22 @@
2424
import java.util.List;
2525
import java.util.Properties;
2626
import java.util.Set;
27+
import java.util.concurrent.TimeUnit;
2728
import java.util.logging.Logger;
2829
import org.checkerframework.checker.nullness.qual.NonNull;
2930
import software.amazon.jdbc.AwsWrapperProperty;
30-
import software.amazon.jdbc.ConnectionProviderManager;
3131
import software.amazon.jdbc.HostListProviderService;
3232
import software.amazon.jdbc.HostRole;
3333
import software.amazon.jdbc.HostSpec;
3434
import software.amazon.jdbc.JdbcCallable;
3535
import software.amazon.jdbc.NodeChangeOptions;
3636
import software.amazon.jdbc.OldConnectionSuggestedAction;
3737
import software.amazon.jdbc.PluginService;
38-
import software.amazon.jdbc.PooledConnectionProvider;
3938
import software.amazon.jdbc.PropertyDefinition;
4039
import software.amazon.jdbc.cleanup.CanReleaseResources;
4140
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
4241
import software.amazon.jdbc.plugin.failover.FailoverSQLException;
42+
import software.amazon.jdbc.util.CacheItem;
4343
import software.amazon.jdbc.util.Messages;
4444
import software.amazon.jdbc.util.SqlState;
4545
import software.amazon.jdbc.util.Utils;
@@ -68,17 +68,24 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
6868
private volatile boolean inReadWriteSplit = false;
6969
private HostListProviderService hostListProviderService;
7070
private Connection writerConnection;
71-
private Connection readerConnection;
7271
private HostSpec readerHostSpec;
7372
private boolean isReaderConnFromInternalPool;
7473
private boolean isWriterConnFromInternalPool;
74+
private CacheItem<Connection> readerConnection;
7575

7676
public static final AwsWrapperProperty READER_HOST_SELECTOR_STRATEGY =
7777
new AwsWrapperProperty(
7878
"readerHostSelectorStrategy",
7979
"random",
8080
"The strategy that should be used to select a new reader host.");
8181

82+
public static final AwsWrapperProperty CACHED_READER_KEEP_ALIVE_TIMEOUT =
83+
new AwsWrapperProperty(
84+
"cachedReaderKeepAliveTimeoutMs",
85+
"0",
86+
"The time in milliseconds to keep a reader connection alive in the cache. "
87+
+ "Default value 0 means the Wrapper will keep reusing the same cached reader connection.");
88+
8289
static {
8390
PropertyDefinition.registerPluginProperties(ReadWriteSplittingPlugin.class);
8491
}
@@ -101,7 +108,7 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
101108
this(pluginService, properties);
102109
this.hostListProviderService = hostListProviderService;
103110
this.writerConnection = writerConnection;
104-
this.readerConnection = readerConnection;
111+
this.readerConnection = new CacheItem<>(readerConnection, CACHED_READER_KEEP_ALIVE_TIMEOUT.getLong(properties));
105112
}
106113

107114
@Override
@@ -196,8 +203,8 @@ public <T, E extends Exception> T execute(
196203
if (this.writerConnection != null && !this.writerConnection.isClosed()) {
197204
this.writerConnection.clearWarnings();
198205
}
199-
if (this.readerConnection != null && !this.readerConnection.isClosed()) {
200-
this.readerConnection.clearWarnings();
206+
if (isConnectionUsable(this.readerConnection)) {
207+
this.readerConnection.get().clearWarnings();
201208
}
202209
} catch (final SQLException e) {
203210
throw WrapperUtils.wrapExceptionIfNeeded(exceptionClass, e);
@@ -267,9 +274,8 @@ private void setWriterConnection(final Connection writerConnection,
267274
new Object[] {
268275
writerHostSpec.getUrl()}));
269276
}
270-
271277
private void setReaderConnection(final Connection conn, final HostSpec host) {
272-
this.readerConnection = conn;
278+
this.readerConnection = new CacheItem<>(conn, this.getKeepAliveTimeout(host));
273279
this.readerHostSpec = host;
274280
LOGGER.finest(
275281
() -> Messages.get(
@@ -381,6 +387,12 @@ private void switchToWriterConnection(
381387
new Object[] {writerHost.getUrl()}));
382388
}
383389

390+
private void switchCurrentConnectionTo(
391+
final CacheItem<Connection> cachedReaderConnection,
392+
final HostSpec newConnectionHost) throws SQLException {
393+
this.switchCurrentConnectionTo(cachedReaderConnection.get(), newConnectionHost);
394+
}
395+
384396
private void switchCurrentConnectionTo(
385397
final Connection newConnection,
386398
final HostSpec newConnectionHost)
@@ -428,7 +440,10 @@ private void switchToReaderConnection(final List<HostSpec> hosts)
428440
new Object[] {this.readerHostSpec.getUrl()}));
429441
}
430442

431-
this.readerConnection.close();
443+
Connection conn = this.readerConnection.get(true);
444+
if (conn != null) {
445+
conn.close();
446+
}
432447
this.readerConnection = null;
433448
this.readerHostSpec = null;
434449
initializeReaderConnection(hosts);
@@ -506,10 +521,22 @@ private void getNewReaderConnection() throws SQLException {
506521
switchCurrentConnectionTo(this.readerConnection, this.readerHostSpec);
507522
}
508523

524+
private boolean isConnectionUsable(final CacheItem<Connection> cachedConnection) throws SQLException {
525+
return cachedConnection != null && !cachedConnection.isExpired() && isConnectionUsable(cachedConnection.get());
526+
}
527+
509528
private boolean isConnectionUsable(final Connection connection) throws SQLException {
510529
return connection != null && !connection.isClosed();
511530
}
512531

532+
private long getKeepAliveTimeout(final HostSpec host) {
533+
if (this.pluginService.isPooledConnectionProvider(host, properties)) {
534+
// Let the connection pool handle the lifetime of the reader connection.
535+
return 0;
536+
}
537+
return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(CACHED_READER_KEEP_ALIVE_TIMEOUT.getLong(properties));
538+
}
539+
513540
@Override
514541
public void releaseResources() {
515542
closeIdleConnections();
@@ -521,18 +548,21 @@ private void closeIdleConnections() {
521548
closeConnectionIfIdle(this.writerConnection);
522549
}
523550

551+
void closeConnectionIfIdle(final CacheItem<Connection> cachedConnection) {
552+
closeConnectionIfIdle(cachedConnection.get());
553+
}
554+
524555
void closeConnectionIfIdle(final Connection internalConnection) {
525556
final Connection currentConnection = this.pluginService.getCurrentConnection();
526557
try {
527-
if (internalConnection != null
528-
&& internalConnection != currentConnection
529-
&& !internalConnection.isClosed()) {
558+
if (isConnectionUsable(internalConnection)
559+
&& internalConnection != currentConnection) {
530560
internalConnection.close();
531561
if (internalConnection == writerConnection) {
532562
writerConnection = null;
533563
}
534564

535-
if (internalConnection == readerConnection) {
565+
if (internalConnection == readerConnection.get()) {
536566
readerConnection = null;
537567
readerHostSpec = null;
538568
}
@@ -550,6 +580,6 @@ Connection getWriterConnection() {
550580
}
551581

552582
Connection getReaderConnection() {
553-
return this.readerConnection;
583+
return this.readerConnection.get();
554584
}
555585
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package software.amazon.jdbc.util;
18+
19+
import java.util.Objects;
20+
21+
public class CacheItem<V> {
22+
23+
final V item;
24+
final long expirationTime;
25+
26+
public CacheItem(final V item, final long expirationTime) {
27+
this.item = item;
28+
this.expirationTime = expirationTime;
29+
}
30+
31+
public boolean isExpired() {
32+
if (expirationTime <= 0) {
33+
// No expiration time.
34+
return false;
35+
}
36+
return System.nanoTime() > expirationTime;
37+
}
38+
39+
public V get() {
40+
return get(false);
41+
}
42+
43+
public V get(final boolean returnExpired) {
44+
return this.isExpired() && !returnExpired ? null : item;
45+
}
46+
47+
@Override
48+
public int hashCode() {
49+
return Objects.hashCode(item);
50+
}
51+
52+
@Override
53+
public boolean equals(Object obj) {
54+
if (this == obj) {
55+
return true;
56+
}
57+
if (!(obj instanceof CacheItem)) {
58+
return false;
59+
}
60+
CacheItem<?> other = (CacheItem<?>) obj;
61+
return Objects.equals(this.item, other.item);
62+
}
63+
64+
@Override
65+
public String toString() {
66+
return "CacheItem [item=" + item + ", expirationTime=" + expirationTime + "]";
67+
}
68+
}

wrapper/src/main/java/software/amazon/jdbc/util/CacheMap.java

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -93,50 +93,4 @@ protected void cleanUp() {
9393
});
9494
}
9595
}
96-
97-
static class CacheItem<V> {
98-
final V item;
99-
final long expirationTime;
100-
101-
public CacheItem(final V item, final long expirationTime) {
102-
this.item = item;
103-
this.expirationTime = expirationTime;
104-
}
105-
106-
boolean isExpired() {
107-
return System.nanoTime() > expirationTime;
108-
}
109-
110-
@Override
111-
public int hashCode() {
112-
final int prime = 31;
113-
int result = 1;
114-
result = prime * result + ((item == null) ? 0 : item.hashCode());
115-
return result;
116-
}
117-
118-
@Override
119-
public boolean equals(final Object obj) {
120-
if (this == obj) {
121-
return true;
122-
}
123-
if (obj == null) {
124-
return false;
125-
}
126-
if (getClass() != obj.getClass()) {
127-
return false;
128-
}
129-
final CacheItem<?> other = (CacheItem<?>) obj;
130-
if (item == null) {
131-
return other.item == null;
132-
} else {
133-
return item.equals(other.item);
134-
}
135-
}
136-
137-
@Override
138-
public String toString() {
139-
return "CacheItem [item=" + item + ", expirationTime=" + expirationTime + "]";
140-
}
141-
}
14296
}

wrapper/src/test/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPluginTest.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.junit.jupiter.api.Test;
4646
import org.mockito.Mock;
4747
import org.mockito.MockitoAnnotations;
48-
import software.amazon.jdbc.HikariPooledConnectionProvider;
4948
import software.amazon.jdbc.HostListProviderService;
5049
import software.amazon.jdbc.HostRole;
5150
import software.amazon.jdbc.HostSpec;
@@ -55,7 +54,6 @@
5554
import software.amazon.jdbc.OldConnectionSuggestedAction;
5655
import software.amazon.jdbc.PluginService;
5756
import software.amazon.jdbc.PropertyDefinition;
58-
import software.amazon.jdbc.dialect.Dialect;
5957
import software.amazon.jdbc.hostavailability.SimpleHostAvailabilityStrategy;
6058
import software.amazon.jdbc.plugin.failover.FailoverSuccessSQLException;
6159
import software.amazon.jdbc.util.SqlState;
@@ -96,7 +94,6 @@ public class ReadWriteSplittingPluginTest {
9694
@Mock private JdbcCallable<Connection, SQLException> mockConnectFunc;
9795
@Mock private JdbcCallable<ResultSet, SQLException> mockSqlFunction;
9896
@Mock private PluginService mockPluginService;
99-
@Mock private Dialect mockDialect;
10097
@Mock private HostListProviderService mockHostListProviderService;
10198
@Mock private Connection mockWriterConn;
10299
@Mock private Connection mockNewWriterConn;
@@ -356,6 +353,35 @@ public void testSetReadOnlyOnClosedConnection() throws SQLException {
356353
assertNull(plugin.getReaderConnection());
357354
}
358355

356+
@Test
357+
public void testSetReadOnly_readerExpires() throws SQLException, InterruptedException {
358+
when(this.mockPluginService.connect(eq(readerHostSpec1), any(Properties.class), any()))
359+
.thenReturn(mockReaderConn1)
360+
.thenReturn(mockReaderConn2);
361+
362+
final Properties propsWithExpirationTime = new Properties();
363+
propsWithExpirationTime.put("cachedReaderKeepAliveTimeoutMs", "5000");
364+
365+
final ReadWriteSplittingPlugin plugin = new ReadWriteSplittingPlugin(
366+
mockPluginService,
367+
propsWithExpirationTime);
368+
369+
plugin.switchConnectionIfRequired(true);
370+
assertEquals(mockReaderConn1, plugin.getReaderConnection());
371+
372+
Thread.sleep(1000);
373+
374+
plugin.switchConnectionIfRequired(true);
375+
// Ensure the cached reader connection hasn't changed yet since it hasn't expired.
376+
assertEquals(mockReaderConn1, plugin.getReaderConnection());
377+
378+
Thread.sleep(6000);
379+
plugin.switchConnectionIfRequired(true);
380+
381+
// Ensure the cached reader connection has expired and updated.
382+
assertEquals(mockReaderConn2, plugin.getReaderConnection());
383+
}
384+
359385
@Test
360386
public void testExecute_failoverToNewWriter() throws SQLException {
361387
when(mockSqlFunction.call()).thenThrow(FailoverSuccessSQLException.class);

0 commit comments

Comments
 (0)