Skip to content

Commit 1707084

Browse files
committed
feat: limitless plugin implementation
1 parent 61dc665 commit 1707084

12 files changed

+837
-59
lines changed

aws_advanced_python_wrapper/connection_provider.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626

2727
from aws_advanced_python_wrapper.errors import AwsWrapperError
2828
from aws_advanced_python_wrapper.host_selector import (
29-
HostSelector, RandomHostSelector, RoundRobinHostSelector,
30-
WeightedRandomHostSelector)
29+
HighestWeightHostSelector, HostSelector, RandomHostSelector,
30+
RoundRobinHostSelector, WeightedRandomHostSelector)
3131
from aws_advanced_python_wrapper.plugin import CanReleaseResources
3232
from aws_advanced_python_wrapper.utils.log import Logger
3333
from aws_advanced_python_wrapper.utils.messages import Messages
@@ -98,7 +98,8 @@ def connect(
9898
class DriverConnectionProvider(ConnectionProvider):
9999
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(),
100100
"round_robin": RoundRobinHostSelector(),
101-
"weighted_random": WeightedRandomHostSelector()}
101+
"weighted_random": WeightedRandomHostSelector(),
102+
"highest_weight": HighestWeightHostSelector()}
102103

103104
def accepts_host_info(self, host_info: HostInfo, props: Properties) -> bool:
104105
return True

