fix: add expiration time for cached reader connections #1422


Open · wants to merge 3 commits into main · Changes from 1 commit

@@ -8,7 +8,7 @@ The read/write splitting plugin is not loaded by default. To load the plugin, in

```
final Properties properties = new Properties();
properties.setProperty(PropertyDefinition.PLUGINS.name, "readWriteSplitting,failover,efm");
properties.setProperty(PropertyDefinition.PLUGINS.name, "readWriteSplitting,failover2,efm2");
```

If you would like to use the read/write splitting plugin without the failover plugin, make sure you have the `readWriteSplitting` plugin in the `wrapperPlugins` property, and that the failover plugin is not part of it.
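
For example, a minimal sketch (using the same `PropertyDefinition.PLUGINS` property shown above) that loads only the read/write splitting plugin:

```java
final Properties properties = new Properties();
// Load only the read/write splitting plugin; the failover plugin is intentionally omitted.
properties.setProperty(PropertyDefinition.PLUGINS.name, "readWriteSplitting");
```
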
@@ -30,7 +30,10 @@ The read/write splitting plugin is not currently supported for non-Aurora cluste
> [!WARNING]\
> If internal connection pools are enabled, database passwords may not be verified with every connection request. The initial connection request for each database instance in the cluster will verify the password, but subsequent requests may return a cached pool connection without re-verifying the password. This behavior is inherent to the nature of connection pools in general and not a bug with the driver. `ConnectionProviderManager.releaseResources` can be called to close all pools and remove all cached pool connections. See [InternalConnectionPoolPasswordWarning.java](../../../examples/AWSDriverExample/src/main/java/software/amazon/InternalConnectionPoolPasswordWarning.java) for more details.

Whenever `setReadOnly(true)` is first called on a `Connection` object, the read/write plugin will internally open a new physical connection to a reader. After this first call, the physical reader connection will be cached for the given `Connection`. Future calls to `setReadOnly `on the same `Connection` object will not require opening a new physical connection. However, calling `setReadOnly(true)` for the first time on a new `Connection` object will require the plugin to establish another new physical connection to a reader. If your application frequently calls `setReadOnly`, you can enable internal connection pooling to improve performance. When enabled, the wrapper driver will maintain an internal connection pool for each instance in the cluster. This allows the read/write plugin to reuse connections that were established by `setReadOnly` calls on previous `Connection` objects.
Whenever `setReadOnly(true)` is first called on a `Connection` object, the read/write plugin will internally open a new physical connection to a reader. After this first call, the physical reader connection will be cached for the given `Connection`. By default, this cached connection will never expire, meaning all subsequent `setReadOnly(true)` calls on the same `Connection` object will keep reusing the same reader connection.
However, calling `setReadOnly(true)` for the first time on a new `Connection` object still requires the plugin to establish another new physical connection to a reader. If your application frequently calls `setReadOnly`, this may have a performance impact. There are two ways to improve performance (a configuration sketch follows the list):
1. You can enable internal connection pooling. When enabled, the wrapper driver will maintain an internal connection pool for each instance in the cluster. This allows the Read/Write Splitting plugin to reuse connections that were established by `setReadOnly` calls on previous `Connection` objects.
2. You can also use the [`cachedReaderKeepAliveTimeoutMs` connection parameter](#reader-keep-alive-timeout). This sets an expiration time on the reader connection. When `setReadOnly(true)` is called and the reader connection has expired, the plugin will create a new reader connection using the specified [reader selection strategy](#reader-selection).
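
As a rough sketch of option 2 in ordinary application code (the cluster endpoint, database name, and credentials are placeholders; the plugin list and the `cachedReaderKeepAliveTimeoutMs` property come from this page):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
import software.amazon.jdbc.PropertyDefinition;

public class ReaderKeepAliveSketch {
  public static void main(String[] args) throws Exception {
    final Properties props = new Properties();
    props.setProperty(PropertyDefinition.PLUGINS.name, "readWriteSplitting,failover2,efm2");
    // Cached reader connections expire after 10 minutes; the default of 0 means they never expire.
    props.setProperty("cachedReaderKeepAliveTimeoutMs", "600000");
    props.setProperty("user", "dbUser");          // placeholder credentials
    props.setProperty("password", "dbPassword");

    // Placeholder Aurora cluster endpoint.
    final String url =
        "jdbc:aws-wrapper:postgresql://my-cluster.cluster-xyz.us-east-1.rds.amazonaws.com:5432/mydb";

    try (Connection conn = DriverManager.getConnection(url, props)) {
      conn.setReadOnly(true);   // first call: opens and caches a physical reader connection
      // ... run read-only queries ...
      conn.setReadOnly(false);  // switch back to the writer; the reader connection stays cached
      conn.setReadOnly(true);   // reuses the cached reader until cachedReaderKeepAliveTimeoutMs elapses
    }
  }
}
```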

> [!NOTE]\
> Initial connections to a cluster URL will not be pooled. The driver does not pool cluster URLs because it can be problematic to pool a URL that resolves to different instances over time. The main benefit of internal connection pools is when setReadOnly is called. When setReadOnly is called (regardless of the initial connection URL), an internal pool will be created for the writer/reader that the plugin switches to and connections for that instance can be reused in the future.
@@ -87,6 +90,16 @@ To indicate which selection strategy to use, the `readerHostSelectorStrategy` co
props.setProperty(ReadWriteSplittingPlugin.READER_HOST_SELECTOR_STRATEGY.name, "leastConnections");
```

## Reader keep-alive timeout
If no connection pool is used, reader connections created by calls to `setReadOnly(true)` are cached for the entire lifetime of the `Connection` object. This can have a negative performance impact if your application calls `setReadOnly(true)` frequently on a long-lived `Connection`, because all of that connection's read traffic is directed to a single reader instance.
To improve performance, you can specify a timeout for the cached reader connection using `cachedReaderKeepAliveTimeoutMs`. Once the reader has expired, the next call to `setReadOnly(true)` will create a new reader connection determined by the reader host selection strategy.

```java
final Properties properties = new Properties();
properties.setProperty("cachedReaderKeepAliveTimeoutMs", "600000");
```
> [!NOTE]\
> If a connection pool is used, this setting is ignored and the lifespan of the cached reader connection is managed by the connection pool instead.

## Limitations

@@ -24,22 +24,22 @@
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.jdbc.AwsWrapperProperty;
import software.amazon.jdbc.ConnectionProviderManager;
import software.amazon.jdbc.HostListProviderService;
import software.amazon.jdbc.HostRole;
import software.amazon.jdbc.HostSpec;
import software.amazon.jdbc.JdbcCallable;
import software.amazon.jdbc.NodeChangeOptions;
import software.amazon.jdbc.OldConnectionSuggestedAction;
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.PooledConnectionProvider;
import software.amazon.jdbc.PropertyDefinition;
import software.amazon.jdbc.cleanup.CanReleaseResources;
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
import software.amazon.jdbc.plugin.failover.FailoverSQLException;
import software.amazon.jdbc.util.CacheItem;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.SqlState;
import software.amazon.jdbc.util.Utils;
@@ -68,17 +68,24 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
private volatile boolean inReadWriteSplit = false;
private HostListProviderService hostListProviderService;
private Connection writerConnection;
private Connection readerConnection;
private HostSpec readerHostSpec;
private boolean isReaderConnFromInternalPool;
private boolean isWriterConnFromInternalPool;
private CacheItem<Connection> readerConnection;

public static final AwsWrapperProperty READER_HOST_SELECTOR_STRATEGY =
new AwsWrapperProperty(
"readerHostSelectorStrategy",
"random",
"The strategy that should be used to select a new reader host.");

public static final AwsWrapperProperty CACHED_READER_KEEP_ALIVE_TIMEOUT =
new AwsWrapperProperty(
"cachedReaderKeepAliveTimeoutMs",
"0",
"The time in milliseconds to keep a reader connection alive in the cache. "
+ "Default value 0 means the Wrapper will keep reusing the same cached reader connection.");

static {
PropertyDefinition.registerPluginProperties(ReadWriteSplittingPlugin.class);
}
@@ -101,7 +108,7 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
this(pluginService, properties);
this.hostListProviderService = hostListProviderService;
this.writerConnection = writerConnection;
this.readerConnection = readerConnection;
final long timeoutMs = CACHED_READER_KEEP_ALIVE_TIMEOUT.getLong(properties);
this.readerConnection = new CacheItem<>(
readerConnection,
timeoutMs <= 0 ? 0 : System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs));
}

@Override
@@ -196,8 +203,8 @@ public <T, E extends Exception> T execute(
if (this.writerConnection != null && !this.writerConnection.isClosed()) {
this.writerConnection.clearWarnings();
}
if (this.readerConnection != null && !this.readerConnection.isClosed()) {
this.readerConnection.clearWarnings();
if (isConnectionUsable(this.readerConnection)) {
this.readerConnection.get().clearWarnings();
}
} catch (final SQLException e) {
throw WrapperUtils.wrapExceptionIfNeeded(exceptionClass, e);
@@ -267,9 +274,9 @@ private void setWriterConnection(final Connection writerConnection,
new Object[] {
writerHostSpec.getUrl()}));
}

private void setReaderConnection(final Connection conn, final HostSpec host) {
this.readerConnection = conn;
this.readerConnection = new CacheItem<>(conn, this.getKeepAliveTimeout(host));
this.readerHostSpec = host;
LOGGER.finest(
() -> Messages.get(
@@ -381,6 +388,12 @@ private void switchToWriterConnection(
new Object[] {writerHost.getUrl()}));
}

private void switchCurrentConnectionTo(
final CacheItem<Connection> cachedReaderConnection,
final HostSpec newConnectionHost) throws SQLException {
this.switchCurrentConnectionTo(cachedReaderConnection.get(), newConnectionHost);
}

private void switchCurrentConnectionTo(
final Connection newConnection,
final HostSpec newConnectionHost)
@@ -428,7 +441,10 @@ private void switchToReaderConnection(final List<HostSpec> hosts)
new Object[] {this.readerHostSpec.getUrl()}));
}

this.readerConnection.close();
Connection conn = this.readerConnection.get(true);
if (conn != null) {
conn.close();
}
this.readerConnection = null;
this.readerHostSpec = null;
initializeReaderConnection(hosts);
@@ -506,10 +522,22 @@ private void getNewReaderConnection() throws SQLException {
switchCurrentConnectionTo(this.readerConnection, this.readerHostSpec);
}

private boolean isConnectionUsable(final CacheItem<Connection> cachedConnection) throws SQLException {
return cachedConnection != null && !cachedConnection.isExpired() && isConnectionUsable(cachedConnection.get());
}

private boolean isConnectionUsable(final Connection connection) throws SQLException {
return connection != null && !connection.isClosed();
}

private long getKeepAliveTimeout(final HostSpec host) {
if (this.pluginService.isPooledConnectionProvider(host, properties)) {
// Let the connection pool handle the lifetime of the reader connection.
return 0;
}
final long timeoutMs = CACHED_READER_KEEP_ALIVE_TIMEOUT.getLong(properties);
if (timeoutMs <= 0) {
// The default value 0 disables expiration, so keep reusing the same cached reader connection.
return 0;
}
return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
}

@Override
public void releaseResources() {
closeIdleConnections();
@@ -521,18 +549,21 @@ private void closeIdleConnections() {
closeConnectionIfIdle(this.writerConnection);
}

void closeConnectionIfIdle(final CacheItem<Connection> cachedConnection) {
if (cachedConnection != null) {
// Close the underlying connection even if the cache item has already expired.
closeConnectionIfIdle(cachedConnection.get(true));
}
}

void closeConnectionIfIdle(final Connection internalConnection) {
final Connection currentConnection = this.pluginService.getCurrentConnection();
try {
if (internalConnection != null
&& internalConnection != currentConnection
&& !internalConnection.isClosed()) {
if (isConnectionUsable(internalConnection)
&& internalConnection != currentConnection) {
internalConnection.close();
if (internalConnection == writerConnection) {
writerConnection = null;
}

if (internalConnection == readerConnection) {
if (readerConnection != null && internalConnection == readerConnection.get(true)) {
readerConnection = null;
readerHostSpec = null;
}
@@ -550,6 +581,6 @@ Connection getWriterConnection() {
}

Connection getReaderConnection() {
return this.readerConnection;
return this.readerConnection == null ? null : this.readerConnection.get();
}
}
68 changes: 68 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/util/CacheItem.java
@@ -0,0 +1,68 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.jdbc.util;

import java.util.Objects;

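/**
 * Wraps a cached value together with an absolute expiration deadline expressed in
 * System.nanoTime() units. A deadline of 0 or less means the item never expires.
 */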
public class CacheItem<V> {

final V item;
final long expirationTime;

public CacheItem(final V item, final long expirationTime) {
this.item = item;
this.expirationTime = expirationTime;
}

public boolean isExpired() {
if (expirationTime <= 0) {
// No expiration time.
return false;
}
return System.nanoTime() > expirationTime;
}

public V get() {
return get(false);
}

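/**
 * Returns the cached item, treating an expired item as absent (null) by default.
 * Pass returnExpired = true to retrieve the wrapped value even after it has expired.
 */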
public V get(final boolean returnExpired) {
return this.isExpired() && !returnExpired ? null : item;
}

@Override
public int hashCode() {
return Objects.hashCode(item);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof CacheItem)) {
return false;
}
CacheItem<?> other = (CacheItem<?>) obj;
return Objects.equals(this.item, other.item);
}

@Override
public String toString() {
return "CacheItem [item=" + item + ", expirationTime=" + expirationTime + "]";
}
}
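
A short usage sketch for the new `CacheItem` class (the wrapped value and the five-second deadline are illustrative):

```java
import java.util.concurrent.TimeUnit;
import software.amazon.jdbc.util.CacheItem;

public class CacheItemSketch {
  public static void main(String[] args) {
    // Wrap a value with a deadline five seconds from now, in System.nanoTime() units.
    CacheItem<String> item = new CacheItem<>(
        "some-value", System.nanoTime() + TimeUnit.SECONDS.toNanos(5));

    System.out.println(item.get());       // "some-value" until the deadline passes, null afterwards
    System.out.println(item.get(true));   // always returns the wrapped value, even after expiration
    System.out.println(item.isExpired()); // false until the deadline passes

    // A deadline of 0 or less disables expiration entirely.
    CacheItem<String> immortal = new CacheItem<>("kept-forever", 0);
    System.out.println(immortal.isExpired()); // always false
  }
}
```
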
46 changes: 0 additions & 46 deletions wrapper/src/main/java/software/amazon/jdbc/util/CacheMap.java
@@ -93,50 +93,4 @@ protected void cleanUp() {
});
}
}

static class CacheItem<V> {
final V item;
final long expirationTime;

public CacheItem(final V item, final long expirationTime) {
this.item = item;
this.expirationTime = expirationTime;
}

boolean isExpired() {
return System.nanoTime() > expirationTime;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((item == null) ? 0 : item.hashCode());
return result;
}

@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final CacheItem<?> other = (CacheItem<?>) obj;
if (item == null) {
return other.item == null;
} else {
return item.equals(other.item);
}
}

@Override
public String toString() {
return "CacheItem [item=" + item + ", expirationTime=" + expirationTime + "]";
}
}
}
@@ -45,7 +45,6 @@
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import software.amazon.jdbc.HikariPooledConnectionProvider;
import software.amazon.jdbc.HostListProviderService;
import software.amazon.jdbc.HostRole;
import software.amazon.jdbc.HostSpec;
@@ -55,7 +54,6 @@
import software.amazon.jdbc.OldConnectionSuggestedAction;
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.PropertyDefinition;
import software.amazon.jdbc.dialect.Dialect;
import software.amazon.jdbc.hostavailability.SimpleHostAvailabilityStrategy;
import software.amazon.jdbc.plugin.failover.FailoverSuccessSQLException;
import software.amazon.jdbc.util.SqlState;
@@ -96,7 +94,6 @@ public class ReadWriteSplittingPluginTest {
@Mock private JdbcCallable<Connection, SQLException> mockConnectFunc;
@Mock private JdbcCallable<ResultSet, SQLException> mockSqlFunction;
@Mock private PluginService mockPluginService;
@Mock private Dialect mockDialect;
@Mock private HostListProviderService mockHostListProviderService;
@Mock private Connection mockWriterConn;
@Mock private Connection mockNewWriterConn;
@@ -356,6 +353,35 @@ public void testSetReadOnlyOnClosedConnection() throws SQLException {
assertNull(plugin.getReaderConnection());
}

@Test
public void testSetReadOnly_readerExpires() throws SQLException, InterruptedException {
when(this.mockPluginService.connect(eq(readerHostSpec1), any(Properties.class), any()))
.thenReturn(mockReaderConn1)
.thenReturn(mockReaderConn2);

final Properties propsWithExpirationTime = new Properties();
propsWithExpirationTime.put("cachedReaderKeepAliveTimeoutMs", "5000");

final ReadWriteSplittingPlugin plugin = new ReadWriteSplittingPlugin(
mockPluginService,
propsWithExpirationTime);

plugin.switchConnectionIfRequired(true);
assertEquals(mockReaderConn1, plugin.getReaderConnection());

Thread.sleep(1000);

plugin.switchConnectionIfRequired(true);
// Ensure the cached reader connection hasn't changed yet since it hasn't expired.
assertEquals(mockReaderConn1, plugin.getReaderConnection());

Thread.sleep(6000);
plugin.switchConnectionIfRequired(true);

// Ensure the cached reader connection has expired and updated.
assertEquals(mockReaderConn2, plugin.getReaderConnection());
}

@Test
public void testExecute_failoverToNewWriter() throws SQLException {
when(mockSqlFunction.call()).thenThrow(FailoverSuccessSQLException.class);