Skip to content

Commit ff1cee6

Browse files
committed
feat: limitless plugin implementation
1 parent 61dc665 commit ff1cee6

7 files changed

+819
-31
lines changed

aws_advanced_python_wrapper/connection_provider.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626

2727
from aws_advanced_python_wrapper.errors import AwsWrapperError
2828
from aws_advanced_python_wrapper.host_selector import (
29-
HostSelector, RandomHostSelector, RoundRobinHostSelector,
30-
WeightedRandomHostSelector)
29+
HighestWeightHostSelector, HostSelector, RandomHostSelector,
30+
RoundRobinHostSelector, WeightedRandomHostSelector)
3131
from aws_advanced_python_wrapper.plugin import CanReleaseResources
3232
from aws_advanced_python_wrapper.utils.log import Logger
3333
from aws_advanced_python_wrapper.utils.messages import Messages
@@ -98,7 +98,8 @@ def connect(
9898
class DriverConnectionProvider(ConnectionProvider):
9999
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(),
100100
"round_robin": RoundRobinHostSelector(),
101-
"weighted_random": WeightedRandomHostSelector()}
101+
"weighted_random": WeightedRandomHostSelector(),
102+
"highest_weight": HighestWeightHostSelector()}
102103

103104
def accepts_host_info(self, host_info: HostInfo, props: Properties) -> bool:
104105
return True

aws_advanced_python_wrapper/host_selector.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,3 +260,15 @@ def _update_host_weight_map_from_string(self, props: Optional[Properties] = None
260260
except ValueError:
261261
logger.error(message, pair)
262262
raise AwsWrapperError(Messages.get_formatted(message, pair))
263+
264+
265+
class HighestWeightHostSelector(HostSelector):
266+
267+
def get_host(self, hosts: Tuple[HostInfo, ...], role: HostRole, props: Optional[Properties] = None) -> HostInfo:
268+
eligible_hosts: List[HostInfo] = [host for host in hosts if
269+
host.role == role and host.get_availability() == HostAvailability.AVAILABLE]
270+
271+
if len(eligible_hosts) == 0:
272+
raise AwsWrapperError(Messages.get_formatted("HostSelector.NoHostsMatchingRole", role))
273+
274+
return max(eligible_hosts, key=lambda host: host.weight)

aws_advanced_python_wrapper/limitless_connection_plugin.py

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
SlidingExpirationCacheWithCleanupThread
3636
from aws_advanced_python_wrapper.utils.telemetry.telemetry import (
3737
TelemetryContext, TelemetryFactory, TelemetryTraceLevel)
38+
from aws_advanced_python_wrapper.utils.utils import LogUtils
3839

3940
if TYPE_CHECKING:
4041
from aws_advanced_python_wrapper.driver_dialect import DriverDialect
@@ -50,6 +51,10 @@ class LimitlessConnectionPlugin(Plugin):
5051
def __init__(self, plugin_service: PluginService, props: Properties):
5152
self._plugin_service = plugin_service
5253
self._properties = props
54+
self._limitless_router_service = LimitlessRouterService(
55+
self._plugin_service,
56+
LimitlessQueryHelper(self._plugin_service)
57+
)
5358

5459
@property
5560
def subscribed_methods(self) -> Set[str]:
@@ -68,34 +73,27 @@ def connect(
6873

6974
dialect: DatabaseDialect = self._plugin_service.database_dialect
7075
if not isinstance(dialect, AuroraLimitlessDialect):
71-
connection = connect_func()
7276
refreshed_dialect = self._plugin_service.database_dialect
73-
7477
if not isinstance(refreshed_dialect, AuroraLimitlessDialect):
7578
raise UnsupportedOperationError(
7679
Messages.get_formatted("LimitlessConnectionPlugin.UnsupportedDialectOrDatabase",
7780
type(refreshed_dialect).__name__))
7881

79-
limitless_router_service = LimitlessRouterService(
80-
self._plugin_service,
81-
LimitlessQueryHelper(self._plugin_service)
82-
)
83-
8482
if is_initial_connection:
85-
limitless_router_service.start_monitoring(host_info, props)
83+
self._limitless_router_service.start_monitoring(host_info, props)
8684

87-
context: LimitlessConnectionContext = LimitlessConnectionContext(
85+
self._context: LimitlessConnectionContext = LimitlessConnectionContext(
8886
host_info,
8987
props,
9088
connection,
9189
connect_func,
9290
[],
9391
self
9492
)
95-
limitless_router_service.establish_connection(context)
96-
connection = context.get_connection()
93+
self._limitless_router_service.establish_connection(self._context)
94+
connection = self._context.get_connection()
9795
if connection is not None and not self._plugin_service.driver_dialect.is_closed(connection):
98-
return context.get_connection()
96+
return connection
9997

10098
raise AwsWrapperError(Messages.get_formatted("LimitlessConnectionPlugin.FailedToConnectToHost", host_info.host))
10199

@@ -172,6 +170,7 @@ def run(self):
172170
lambda _: new_limitless_routers,
173171
WrapperProperties.LIMITLESS_MONITOR_DISPOSAL_TIME_MS.get(
174172
self._properties) * 1_000_000)
173+
logger.debug(LogUtils.log_topology(tuple(new_limitless_routers), "[limitlessRouterMonitor] Topology:"))
175174

176175
sleep(self._interval_ms / 1000)
177176

@@ -331,18 +330,19 @@ def establish_connection(self, context: LimitlessConnectionContext) -> None:
331330
self._plugin_service.host_list_provider.get_cluster_id(), context.get_props()))
332331

