Skip to content

Commit ddbb65f

Browse files
authored
feat: limitless plugin implementation (#912)
1 parent 0bb3882 commit ddbb65f

21 files changed

+1453
-38
lines changed

aws_advanced_python_wrapper/connection_provider.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626

2727
from aws_advanced_python_wrapper.errors import AwsWrapperError
2828
from aws_advanced_python_wrapper.host_selector import (
29-
HostSelector, RandomHostSelector, RoundRobinHostSelector,
30-
WeightedRandomHostSelector)
29+
HighestWeightHostSelector, HostSelector, RandomHostSelector,
30+
RoundRobinHostSelector, WeightedRandomHostSelector)
3131
from aws_advanced_python_wrapper.plugin import CanReleaseResources
3232
from aws_advanced_python_wrapper.utils.log import Logger
3333
from aws_advanced_python_wrapper.utils.messages import Messages
@@ -98,7 +98,8 @@ def connect(
9898
class DriverConnectionProvider(ConnectionProvider):
9999
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(),
100100
"round_robin": RoundRobinHostSelector(),
101-
"weighted_random": WeightedRandomHostSelector()}
101+
"weighted_random": WeightedRandomHostSelector(),
102+
"highest_weight": HighestWeightHostSelector()}
102103

103104
def accepts_host_info(self, host_info: HostInfo, props: Properties) -> bool:
104105
return True

aws_advanced_python_wrapper/database_dialect.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
Protocol, Tuple, runtime_checkable)
1919

2020
from aws_advanced_python_wrapper.driver_info import DriverInfo
21+
from aws_advanced_python_wrapper.utils.rds_url_type import RdsUrlType
2122

2223
if TYPE_CHECKING:
2324
from aws_advanced_python_wrapper.pep249 import Connection
@@ -98,6 +99,15 @@ def is_reader_query(self) -> str:
9899
return self._IS_READER_QUERY
99100

100101

