Skip to content

Commit f5f5f71

Browse files
committed
feat: limitless plugin implementation
1 parent 3dd5f0f commit f5f5f71

12 files changed

+646
-22
lines changed

aws_advanced_python_wrapper/database_dialect.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,15 @@ def is_reader_query(self) -> str:
9898
return self._IS_READER_QUERY
9999

100100

101+
@runtime_checkable
102+
class AuroraLimitlessDialect(Protocol):
103+
_LIMITLESS_ROUTER_ENDPOINT_QUERY: str
104+
105+
@property
106+
def limitless_router_endpoint_query(self) -> str:
107+
return self._LIMITLESS_ROUTER_ENDPOINT_QUERY
108+
109+
101110
class DatabaseDialect(Protocol):
102111
"""
103112
Database dialects help the AWS Advanced Python Driver determine what kind of underlying database is being used,
@@ -342,7 +351,7 @@ def get_host_list_provider_supplier(self) -> Callable:
342351
return lambda provider_service, props: RdsHostListProvider(provider_service, props)
343352

344353

345-
class AuroraPgDialect(PgDatabaseDialect, TopologyAwareDatabaseDialect):
354+
class AuroraPgDialect(PgDatabaseDialect, TopologyAwareDatabaseDialect, AuroraLimitlessDialect):
346355
_DIALECT_UPDATE_CANDIDATES: Tuple[DialectCode, ...] = (DialectCode.MULTI_AZ_PG,)
347356

348357
_EXTENSIONS_QUERY = "SELECT (setting LIKE '%aurora_stat_utils%') AS aurora_stat_utils " \
@@ -359,6 +368,7 @@ class AuroraPgDialect(PgDatabaseDialect, TopologyAwareDatabaseDialect):
359368

360369
_HOST_ID_QUERY = "SELECT aurora_db_instance_identifier()"
361370
_IS_READER_QUERY = "SELECT pg_is_in_recovery()"
371+
_LIMITLESS_ROUTER_ENDPOINT_QUERY = "SELECT router_endpoint, load FROM aurora_limitless_router_endpoints()"
362372

363373
@property
364374
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:

aws_advanced_python_wrapper/default_plugin.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from __future__ import annotations
1616

17-
from typing import TYPE_CHECKING
17+
from typing import TYPE_CHECKING, List, Optional
1818

1919
if TYPE_CHECKING:
2020
from aws_advanced_python_wrapper.connection_provider import (ConnectionProvider,
@@ -118,7 +118,7 @@ def accepts_strategy(self, role: HostRole, strategy: str) -> bool:
118118
return False
119119
return self._connection_provider_manager.accepts_strategy(role, strategy)
120120

121-
def get_host_info_by_strategy(self, role: HostRole, strategy: str) -> HostInfo:
121+
def get_host_info_by_strategy(self, role: HostRole, strategy: str, host_list: Optional[List[HostInfo]] = None) -> HostInfo:
122122
if HostRole.UNKNOWN == role:
123123
raise AwsWrapperError(Messages.get("DefaultPlugin.UnknownHosts"))
124124

@@ -127,7 +127,10 @@ def get_host_info_by_strategy(self, role: HostRole, strategy: str) -> HostInfo:
127127
if len(hosts) < 1:
128128
raise AwsWrapperError(Messages.get("DefaultPlugin.EmptyHosts"))
129129

130-
return self._connection_provider_manager.get_host_info_by_strategy(hosts, role, strategy, self._plugin_service.props)
130+
if host_list is None:
131+
return self._connection_provider_manager.get_host_info_by_strategy(hosts, role, strategy, self._plugin_service.props)
132+
else:
133+
return self._connection_provider_manager.get_host_info_by_strategy(tuple(host_list), role, strategy, self._plugin_service.props)
131134

132135
@property
133136
def subscribed_methods(self) -> Set[str]:

aws_advanced_python_wrapper/fastest_response_strategy_plugin.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ def __init__(self, plugin_service: PluginService, props: Properties):
5858
self._plugin_service = plugin_service
5959
self._properties = props
6060
self._host_response_time_service: HostResponseTimeService = \
61-
HostResponseTimeService(plugin_service, props, WrapperProperties.RESPONSE_MEASUREMENT_INTERVAL_MILLIS.get_int(props))
62-
self._cache_expiration_nanos = WrapperProperties.RESPONSE_MEASUREMENT_INTERVAL_MILLIS.get_int(props) * 10 ^ 6
61+
HostResponseTimeService(plugin_service, props, WrapperProperties.RESPONSE_MEASUREMENT_INTERVAL_MS.get_int(props))
62+
self._cache_expiration_nanos = WrapperProperties.RESPONSE_MEASUREMENT_INTERVAL_MS.get_int(props) * 10 ^ 6
6363
self._random_host_selector = RandomHostSelector()
6464
self._cached_fastest_response_host_by_role: CacheMap[str, HostInfo] = CacheMap()
6565
self._hosts: Tuple[HostInfo, ...] = ()
@@ -86,7 +86,7 @@ def connect(
8686
def accepts_strategy(self, role: HostRole, strategy: str) -> bool:
8787
return strategy == FastestResponseStrategyPlugin._FASTEST_RESPONSE_STRATEGY_NAME
8888

89-
def get_host_info_by_strategy(self, role: HostRole, strategy: str) -> HostInfo:
89+
def get_host_info_by_strategy(self, role: HostRole, strategy: str, host_list: Optional[List[HostInfo]] = None) -> HostInfo:
9090
if not self.accepts_strategy(role, strategy):
9191
logger.error("FastestResponseStrategyPlugin.UnsupportedHostSelectorStrategy", strategy)
9292
raise AwsWrapperError(Messages.get_formatted("FastestResponseStrategyPlugin.UnsupportedHostSelectorStrategy", strategy))

aws_advanced_python_wrapper/host_list_provider.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ def get_host_role(self, connection: Connection) -> HostRole:
6969
def identify_connection(self, connection: Optional[Connection]) -> Optional[HostInfo]:
7070
...
7171

72+
def get_cluster_id(self) -> str:
73+
...
74+
7275

7376
@runtime_checkable
7477
class DynamicHostListProvider(HostListProvider, Protocol):
@@ -519,6 +522,10 @@ def _identify_connection(self, conn: Connection):
519522
cursor.execute(self._dialect.host_id_query)
520523
return cursor.fetchone()
521524

525+
def get_cluster_id(self):
526+
self._initialize()
527+
return self._cluster_id
528+
522529
@dataclass()
523530
class ClusterIdSuggestion:
524531
cluster_id: str
@@ -646,3 +653,6 @@ def get_host_role(self, connection: Connection) -> HostRole:
646653
def identify_connection(self, connection: Optional[Connection]) -> Optional[HostInfo]:
647654
raise UnsupportedOperationError(
648655
Messages.get_formatted("ConnectionStringHostListProvider.UnsupportedMethod", "identify_connection"))
656+
657+
def get_cluster_id(self):
658+
return "<none>"

0 commit comments

Comments
 (0)