333332
if context.get_limitless_routers() is None or len(context.get_limitless_routers()) == 0:
334-
logger.debug("LimitlessRouterServiceImpl.limitlessRouterCacheEmpty")
333+
logger.debug("LimitlessRouterService.:LimitlessRouterCacheEmpty")
335334

336335
wait_for_router_info = WrapperProperties.WAIT_FOR_ROUTER_INFO.get(context.get_props())
337336
if wait_for_router_info:
338337
self._synchronously_get_limitless_routers_with_retry(context)
339338
else:
340-
logger.debug("LimitlessRouterServiceImpl.UsingProvidedConnectUrl")
339+
logger.debug("LimitlessRouterService.UsingProvidedConnectUrl")
341340
if context.get_connection() is None or self._plugin_service.driver_dialect.is_closed(context.get_connection()):
342341
context.set_connection(context.get_connect_func()())
342+
return
343343

344-
if context.get_host_info in context.get_limitless_routers():
345-
logger.debug("LimitlessRouterServiceImpl.ConnectWithHost")
344+
if context.get_host_info() in context.get_limitless_routers():
345+
logger.debug("LimitlessRouterService.ConnectWithHost")
346346
if context.get_connection() is None:
347347
try:
348348
context.set_connection(context.get_connect_func()())
@@ -356,7 +356,7 @@ def establish_connection(self, context: LimitlessConnectionContext) -> None:
356356
try:
357357
selected_host_info = self._plugin_service.get_host_info_by_strategy(
358358
HostRole.WRITER, "weighted_random", context.get_limitless_routers())
359-
logger.debug("LimitlessRouterServiceImpl.SelectedHost", "None" if selected_host_info is None else selected_host_info.host)
359+
logger.debug("LimitlessRouterService.SelectedHost", "None" if selected_host_info is None else selected_host_info.host)
360360
except Exception as e:
361361
if self._is_login_exception(e) or isinstance(e, UnsupportedOperationError):
362362
raise e
@@ -375,7 +375,7 @@ def establish_connection(self, context: LimitlessConnectionContext) -> None:
375375
raise e
376376

377377
if selected_host_info is not None:
378-
logger.debug("LimitlessRouterServiceImpl.FailedToConnectToHost", selected_host_info.host)
378+
logger.debug("LimitlessRouterService.FailedToConnectToHost", selected_host_info.host)
379379
selected_host_info.set_availability(HostAvailability.UNAVAILABLE)
380380