aws_advanced_python_wrapper/host_selector.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,3 +260,15 @@ def _update_host_weight_map_from_string(self, props: Optional[Properties] = None
260260
except ValueError:
261261
logger.error(message, pair)
262262
raise AwsWrapperError(Messages.get_formatted(message, pair))
263+
264+
265+
class HighestWeightHostSelector(HostSelector):
266+
267+
def get_host(self, hosts: Tuple[HostInfo, ...], role: HostRole, props: Optional[Properties] = None) -> HostInfo:
268+
eligible_hosts: List[HostInfo] = [host for host in hosts if
269+
host.role == role and host.get_availability() == HostAvailability.AVAILABLE]
270+
271+
if len(eligible_hosts) == 0:
272+
raise AwsWrapperError(Messages.get_formatted("HostSelector.NoHostsMatchingRole", role))
273+
274+
return max(eligible_hosts, key=lambda host: host.weight)

aws_advanced_python_wrapper/limitless_connection_plugin.py renamed to aws_advanced_python_wrapper/limitless_plugin.py

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
SlidingExpirationCacheWithCleanupThread
3636
from aws_advanced_python_wrapper.utils.telemetry.telemetry import (
3737
TelemetryContext, TelemetryFactory, TelemetryTraceLevel)
38+
from aws_advanced_python_wrapper.utils.utils import LogUtils
3839

3940
if TYPE_CHECKING:
4041
from aws_advanced_python_wrapper.driver_dialect import DriverDialect
@@ -44,12 +45,16 @@
4445
logger = Logger(__name__)
4546

4647

47-
class LimitlessConnectionPlugin(Plugin):
48+
class LimitlessPlugin(Plugin):
4849
_SUBSCRIBED_METHODS: Set[str] = {"connect"}
4950

5051
def __init__(self, plugin_service: PluginService, props: Properties):
5152
self._plugin_service = plugin_service
5253
self._properties = props
54+
self._limitless_router_service = LimitlessRouterService(
55+
self._plugin_service,
56+
LimitlessQueryHelper(self._plugin_service)
57+
)
5358

5459
@property
5560
def subscribed_methods(self) -> Set[str]:
@@ -68,42 +73,35 @@ def connect(
6873

6974
dialect: DatabaseDialect = self._plugin_service.database_dialect
7075
if not isinstance(dialect, AuroraLimitlessDialect):
71-
connection = connect_func()
7276
refreshed_dialect = self._plugin_service.database_dialect
73-
7477
if not isinstance(refreshed_dialect, AuroraLimitlessDialect):
7578
raise UnsupportedOperationError(
76-
Messages.get_formatted("LimitlessConnectionPlugin.UnsupportedDialectOrDatabase",
79+
Messages.get_formatted("LimitlessPlugin.UnsupportedDialectOrDatabase",
7780
type(refreshed_dialect).__name__))
7881

79-
limitless_router_service = LimitlessRouterService(
80-
self._plugin_service,
81-
LimitlessQueryHelper(self._plugin_service)
82-
)
83-
8482
if is_initial_connection:
85-
limitless_router_service.start_monitoring(host_info, props)
83+
self._limitless_router_service.start_monitoring(host_info, props)
8684

87-
context: LimitlessConnectionContext = LimitlessConnectionContext(
85+
self._context: LimitlessContext = LimitlessContext(
8886
host_info,
8987
props,
9088
connection,
9189
connect_func,
9290
[],
9391
self
9492
)
95-
limitless_router_service.establish_connection(context)
96-
connection = context.get_connection()
93+
self._limitless_router_service.establish_connection(self._context)
94+
connection = self._context.get_connection()
9795
if connection is not None and not self._plugin_service.driver_dialect.is_closed(connection):
98-
return context.get_connection()
96+
return connection
9997

100-
raise AwsWrapperError(Messages.get_formatted("LimitlessConnectionPlugin.FailedToConnectToHost", host_info.host))
98+
raise AwsWrapperError(Messages.get_formatted("LimitlessPlugin.FailedToConnectToHost", host_info.host))
10199

102100

103-
class LimitlessConnectionPluginFactory:
101+
class LimitlessPluginFactory:
104102

105103
def get_instance(self, plugin_service: PluginService, props: Properties) -> Plugin:
106-
return LimitlessConnectionPlugin(plugin_service, props)
104+
return LimitlessPlugin(plugin_service, props)
107105

108106

109107
class LimitlessRouterMonitor:
@@ -172,6 +170,7 @@ def run(self):
172170
lambda _: new_limitless_routers,
173171
WrapperProperties.LIMITLESS_MONITOR_DISPOSAL_TIME_MS.get(
174172
self._properties) * 1_000_000)
173+
logger.debug(LogUtils.log_topology(tuple(new_limitless_routers), "[limitlessRouterMonitor] Topology:"))
175174

176175
sleep(self._interval_ms / 1000)
177176

@@ -257,15 +256,15 @@ def _create_host_info(self, result: Tuple[Any, Any], host_port_to_map: int) -> H
257256
return HostInfo(host_name, host_port_to_map, weight=weight, host_id=host_name)
258257

259258

260-
class LimitlessConnectionContext:
259+
class LimitlessContext:
261260

262261
def __init__(self,
263262
host_info: HostInfo,
264263
props: Properties,
265264
connection: Optional[Connection],
266265
connect_func: Callable,
267266
limitless_routers: List[HostInfo],
268-
connection_plugin: LimitlessConnectionPlugin) -> None:
267+
connection_plugin: LimitlessPlugin) -> None:
269268
self._host_info = host_info
270269
self._props = props
271270
self._connection = connection
@@ -326,23 +325,24 @@ def __init__(self, plugin_service: PluginService, query_helper: LimitlessQueryHe
326325
self._plugin_service = plugin_service
327326
self._query_helper = query_helper
328327

329-
def establish_connection(self, context: LimitlessConnectionContext) -> None:
328+
def establish_connection(self, context: LimitlessContext) -> None:
330329
context.set_limitless_routers(self._get_limitless_routers(
331330
self._plugin_service.host_list_provider.get_cluster_id(), context.get_props()))
332331

333332
if context.get_limitless_routers() is None or len(context.get_limitless_routers()) == 0:
334-
logger.debug("LimitlessRouterServiceImpl.limitlessRouterCacheEmpty")
333+
logger.debug("LimitlessRouterService.LimitlessRouterCacheEmpty")
335334

336335
wait_for_router_info = WrapperProperties.WAIT_FOR_ROUTER_INFO.get(context.get_props())
337336
if wait_for_router_info:
338337
self._synchronously_get_limitless_routers_with_retry(context)
339338
else:
340-
logger.debug("LimitlessRouterServiceImpl.UsingProvidedConnectUrl")
339+
logger.debug("LimitlessRouterService.UsingProvidedConnectUrl")
341340
if context.get_connection() is None or self._plugin_service.driver_dialect.is_closed(context.get_connection()):
342341
context.set_connection(context.get_connect_func()())
342+
return
343343

344-
if context.get_host_info in context.get_limitless_routers():
345-
logger.debug("LimitlessRouterServiceImpl.ConnectWithHost")
344+
if context.get_host_info() in context.get_limitless_routers():
345+
logger.debug(Messages.get_formatted("LimitlessRouterService.ConnectWithHost", context.get_host_info().host))
346346
if context.get_connection() is None:
347347
try:
348348
context.set_connection(context.get_connect_func()())
@@ -356,7 +356,7 @@ def establish_connection(self, context: LimitlessConnectionContext) -> None:
356356
try:
357357
selected_host_info = self._plugin_service.get_host_info_by_strategy(
358358
HostRole.WRITER, "weighted_random", context.get_limitless_routers())
359-
logger.debug("LimitlessRouterServiceImpl.SelectedHost", "None" if selected_host_info is None else selected_host_info.host)
359+
logger.debug("LimitlessRouterService.SelectedHost", "None" if selected_host_info is None else selected_host_info.host)
360360
except Exception as e:
361361
if self._is_login_exception(e) or isinstance(e, UnsupportedOperationError):
362362
raise e
@@ -375,7 +375,7 @@ def establish_connection(self, context: LimitlessConnectionContext) -> None:
375375
raise e
376376

377377
if selected_host_info is not None:
378-
logger.debug("LimitlessRouterServiceImpl.FailedToConnectToHost", selected_host_info.host)
378+
logger.debug("LimitlessRouterService.FailedToConnectToHost", selected_host_info.host)
379379
selected_host_info.set_availability(HostAvailability.UNAVAILABLE)
380380

381381
self._retry_connection_with_least_loaded_routers(context)
@@ -389,7 +389,7 @@ def _get_limitless_routers(self, cluster_id: str, props: Properties) -> List[Hos
389389
return []
390390
return routers
391391

392-
def _retry_connection_with_least_loaded_routers(self, context: LimitlessConnectionContext) -> None:
392+
def _retry_connection_with_least_loaded_routers(self, context: LimitlessContext) -> None:
393393
retry_count = 0
394394
max_retries = WrapperProperties.MAX_RETRIES_MS.get_int(context.get_props())
395395
while retry_count < max_retries:
@@ -400,7 +400,7 @@ def _retry_connection_with_least_loaded_routers(self, context: LimitlessConnecti
400400
if (context.get_limitless_routers() is None
401401
or len(context.get_limitless_routers()) == 0
402402
or not context.is_any_router_available()):
403-
logger.debug("LimitlessRouterServiceImpl.NoRoutersAvailableForRetry")
403+
logger.debug("LimitlessRouterService.NoRoutersAvailableForRetry")
404404

405405
if context.get_connection() is not None and not self._plugin_service.driver_dialect.is_closed(context.get_connection()):
406406
return
@@ -417,14 +417,14 @@ def _retry_connection_with_least_loaded_routers(self, context: LimitlessConnecti
417417

418418
try:
419419
selected_host_info = self._plugin_service.get_host_info_by_strategy(
420-
HostRole.WRITER, "weighted_random", context.get_limitless_routers())
421-
logger.debug("LimitlessRouterServiceImpl.SelectedHostForRetry",
420+
HostRole.WRITER, "highest_weight", context.get_limitless_routers())
421+
logger.debug("LimitlessRouterService.SelectedHostForRetry",
422422
"None" if selected_host_info is None else selected_host_info.host)
423423
if selected_host_info is None:
424424
continue
425425

426426
except UnsupportedOperationError as e:
427-
logger.error("LimitlessRouterServiceImpl.IncorrectConfiguration")
427+
logger.error("LimitlessRouterService.IncorrectConfiguration")
428428
raise e
429429
except AwsWrapperError:
430430
continue
@@ -438,14 +438,14 @@ def _retry_connection_with_least_loaded_routers(self, context: LimitlessConnecti
438438
if self._is_login_exception(e):
439439
raise e
440440
selected_host_info.set_availability(HostAvailability.UNAVAILABLE)
441-
logger.debug("LimitlessRouterServiceImpl.FailedToConnectToHost", selected_host_info.host)
441+
logger.debug("LimitlessRouterService.FailedToConnectToHost", selected_host_info.host)
442442

443443
raise AwsWrapperError(Messages.get("LimitlessRouterService.MaxRetriesExceeded"))
444444

445-
def _synchronously_get_limitless_routers_with_retry(self, context: LimitlessConnectionContext) -> None:
446-
logger.debug("LimitlessRouterServiceImpl.SynchronouslyGetLimitlessRouters")
445+
def _synchronously_get_limitless_routers_with_retry(self, context: LimitlessContext) -> None:
446+
logger.debug("LimitlessRouterService.SynchronouslyGetLimitlessRouters")
447447
retry_count = -1
448-
max_retries = WrapperProperties.MAX_RETRIES_MS.get_int(context.get_props())
448+
max_retries = WrapperProperties.GET_ROUTER_MAX_RETRIES.get_int(context.get_props())
449449
retry_interval_ms = WrapperProperties.GET_ROUTER_RETRY_INTERVAL_MS.get_float(context.get_props())
450450
first_iteration = True
451451
while first_iteration or retry_count < max_retries:
@@ -467,7 +467,7 @@ def _synchronously_get_limitless_routers_with_retry(self, context: LimitlessConn
467467

468468
raise AwsWrapperError(Messages.get("LimitlessRouterService.NoRoutersAvailable"))
469469

470-
def _synchronously_get_limitless_routers(self, context: LimitlessConnectionContext) -> None:
470+
def _synchronously_get_limitless_routers(self, context: LimitlessContext) -> None:
471471
cache_expiration_nano: int = WrapperProperties.LIMITLESS_MONITOR_DISPOSAL_TIME_MS.get_int(context.get_props()) * 1_000_000
472472

473473
lock = LimitlessRouterService._force_get_limitless_routers_lock_map.compute_if_absent(

aws_advanced_python_wrapper/plugin_service.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@
2424
FastestResponseStrategyPluginFactory
2525
from aws_advanced_python_wrapper.federated_plugin import \
2626
FederatedAuthPluginFactory
27-
from aws_advanced_python_wrapper.limitless_connection_plugin import \
28-
LimitlessConnectionPluginFactory
27+
from aws_advanced_python_wrapper.limitless_plugin import LimitlessPluginFactory
2928
from aws_advanced_python_wrapper.okta_plugin import OktaAuthPluginFactory
3029
from aws_advanced_python_wrapper.states.session_state_service import (
3130
SessionStateService, SessionStateServiceImpl)
@@ -731,7 +730,7 @@ class PluginManager(CanReleaseResources):
731730
"federated_auth": FederatedAuthPluginFactory,
732731
"okta": OktaAuthPluginFactory,
733732
"initial_connection": AuroraInitialConnectionStrategyPluginFactory,
734-
"limitless": LimitlessConnectionPluginFactory,
733+
"limitless": LimitlessPluginFactory,
735734
}
736735

737736
WEIGHT_RELATIVE_TO_PRIOR_PLUGIN = -1
@@ -751,7 +750,7 @@ class PluginManager(CanReleaseResources):
751750
IamAuthPluginFactory: 700,
752751
AwsSecretsManagerPluginFactory: 800,
753752
FederatedAuthPluginFactory: 900,
754-
LimitlessConnectionPluginFactory: 950,
753+
LimitlessPluginFactory: 950,
755754
OktaAuthPluginFactory: 1000,
756755
ConnectTimePluginFactory: WEIGHT_RELATIVE_TO_PRIOR_PLUGIN,
757756
ExecuteTimePluginFactory: WEIGHT_RELATIVE_TO_PRIOR_PLUGIN,

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ IamAuthPlugin.InvalidHost=[IamAuthPlugin] Invalid IAM host {}. The IAM host must
153153
IamAuthPlugin.IsNoneOrEmpty=[IamAuthPlugin] Property "{}" is None or empty.
154154
IamAuthUtils.GeneratedNewAuthToken=Generated new authentication token = {}
155155

156-
LimitlessConnectionPlugin.FailedToConnectToHost=[LimitlessConnectionPlugin] Failed to connect to host {}.
157-
LimitlessConnectionPlugin.UnsupportedDialectOrDatabase=[LimitlessConnectionPlugin] Unsupported dialect '{}' encountered. Please ensure the connection parameters are correct, and refer to the documentation to ensure that the connecting database is compatible with the Limitless Connection Plugin.
156+
LimitlessPlugin.FailedToConnectToHost=[LimitlessPlugin] Failed to connect to host {}.
157+
LimitlessPlugin.UnsupportedDialectOrDatabase=[LimitlessPlugin] Unsupported dialect '{}' encountered. Please ensure the connection parameters are correct, and refer to the documentation to ensure that the connecting database is compatible with the Limitless Connection Plugin.
158158

159159
LimitlessQueryHelper.UnsupportedDialectOrDatabase=[LimitlessQueryHelper] Unsupported dialect '{}' encountered. Please ensure JDBC connection parameters are correct, and refer to the documentation to ensure that the connecting database is compatible with the Limitless Connection Plugin.
160160

aws_advanced_python_wrapper/sql_alchemy_connection_provider.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
from aws_advanced_python_wrapper.connection_provider import ConnectionProvider
2727
from aws_advanced_python_wrapper.errors import AwsWrapperError
2828
from aws_advanced_python_wrapper.host_selector import (
29-
HostSelector, RandomHostSelector, RoundRobinHostSelector,
30-
WeightedRandomHostSelector)
29+
HighestWeightHostSelector, HostSelector, RandomHostSelector,
30+
RoundRobinHostSelector, WeightedRandomHostSelector)
3131
from aws_advanced_python_wrapper.plugin import CanReleaseResources
3232
from aws_advanced_python_wrapper.utils.messages import Messages
3333
from aws_advanced_python_wrapper.utils.properties import (Properties,
@@ -48,7 +48,8 @@ class SqlAlchemyPooledConnectionProvider(ConnectionProvider, CanReleaseResources
4848
_LEAST_CONNECTIONS: ClassVar[str] = "least_connections"
4949
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(),
5050
"round_robin": RoundRobinHostSelector(),
51-
"weighted_random": WeightedRandomHostSelector()}
51+
"weighted_random": WeightedRandomHostSelector(),
52+
"highest_weight": HighestWeightHostSelector()}
5253
_rds_utils: ClassVar[RdsUtils] = RdsUtils()
5354
_database_pools: ClassVar[SlidingExpirationCache[PoolKey, QueuePool]] = SlidingExpirationCache(
5455
should_dispose_func=lambda queue_pool: queue_pool.checkedout() == 0,

aws_advanced_python_wrapper/utils/properties.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ class WrapperProperties:
369369
"Max number of connection retries the Limitless Connection Plugin will attempt.",
370370
5)
371371

372-
MAX_RETRIES_MS = WrapperProperty("limitless_connection_max_retries_ms",
372+
MAX_RETRIES_MS = WrapperProperty("limitless_max_retries_ms",
373373
"Interval in milliseconds between polling for Limitless Transaction Routers to the database.",
374374
7_500)
375375

0 commit comments

Comments
 (0)