Skip to content

Commit b66e33d

Browse files
authored
improve forceConnect pipeline (#1238)
1 parent e0037ab commit b66e33d

28 files changed

+293
-249
lines changed

benchmarks/src/jmh/java/software/amazon/jdbc/benchmarks/ConnectionPluginManagerBenchmarks.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ public Connection connectWithPlugins() throws SQLException {
196196
"driverProtocol",
197197
new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("host").build(),
198198
propertiesWithPlugins,
199-
true);
199+
true,
200+
null);
200201
}
201202

202203
@Benchmark
@@ -205,7 +206,8 @@ public Connection connectWithNoPlugins() throws SQLException {
205206
"driverProtocol",
206207
new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("host").build(),
207208
propertiesWithoutPlugins,
208-
true);
209+
true,
210+
null);
209211
}
210212

211213
@Benchmark

benchmarks/src/jmh/java/software/amazon/jdbc/benchmarks/PluginBenchmarks.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public static void main(String[] args) throws RunnerException {
115115
@Setup(Level.Iteration)
116116
public void setUpIteration() throws Exception {
117117
closeable = MockitoAnnotations.openMocks(this);
118-
when(mockConnectionPluginManager.connect(any(), any(), any(Properties.class), anyBoolean()))
118+
when(mockConnectionPluginManager.connect(any(), any(), any(Properties.class), anyBoolean(), any()))
119119
.thenReturn(mockConnection);
120120
when(mockConnectionPluginManager.execute(
121121
any(), any(), any(), eq("Connection.createStatement"), any(), any()))

examples/AWSDriverExample/src/main/java/software/amazon/OktaAuthPluginExample.java

-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.sql.Statement;
2424
import java.util.Properties;
2525
import software.amazon.jdbc.PropertyDefinition;
26-
import software.amazon.jdbc.plugin.federatedauth.FederatedAuthPlugin;
2726
import software.amazon.jdbc.plugin.federatedauth.OktaAuthPlugin;
2827

2928
public class OktaAuthPluginExample {

wrapper/src/main/java/software/amazon/jdbc/ConnectionPluginManager.java

+38-18
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,8 @@ public void init(
209209
protected <T, E extends Exception> T executeWithSubscribedPlugins(
210210
final String methodName,
211211
final PluginPipeline<T, E> pluginPipeline,
212-
final JdbcCallable<T, E> jdbcMethodFunc)
212+
final JdbcCallable<T, E> jdbcMethodFunc,
213+
final @Nullable ConnectionPlugin pluginToSkip)
213214
throws E {
214215

215216
if (pluginPipeline == null) {
@@ -232,7 +233,7 @@ protected <T, E extends Exception> T executeWithSubscribedPlugins(
232233
throw new RuntimeException("Error processing this JDBC call.");
233234
}
234235

235-
return pluginChainFunc.call(pluginPipeline, jdbcMethodFunc);
236+
return pluginChainFunc.call(pluginPipeline, jdbcMethodFunc, pluginToSkip);
236237
}
237238

238239

@@ -258,20 +259,28 @@ protected <T, E extends Exception> PluginChainJdbcCallable<T, E> makePluginChain
258259
final ConnectionPlugin plugin = this.plugins.get(i);
259260
final Set<String> pluginSubscribedMethods = plugin.getSubscribedMethods();
260261
final String pluginName = pluginNameByClass.getOrDefault(plugin.getClass(), plugin.getClass().getSimpleName());
261-
final boolean isSubscribed =
262-
pluginSubscribedMethods.contains(ALL_METHODS)
263-
|| pluginSubscribedMethods.contains(methodName);
262+
final boolean isSubscribed = pluginSubscribedMethods.contains(ALL_METHODS)
263+
|| pluginSubscribedMethods.contains(methodName);
264264

265265
if (isSubscribed) {
266266
if (pluginChainFunc == null) {
267-
pluginChainFunc = (pipelineFunc, jdbcFunc) ->
267+
// This case is for DefaultConnectionPlugin that always terminates the list of plugins.
268+
// Default plugin can't be skipped.
269+
pluginChainFunc = (pipelineFunc, jdbcFunc, pluginToSkip) ->
268270
executeWithTelemetry(() -> pipelineFunc.call(plugin, jdbcFunc), pluginName);
269271
} else {
270272
final PluginChainJdbcCallable<T, E> finalPluginChainFunc = pluginChainFunc;
271-
pluginChainFunc = (pipelineFunc, jdbcFunc) ->
272-
executeWithTelemetry(() -> pipelineFunc.call(
273-
plugin, () -> finalPluginChainFunc.call(pipelineFunc, jdbcFunc)),
273+
pluginChainFunc = (pipelineFunc, jdbcFunc, pluginToSkip) -> {
274+
if (pluginToSkip == plugin) {
275+
return finalPluginChainFunc.call(pipelineFunc, jdbcFunc, pluginToSkip);
276+
} else {
277+
return executeWithTelemetry(
278+
() -> pipelineFunc.call(
279+
plugin,
280+
() -> finalPluginChainFunc.call(pipelineFunc, jdbcFunc, pluginToSkip)),
274281
pluginName);
282+
}
283+
};
275284
}
276285
}
277286
}
@@ -338,7 +347,8 @@ public <T, E extends Exception> T execute(
338347
(plugin, func) ->
339348
plugin.execute(
340349
resultType, exceptionClass, methodInvokeOn, methodName, func, jdbcMethodArgs),
341-
jdbcMethodFunc);
350+
jdbcMethodFunc,
351+
null);
342352
}
343353

344354
/**
@@ -359,6 +369,7 @@ public <T, E extends Exception> T execute(
359369
* @param isInitialConnection a boolean indicating whether the current {@link Connection} is
360370
* establishing an initial physical connection to the database or has
361371
* already established a physical connection in the past
372+
* @param pluginToSkip the plugin that needs to be skipped while executing this pipeline
362373
* @return a {@link Connection} to the requested host
363374
* @throws SQLException if there was an error establishing a {@link Connection} to the requested
364375
* host
@@ -367,7 +378,8 @@ public Connection connect(
367378
final String driverProtocol,
368379
final HostSpec hostSpec,
369380
final Properties props,
370-
final boolean isInitialConnection)
381+
final boolean isInitialConnection,
382+
final @Nullable ConnectionPlugin pluginToSkip)
371383
throws SQLException {
372384

373385
TelemetryContext context = telemetryFactory.openTelemetryContext("connect", TelemetryTraceLevel.NESTED);
@@ -378,7 +390,8 @@ public Connection connect(
378390
plugin.connect(driverProtocol, hostSpec, props, isInitialConnection, func),
379391
() -> {
380392
throw new SQLException("Shouldn't be called.");
381-
});
393+
},
394+
pluginToSkip);
382395
} catch (final SQLException | RuntimeException e) {
383396
throw e;
384397
} catch (final Exception e) {
@@ -403,6 +416,7 @@ public Connection connect(
403416
* @param isInitialConnection a boolean indicating whether the current {@link Connection} is
404417
* establishing an initial physical connection to the database or has
405418
* already established a physical connection in the past
419+
* @param pluginToSkip the plugin that needs to be skipped while executing this pipeline
406420
* @return a {@link Connection} to the requested host
407421
* @throws SQLException if there was an error establishing a {@link Connection} to the requested
408422
* host
@@ -411,7 +425,8 @@ public Connection forceConnect(
411425
final String driverProtocol,
412426
final HostSpec hostSpec,
413427
final Properties props,
414-
final boolean isInitialConnection)
428+
final boolean isInitialConnection,
429+
final @Nullable ConnectionPlugin pluginToSkip)
415430
throws SQLException {
416431

417432
try {
@@ -421,7 +436,8 @@ public Connection forceConnect(
421436
plugin.forceConnect(driverProtocol, hostSpec, props, isInitialConnection, func),
422437
() -> {
423438
throw new SQLException("Shouldn't be called.");
424-
});
439+
},
440+
pluginToSkip);
425441
} catch (SQLException | RuntimeException e) {
426442
throw e;
427443
} catch (Exception e) {
@@ -535,7 +551,8 @@ public void initHostProvider(
535551
},
536552
() -> {
537553
throw new SQLException("Shouldn't be called.");
538-
});
554+
},
555+
null);
539556
} finally {
540557
context.closeContext();
541558
}
@@ -632,13 +649,16 @@ public ConnectionProvider getEffectiveConnProvider() {
632649
return this.effectiveConnProvider;
633650
}
634651

635-
private interface PluginPipeline<T, E extends Exception> {
652+
protected interface PluginPipeline<T, E extends Exception> {
636653

637654
T call(final @NonNull ConnectionPlugin plugin, final @Nullable JdbcCallable<T, E> jdbcMethodFunc) throws E;
638655
}
639656

640-
private interface PluginChainJdbcCallable<T, E extends Exception> {
657+
protected interface PluginChainJdbcCallable<T, E extends Exception> {
641658

642-
T call(final @NonNull PluginPipeline<T, E> pipelineFunc, final @NonNull JdbcCallable<T, E> jdbcMethodFunc) throws E;
659+
T call(
660+
final @NonNull PluginPipeline<T, E> pipelineFunc,
661+
final @NonNull JdbcCallable<T, E> jdbcMethodFunc,
662+
final @Nullable ConnectionPlugin pluginToSkip) throws E;
643663
}
644664
}

wrapper/src/main/java/software/amazon/jdbc/PluginService.java

+6
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,9 @@ HostSpec getHostSpecByStrategy(List<HostSpec> hosts, HostRole role, String strat
178178
*/
179179
boolean forceRefreshHostList(final boolean shouldVerifyWriter, final long timeoutMs) throws SQLException;
180180

181+
Connection connect(HostSpec hostSpec, Properties props, final @Nullable ConnectionPlugin pluginToSkip)
182+
throws SQLException;
183+
181184
/**
182185
* Establishes a connection to the given host using the given properties. If a non-default
183186
* {@link ConnectionProvider} has been set with
@@ -215,6 +218,9 @@ HostSpec getHostSpecByStrategy(List<HostSpec> hosts, HostRole role, String strat
215218
*/
216219
Connection forceConnect(HostSpec hostSpec, Properties props) throws SQLException;
217220

221+
Connection forceConnect(
222+
HostSpec hostSpec, Properties props, final @Nullable ConnectionPlugin pluginToSkip) throws SQLException;
223+
218224
Dialect getDialect();
219225

220226
TargetDriverDialect getTargetDriverDialect();

wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -593,14 +593,35 @@ public void setHostListProvider(final HostListProvider hostListProvider) {
593593

594594
@Override
595595
public Connection connect(final HostSpec hostSpec, final Properties props) throws SQLException {
596+
return this.connect(hostSpec, props, null);
597+
}
598+
599+
@Override
600+
public Connection connect(
601+
final HostSpec hostSpec,
602+
final Properties props,
603+
final @Nullable ConnectionPlugin pluginToSkip)
604+
throws SQLException {
596605
return this.pluginManager.connect(
597-
this.driverProtocol, hostSpec, props, this.currentConnection == null);
606+
this.driverProtocol, hostSpec, props, this.currentConnection == null, pluginToSkip);
598607
}
599608

600609
@Override
601-
public Connection forceConnect(final HostSpec hostSpec, final Properties props) throws SQLException {
610+
public Connection forceConnect(
611+
final HostSpec hostSpec,
612+
final Properties props)
613+
throws SQLException {
614+
return this.forceConnect(hostSpec, props, null);
615+
}
616+
617+
@Override
618+
public Connection forceConnect(
619+
final HostSpec hostSpec,
620+
final Properties props,
621+
final @Nullable ConnectionPlugin pluginToSkip)
622+
throws SQLException {
602623
return this.pluginManager.forceConnect(
603-
this.driverProtocol, hostSpec, props, this.currentConnection == null);
624+
this.driverProtocol, hostSpec, props, this.currentConnection == null, pluginToSkip);
604625
}
605626

606627
private void updateHostAvailability(final List<HostSpec> hosts) {

wrapper/src/main/java/software/amazon/jdbc/plugin/AuroraConnectionTrackerPlugin.java

-13
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ public class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin impl
5050
{
5151
addAll(SubscribedMethodHelper.NETWORK_BOUND_METHODS);
5252
add("connect");
53-
add("forceConnect");
5453
add("notifyNodeListChanged");
5554
}
5655
});
@@ -83,12 +82,6 @@ public Set<String> getSubscribedMethods() {
8382
@Override
8483
public Connection connect(final String driverProtocol, final HostSpec hostSpec, final Properties props,
8584
final boolean isInitialConnection, final JdbcCallable<Connection, SQLException> connectFunc) throws SQLException {
86-
return connectInternal(hostSpec, connectFunc);
87-
}
88-
89-
public Connection connectInternal(
90-
final HostSpec hostSpec, final JdbcCallable<Connection, SQLException> connectFunc)
91-
throws SQLException {
9285

9386
final Connection conn = connectFunc.call();
9487

@@ -104,12 +97,6 @@ public Connection connectInternal(
10497
return conn;
10598
}
10699

107-
@Override
108-
public Connection forceConnect(String driverProtocol, HostSpec hostSpec, Properties props,
109-
boolean isInitialConnection, JdbcCallable<Connection, SQLException> forceConnectFunc) throws SQLException {
110-
return connectInternal(hostSpec, forceConnectFunc);
111-
}
112-
113100
@Override
114101
public <T, E extends Exception> T execute(final Class<T> resultClass, final Class<E> exceptionClass,
115102
final Object methodInvokeOn, final String methodName, final JdbcCallable<T, E> jdbcMethodFunc,

wrapper/src/main/java/software/amazon/jdbc/plugin/AuroraInitialConnectionStrategyPlugin.java

+2-25
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ public class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu
4646
{
4747
add("initHostProvider");
4848
add("connect");
49-
add("forceConnect");
5049
}
5150
});
5251

@@ -109,28 +108,6 @@ public Connection connect(
109108
final JdbcCallable<Connection, SQLException> connectFunc)
110109
throws SQLException {
111110

112-
return this.connectInternal(hostSpec, props, isInitialConnection, connectFunc);
113-
}
114-
115-
@Override
116-
public Connection forceConnect(
117-
final String driverProtocol,
118-
final HostSpec hostSpec,
119-
final Properties props,
120-
final boolean isInitialConnection,
121-
final JdbcCallable<Connection, SQLException> forceConnectFunc)
122-
throws SQLException {
123-
124-
return this.connectInternal(hostSpec, props, isInitialConnection, forceConnectFunc);
125-
}
126-
127-
private Connection connectInternal(
128-
final HostSpec hostSpec,
129-
final Properties props,
130-
final boolean isInitialConnection,
131-
final JdbcCallable<Connection, SQLException> connectFunc)
132-
throws SQLException {
133-
134111
final RdsUrlType type = this.rdsUtils.identifyRdsType(hostSpec.getHost());
135112

136113
if (!type.isRdsCluster()) {
@@ -203,7 +180,7 @@ private Connection getVerifiedWriterConnection(
203180
return writerCandidateConn;
204181
}
205182

206-
writerCandidateConn = this.pluginService.connect(writerCandidate, props);
183+
writerCandidateConn = this.pluginService.connect(writerCandidate, props, this);
207184

208185
if (this.pluginService.getHostRole(writerCandidateConn) != HostRole.WRITER) {
209186
// If the new connection resolves to a reader instance, this means the topology is outdated.
@@ -287,7 +264,7 @@ private Connection getVerifiedReaderConnection(
287264
return readerCandidateConn;
288265
}
289266

290-
readerCandidateConn = this.pluginService.connect(readerCandidate, props);
267+
readerCandidateConn = this.pluginService.connect(readerCandidate, props, this);
291268

292269
if (this.pluginService.getHostRole(readerCandidateConn) != HostRole.READER) {
293270
// If the new connection resolves to a writer instance, this means the topology is outdated.

wrapper/src/main/java/software/amazon/jdbc/plugin/ConnectTimeConnectionPlugin.java

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import software.amazon.jdbc.util.Messages;
3030

3131
public class ConnectTimeConnectionPlugin extends AbstractConnectionPlugin {
32+
3233
private static final Set<String> subscribedMethods =
3334
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("connect", "forceConnect")));
3435
private static long connectTime = 0L;

wrapper/src/main/java/software/amazon/jdbc/plugin/efm/HostMonitoringConnectionPlugin.java

-15
Original file line numberDiff line numberDiff line change
@@ -276,11 +276,7 @@ public Connection connect(
276276
final boolean isInitialConnection,
277277
final @NonNull JdbcCallable<Connection, SQLException> connectFunc)
278278
throws SQLException {
279-
return connectInternal(driverProtocol, hostSpec, connectFunc);
280-
}
281279

282-
private Connection connectInternal(String driverProtocol, HostSpec hostSpec,
283-
JdbcCallable<Connection, SQLException> connectFunc) throws SQLException {
284280
final Connection conn = connectFunc.call();
285281

286282
if (conn != null) {
@@ -294,17 +290,6 @@ private Connection connectInternal(String driverProtocol, HostSpec hostSpec,
294290
return conn;
295291
}
296292

297-
@Override
298-
public Connection forceConnect(
299-
final @NonNull String driverProtocol,
300-
final @NonNull HostSpec hostSpec,
301-
final @NonNull Properties props,
302-
final boolean isInitialConnection,
303-
final @NonNull JdbcCallable<Connection, SQLException> forceConnectFunc)
304-
throws SQLException {
305-
return connectInternal(driverProtocol, hostSpec, forceConnectFunc);
306-
}
307-
308293
public HostSpec getMonitoringHostSpec() {
309294
if (this.monitoringHostSpec == null) {
310295
this.monitoringHostSpec = this.pluginService.getCurrentHostSpec();

0 commit comments

Comments
 (0)