381381
self._retry_connection_with_least_loaded_routers(context)
@@ -400,7 +400,7 @@ def _retry_connection_with_least_loaded_routers(self, context: LimitlessConnecti
400400
if (context.get_limitless_routers() is None
401401
or len(context.get_limitless_routers()) == 0
402402
or not context.is_any_router_available()):
403-
logger.debug("LimitlessRouterServiceImpl.NoRoutersAvailableForRetry")
403+
logger.debug("LimitlessRouterService.NoRoutersAvailableForRetry")
404404

405405
if context.get_connection() is not None and not self._plugin_service.driver_dialect.is_closed(context.get_connection()):
406406
return
@@ -417,14 +417,14 @@ def _retry_connection_with_least_loaded_routers(self, context: LimitlessConnecti
417417

418418
try:
419419
selected_host_info = self._plugin_service.get_host_info_by_strategy(
420-
HostRole.WRITER, "weighted_random", context.get_limitless_routers())
421-
logger.debug("LimitlessRouterServiceImpl.SelectedHostForRetry",
420+
HostRole.WRITER, "highest_weight", context.get_limitless_routers())
421+
logger.debug("LimitlessRouterService.SelectedHostForRetry",
422422
"None" if selected_host_info is None else selected_host_info.host)
423423
if selected_host_info is None:
424424
continue
425425

426426
except UnsupportedOperationError as e:
427-
logger.error("LimitlessRouterServiceImpl.IncorrectConfiguration")
427+
logger.error("LimitlessRouterService.IncorrectConfiguration")
428428
raise e
429429
except AwsWrapperError:
430430
continue
@@ -438,14 +438,14 @@ def _retry_connection_with_least_loaded_routers(self, context: LimitlessConnecti
438438
if self._is_login_exception(e):
439439
raise e
440440
selected_host_info.set_availability(HostAvailability.UNAVAILABLE)
441-
logger.debug("LimitlessRouterServiceImpl.FailedToConnectToHost", selected_host_info.host)
441+
logger.debug("LimitlessRouterService.FailedToConnectToHost", selected_host_info.host)
442442

443443
raise AwsWrapperError(Messages.get("LimitlessRouterService.MaxRetriesExceeded"))
444444

445445
def _synchronously_get_limitless_routers_with_retry(self, context: LimitlessConnectionContext) -> None:
446-
logger.debug("LimitlessRouterServiceImpl.SynchronouslyGetLimitlessRouters")
446+
logger.debug("LimitlessRouterService.SynchronouslyGetLimitlessRouters")
447447
retry_count = -1
448-
max_retries = WrapperProperties.MAX_RETRIES_MS.get_int(context.get_props())
448+
max_retries = WrapperProperties.GET_ROUTER_MAX_RETRIES.get_int(context.get_props())
449449
retry_interval_ms = WrapperProperties.GET_ROUTER_RETRY_INTERVAL_MS.get_float(context.get_props())
450450
first_iteration = True
451451
while first_iteration or retry_count < max_retries:

aws_advanced_python_wrapper/sql_alchemy_connection_provider.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
from aws_advanced_python_wrapper.connection_provider import ConnectionProvider
2727
from aws_advanced_python_wrapper.errors import AwsWrapperError
2828
from aws_advanced_python_wrapper.host_selector import (
29-
HostSelector, RandomHostSelector, RoundRobinHostSelector,
30-
WeightedRandomHostSelector)
29+
HighestWeightHostSelector, HostSelector, RandomHostSelector,
30+
RoundRobinHostSelector, WeightedRandomHostSelector)
3131
from aws_advanced_python_wrapper.plugin import CanReleaseResources
3232
from aws_advanced_python_wrapper.utils.messages import Messages
3333
from aws_advanced_python_wrapper.utils.properties import (Properties,
@@ -48,7 +48,8 @@ class SqlAlchemyPooledConnectionProvider(ConnectionProvider, CanReleaseResources
4848
_LEAST_CONNECTIONS: ClassVar[str] = "least_connections"
4949
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(),
5050
"round_robin": RoundRobinHostSelector(),
51-
"weighted_random": WeightedRandomHostSelector()}
51+
"weighted_random": WeightedRandomHostSelector(),
52+
"highest_weight": HighestWeightHostSelector()}
5253
_rds_utils: ClassVar[RdsUtils] = RdsUtils()
5354
_database_pools: ClassVar[SlidingExpirationCache[PoolKey, QueuePool]] = SlidingExpirationCache(
5455
should_dispose_func=lambda queue_pool: queue_pool.checkedout() == 0,
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from aws_advanced_python_wrapper.host_availability import HostAvailability
16+
from aws_advanced_python_wrapper.host_selector import HighestWeightHostSelector
17+
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
18+
from aws_advanced_python_wrapper.utils.properties import Properties
19+
20+
HOST_ROLE = HostRole.READER
21+
22+
23+
def test_get_host_given_unavailable_host():
24+
unavailable_host: HostInfo = HostInfo(host="some_unavailable_host", role=HOST_ROLE, availability=HostAvailability.UNAVAILABLE)
25+
available_host: HostInfo = HostInfo(host="some_available_host", role=HOST_ROLE, availability=HostAvailability.AVAILABLE)
26+
27+
host_selector = HighestWeightHostSelector()
28+
actual_host = host_selector.get_host((unavailable_host, available_host), HOST_ROLE, Properties())
29+
30+
assert available_host == actual_host
31+
32+
33+
def test_get_host_given_multiple_unavailable_hosts():
34+
hosts = (
35+
HostInfo(host="some_unavailable_host", role=HOST_ROLE, availability=HostAvailability.UNAVAILABLE),
36+
HostInfo(host="some_unavailable_host", role=HOST_ROLE, availability=HostAvailability.UNAVAILABLE),
37+
HostInfo(host="some_available_host", role=HOST_ROLE, availability=HostAvailability.AVAILABLE)
38+
)
39+
40+
host_selector = HighestWeightHostSelector()
41+
actual_host = host_selector.get_host(hosts, HOST_ROLE, Properties())
42+
43+
assert HostAvailability.AVAILABLE == actual_host.get_availability()
44+
45+
46+
def test_get_host_given_different_weights():
47+
48+
highest_weight_host = HostInfo(host="some_available_host", role=HOST_ROLE, availability=HostAvailability.AVAILABLE, weight=3)
49+
50+
hosts = (
51+
HostInfo(host="some_unavailable_host", role=HOST_ROLE, availability=HostAvailability.UNAVAILABLE),
52+
HostInfo(host="some_unavailable_host", role=HOST_ROLE, availability=HostAvailability.UNAVAILABLE),
53+
HostInfo(host="some_available_host", role=HOST_ROLE, availability=HostAvailability.AVAILABLE, weight=1),
54+
HostInfo(host="some_available_host", role=HOST_ROLE, availability=HostAvailability.AVAILABLE, weight=2),
55+
highest_weight_host
56+
)
57+
58+
host_selector = HighestWeightHostSelector()
59+
actual_host = host_selector.get_host(hosts, HOST_ROLE, Properties())
60+
61+
assert actual_host == highest_weight_host

0 commit comments

Comments
 (0)