102+
@runtime_checkable
103+
class AuroraLimitlessDialect(Protocol):
104+
_LIMITLESS_ROUTER_ENDPOINT_QUERY: str
105+
106+
@property
107+
def limitless_router_endpoint_query(self) -> str:
108+
return self._LIMITLESS_ROUTER_ENDPOINT_QUERY
109+
110+
101111
class DatabaseDialect(Protocol):
102112
"""
103113
Database dialects help the AWS Advanced Python Driver determine what kind of underlying database is being used,
@@ -342,7 +352,7 @@ def get_host_list_provider_supplier(self) -> Callable:
342352
return lambda provider_service, props: RdsHostListProvider(provider_service, props)
343353

344354

345-
class AuroraPgDialect(PgDatabaseDialect, TopologyAwareDatabaseDialect):
355+
class AuroraPgDialect(PgDatabaseDialect, TopologyAwareDatabaseDialect, AuroraLimitlessDialect):
346356
_DIALECT_UPDATE_CANDIDATES: Tuple[DialectCode, ...] = (DialectCode.MULTI_AZ_PG,)
347357

348358
_EXTENSIONS_QUERY = "SELECT (setting LIKE '%aurora_stat_utils%') AS aurora_stat_utils " \
@@ -359,6 +369,7 @@ class AuroraPgDialect(PgDatabaseDialect, TopologyAwareDatabaseDialect):
359369

360370
_HOST_ID_QUERY = "SELECT aurora_db_instance_identifier()"
361371
_IS_READER_QUERY = "SELECT pg_is_in_recovery()"
372+
_LIMITLESS_ROUTER_ENDPOINT_QUERY = "SELECT router_endpoint, load FROM aurora_limitless_router_endpoints()"
362373

363374
@property
364375
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
@@ -621,6 +632,11 @@ def get_dialect(self, driver_dialect: str, props: Properties) -> DatabaseDialect
621632

622633
if target_driver_type is TargetDriverType.POSTGRES:
623634
rds_type = self._rds_helper.identify_rds_type(host)
635+
if rds_type == RdsUrlType.RDS_AURORA_LIMITLESS_DB_SHARD_GROUP:
636+
self._can_update = False
637+
self._dialect_code = DialectCode.AURORA_PG
638+
self._dialect = DatabaseDialectManager._known_dialects_by_code[DialectCode.AURORA_PG]
639+
return self._dialect
624640
if rds_type.is_rds_cluster:
625641
self._can_update = True
626642
self._dialect_code = DialectCode.AURORA_PG

aws_advanced_python_wrapper/default_plugin.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from __future__ import annotations
1616

17-
from typing import TYPE_CHECKING
17+
from typing import TYPE_CHECKING, List, Optional
1818

1919
if TYPE_CHECKING:
2020
from aws_advanced_python_wrapper.connection_provider import (ConnectionProvider,
@@ -118,7 +118,7 @@ def accepts_strategy(self, role: HostRole, strategy: str) -> bool:
118118
return False
119119
return self._connection_provider_manager.accepts_strategy(role, strategy)
120120

121-
def get_host_info_by_strategy(self, role: HostRole, strategy: str) -> HostInfo:
121+
def get_host_info_by_strategy(self, role: HostRole, strategy: str, host_list: Optional[List[HostInfo]] = None) -> HostInfo:
122122
if HostRole.UNKNOWN == role:
123123
raise AwsWrapperError(Messages.get("DefaultPlugin.UnknownHosts"))
124124

@@ -127,7 +127,10 @@ def get_host_info_by_strategy(self, role: HostRole, strategy: str) -> HostInfo:
127127
if len(hosts) < 1:
128128
raise AwsWrapperError(Messages.get("DefaultPlugin.EmptyHosts"))
129129

130-
return self._connection_provider_manager.get_host_info_by_strategy(hosts, role, strategy, self._plugin_service.props)
130+
if host_list is None:
131+
return self._connection_provider_manager.get_host_info_by_strategy(hosts, role, strategy, self._plugin_service.props)
132+
else:
133+
return self._connection_provider_manager.get_host_info_by_strategy(tuple(host_list), role, strategy, self._plugin_service.props)
131134

132135
@property
133136
def subscribed_methods(self) -> Set[str]:

aws_advanced_python_wrapper/fastest_response_strategy_plugin.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ def __init__(self, plugin_service: PluginService, props: Properties):
5858
self._plugin_service = plugin_service
5959
self._properties = props
6060
self._host_response_time_service: HostResponseTimeService = \
61-
HostResponseTimeService(plugin_service, props, WrapperProperties.RESPONSE_MEASUREMENT_INTERVAL_MILLIS.get_int(props))
62-
self._cache_expiration_nanos = WrapperProperties.RESPONSE_MEASUREMENT_INTERVAL_MILLIS.get_int(props) * 10 ^ 6
61+
HostResponseTimeService(plugin_service, props, WrapperProperties.RESPONSE_MEASUREMENT_INTERVAL_MS.get_int(props))
62+
self._cache_expiration_nanos = WrapperProperties.RESPONSE_MEASUREMENT_INTERVAL_MS.get_int(props) * 10 ^ 6
6363
self._random_host_selector = RandomHostSelector()
6464
self._cached_fastest_response_host_by_role: CacheMap[str, HostInfo] = CacheMap()
6565
self._hosts: Tuple[HostInfo, ...] = ()
@@ -86,7 +86,7 @@ def connect(
8686
def accepts_strategy(self, role: HostRole, strategy: str) -> bool:
8787
return strategy == FastestResponseStrategyPlugin._FASTEST_RESPONSE_STRATEGY_NAME
8888

89-
def get_host_info_by_strategy(self, role: HostRole, strategy: str) -> HostInfo:
89+
def get_host_info_by_strategy(self, role: HostRole, strategy: str, host_list: Optional[List[HostInfo]] = None) -> HostInfo:
9090
if not self.accepts_strategy(role, strategy):
9191
logger.error("FastestResponseStrategyPlugin.UnsupportedHostSelectorStrategy", strategy)
9292
raise AwsWrapperError(Messages.get_formatted("FastestResponseStrategyPlugin.UnsupportedHostSelectorStrategy", strategy))

aws_advanced_python_wrapper/host_list_provider.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ def get_host_role(self, connection: Connection) -> HostRole:
6969
def identify_connection(self, connection: Optional[Connection]) -> Optional[HostInfo]:
7070
...
7171

72+
def get_cluster_id(self) -> str:
73+
...
74+
7275

7376
@runtime_checkable
7477
class DynamicHostListProvider(HostListProvider, Protocol):
@@ -519,6 +522,10 @@ def _identify_connection(self, conn: Connection):
519522
cursor.execute(self._dialect.host_id_query)
520523
return cursor.fetchone()
521524

525+
def get_cluster_id(self):
526+
self._initialize()
527+
return self._cluster_id
528+
522529
@dataclass()
523530
class ClusterIdSuggestion:
524531
cluster_id: str
@@ -646,3 +653,6 @@ def get_host_role(self, connection: Connection) -> HostRole:
646653
def identify_connection(self, connection: Optional[Connection]) -> Optional[HostInfo]:
647654
raise UnsupportedOperationError(
648655
Messages.get_formatted("ConnectionStringHostListProvider.UnsupportedMethod", "identify_connection"))
656+
657+
def get_cluster_id(self):
658+
return "<none>"

aws_advanced_python_wrapper/host_selector.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,3 +260,15 @@ def _update_host_weight_map_from_string(self, props: Optional[Properties] = None
260260
except ValueError:
261261
logger.error(message, pair)
262262
raise AwsWrapperError(Messages.get_formatted(message, pair))
263+
264+
265+
class HighestWeightHostSelector(HostSelector):
266+
267+
def get_host(self, hosts: Tuple[HostInfo, ...], role: HostRole, props: Optional[Properties] = None) -> HostInfo:
268+
eligible_hosts: List[HostInfo] = [host for host in hosts if
269+
host.role == role and host.get_availability() == HostAvailability.AVAILABLE]
270+
271+
if len(eligible_hosts) == 0:
272+
raise AwsWrapperError(Messages.get_formatted("HostSelector.NoHostsMatchingRole", role))
273+
274+
return max(eligible_hosts, key=lambda host: host.weight)

0 commit comments

Comments
 (0)