Skip to content

Commit 58df0c2

Browse files
committed
feat: fastest response strategy plugin
1 parent 0c9e694 commit 58df0c2

17 files changed

+852
-5
lines changed

.python-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.12.1

aws_advanced_python_wrapper/driver_dialect.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,3 +152,13 @@ def unwrap_connection(self, conn_obj: object) -> Any:
152152

153153
def transfer_session_state(self, from_conn: Connection, to_conn: Connection):
154154
return
155+
156+
def ping(self, conn: Connection) -> bool:
157+
try:
158+
with conn.cursor() as cursor:
159+
query = "SELECT 1"
160+
self.execute("Cursor.execute", lambda: cursor.execute(query), query, exec_timeout=10)
161+
cursor.fetchone()
162+
return True
163+
except Exception:
164+
return False
Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from copy import copy
18+
from dataclasses import dataclass
19+
from datetime import datetime
20+
from threading import Event, Lock, Thread
21+
from time import sleep
22+
from typing import (TYPE_CHECKING, Callable, ClassVar, Dict, List, Optional,
23+
Set, Tuple)
24+
25+
from aws_advanced_python_wrapper.errors import AwsWrapperError
26+
from aws_advanced_python_wrapper.hostselector import RandomHostSelector
27+
from aws_advanced_python_wrapper.plugin import Plugin
28+
from aws_advanced_python_wrapper.utils.cache_map import CacheMap
29+
from aws_advanced_python_wrapper.utils.log import Logger
30+
from aws_advanced_python_wrapper.utils.messages import Messages
31+
from aws_advanced_python_wrapper.utils.properties import (Properties,
32+
WrapperProperties)
33+
from aws_advanced_python_wrapper.utils.sliding_expiration_cache import \
34+
SlidingExpirationCacheWithCleanupThread
35+
from aws_advanced_python_wrapper.utils.telemetry.telemetry import (
36+
TelemetryContext, TelemetryFactory, TelemetryGauge, TelemetryTraceLevel)
37+
38+
if TYPE_CHECKING:
39+
from aws_advanced_python_wrapper.driver_dialect import DriverDialect
40+
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
41+
from aws_advanced_python_wrapper.pep249 import Connection
42+
from aws_advanced_python_wrapper.plugin_service import PluginService
43+
from aws_advanced_python_wrapper.utils.notifications import HostEvent
44+
45+
logger = Logger(__name__)
46+
47+
MAX_VALUE = 2147483647
48+
49+
50+
class FastestResponseStrategyPlugin(Plugin):
51+
_FASTEST_RESPONSE_STRATEGY_NAME = "fastest_response"
52+
_SUBSCRIBED_METHODS: Set[str] = {"accepts_strategy",
53+
"get_host_info_by_strategy",
54+
"notify_host_list_changed"}
55+
56+
def __init__(self, plugin_service: PluginService, props: Properties):
57+
self._plugin_service = plugin_service
58+
self._properties = props
59+
self._host_response_time_service: HostResponseTimeService = \
60+
HostResponseTimeService(plugin_service, props, WrapperProperties.RESPONSE_MEASUREMENT_INTERVAL_NANOS.get_int(props))
61+
self._cache_expiration_nanos = WrapperProperties.RESPONSE_MEASUREMENT_INTERVAL_NANOS.get_int(props)
62+
self._random_host_selector = RandomHostSelector()
63+
self._cached_fastest_response_host_by_role: CacheMap[str, HostInfo] = CacheMap()
64+
self._hosts: Tuple[HostInfo, ...] = ()
65+
66+
@property
67+
def subscribed_methods(self) -> Set[str]:
68+
return self._SUBSCRIBED_METHODS
69+
70+
def connect(
71+
self,
72+
target_driver_func: Callable,
73+
driver_dialect: DriverDialect,
74+
host_info: HostInfo,
75+
props: Properties,
76+
is_initial_connection: bool,
77+
connect_func: Callable) -> Connection:
78+
return self._connect(host_info, props, is_initial_connection, connect_func)
79+
80+
def force_connect(
81+
self,
82+
target_driver_func: Callable,
83+
driver_dialect: DriverDialect,
84+
host_info: HostInfo,
85+
props: Properties,
86+
is_initial_connection: bool,
87+
force_connect_func: Callable) -> Connection:
88+
return self._connect(host_info, props, is_initial_connection, force_connect_func)
89+
90+
def _connect(
91+
self,
92+
host: HostInfo,
93+
properties: Properties,
94+
is_initial_connection: bool,
95+
connect_func: Callable) -> Connection:
96+
conn = connect_func()
97+
98+
if is_initial_connection:
99+
self._plugin_service.refresh_host_list(conn)
100+
101+
return conn
102+
103+
def accepts_strategy(self, role: HostRole, strategy: str) -> bool:
104+
return strategy == FastestResponseStrategyPlugin._FASTEST_RESPONSE_STRATEGY_NAME
105+
106+
def get_host_info_by_strategy(self, role: HostRole, strategy: str) -> HostInfo:
107+
if not self.accepts_strategy(role, strategy):
108+
logger.error("FastestResponseStrategyPlugin.UnsupportedHostSelectorStrategy", strategy)
109+
raise AwsWrapperError(Messages.get_formatted("FastestResponseStrategyPlugin.UnsupportedHostSelectorStrategy", strategy))
110+
111+
fastest_response_host: Optional[HostInfo] = self._cached_fastest_response_host_by_role.get(role.name)
112+
if fastest_response_host is not None:
113+
114+
# Found a fastest host. Let's find it in the the latest topology.
115+
for host in self._plugin_service.hosts:
116+
if host == fastest_response_host:
117+
# found the fastest host in the topology
118+
return host
119+
# It seems that the fastest cached host isn't in the latest topology.
120+
# Let's ignore cached results and find the fastest host.
121+
122+
# Cached result isn't available. Need to find the fastest response time host.
123+
eligible_hosts: List[FastestResponseStrategyPlugin.ResponseTimeTuple] = []
124+
for host in self._plugin_service.hosts:
125+
if role == host.role:
126+
response_time_tuple = FastestResponseStrategyPlugin.ResponseTimeTuple(host,
127+
self._host_response_time_service.get_response_time(host))
128+
eligible_hosts.append(response_time_tuple)
129+
130+
# Sort by response time then retrieve the first host
131+
sorted_eligible_hosts: List[FastestResponseStrategyPlugin.ResponseTimeTuple] = \
132+
sorted(eligible_hosts, key=lambda x: x.response_time)
133+
134+
calculated_fastest_response_host = sorted_eligible_hosts[0].host_info
135+
if calculated_fastest_response_host is None or \
136+
self._host_response_time_service.get_response_time(calculated_fastest_response_host) == MAX_VALUE:
137+
logger.debug("FastestResponseStrategyPlugin.RandomHostSelected")
138+
return self._random_host_selector.get_host(self._plugin_service.hosts, role, self._properties)
139+
140+
self._cached_fastest_response_host_by_role.put(role.name,
141+
calculated_fastest_response_host,
142+
self._cache_expiration_nanos)
143+
144+
self._host_response_time_service.get_response_time(calculated_fastest_response_host)
145+
return calculated_fastest_response_host
146+
147+
def notify_host_list_changed(self, changes: Dict[str, Set[HostEvent]]):
148+
self._hosts = self._plugin_service.hosts
149+
if self._host_response_time_service:
150+
self._host_response_time_service.set_hosts(self._hosts)
151+
152+
@dataclass
153+
class ResponseTimeTuple:
154+
host_info: HostInfo
155+
response_time: int
156+
157+
158+
class FastestResponseStrategyPluginFactory:
159+
160+
def get_instance(self, plugin_service: PluginService, props: Properties) -> Plugin:
161+
return FastestResponseStrategyPlugin(plugin_service, props)
162+
163+
164+
class HostResponseTimeMonitor:
165+
166+
_MONITORING_PROPERTY_PREFIX: str = "frt-"
167+
_NUM_OF_MEASURES: int = 5
168+
169+
def __init__(self, plugin_service: PluginService, host_info: HostInfo, props: Properties, interval_ms: int):
170+
self._plugin_service = plugin_service
171+
self._host_info = host_info
172+
self._properties = props
173+
self._interval_ms = interval_ms
174+
175+
self._telemetry_factory: TelemetryFactory = self._plugin_service.get_telemetry_factory()
176+
self._response_time: int = MAX_VALUE
177+
self._lock: Lock = Lock()
178+
self._monitoring_conn: Optional[Connection] = None
179+
self._is_stopped: Event = Event()
180+
181+
self._host_id: Optional[str] = self._host_info.host_id
182+
if self._host_id is None or self._host_id == "":
183+
self._host_id = self._host_info.host
184+
185+
self._daemon_thread: Thread = Thread(daemon=True, target=self.run)
186+
187+
# Report current response time (in milliseconds) to telemetry engine.
188+
# Report -1 if response time couldn't be measured.
189+
self._response_time_gauge: TelemetryGauge = \
190+
self._telemetry_factory.create_gauge("frt.response.time." + self._host_id,
191+
lambda: self._response_time if self._response_time != MAX_VALUE else -1)
192+
self._daemon_thread.start()
193+
194+
@property
195+
def response_time(self):
196+
return self._response_time
197+
198+
@response_time.setter
199+
def response_time(self, response_time: int):
200+
self._response_time = response_time
201+
202+
@property
203+
def host_info(self):
204+
return self._host_info
205+
206+
@property
207+
def is_stopped(self):
208+
return self._is_stopped.is_set()
209+
210+
def close(self):
211+
self._is_stopped.set()
212+
self._daemon_thread.join()
213+
logger.debug("HostResponseTimeMonitor.Stopped", self._host_info.host)
214+
215+
def _get_current_time(self):
216+
return datetime.now().microsecond / 1000 # milliseconds
217+
218+
def run(self):
219+
context: TelemetryContext = self._telemetry_factory.open_telemetry_context(
220+
"node response time thread", TelemetryTraceLevel.TOP_LEVEL)
221+
context.set_attribute("url", self._host_info.url)
222+
try:
223+
while not self.is_stopped:
224+
self._open_connection()
225+
226+
if self._monitoring_conn is not None:
227+
228+
response_time_sum = 0
229+
count = 0
230+
for i in range(self._NUM_OF_MEASURES):
231+
if self.is_stopped:
232+
break
233+
start_time = self._get_current_time()
234+
if self._plugin_service.driver_dialect.ping(self._monitoring_conn):
235+
calculated_response_time = self._get_current_time() - start_time
236+
response_time_sum = response_time_sum + calculated_response_time
237+
count = count + 1
238+
239+
if count > 0:
240+
self.response_time = response_time_sum / count
241+
else:
242+
self.response_time = MAX_VALUE
243+
logger.debug("HostResponseTimeMonitor.ResponseTime", self._host_info.host, self._response_time)
244+
245+
sleep(self._interval_ms / 1000)
246+
247+
except InterruptedError:
248+
# exit thread
249+
logger.debug("HostResponseTimeMonitor.InterruptedExceptionDuringMonitoring", self._host_info.host)
250+
except Exception as e:
251+
# this should not be reached; log and exit thread
252+
logger.debug("HostResponseTimeMonitor.ExceptionDuringMonitoringStop",
253+
self._host_info.host,
254+
e) # print full trace stack of the exception.
255+
finally:
256+
self._is_stopped.set()
257+
if self._monitoring_conn is not None:
258+
try:
259+
self._monitoring_conn.close()
260+
except Exception:
261+
# Do nothing
262+
pass
263+
264+
if context is not None:
265+
context.close_context()
266+
267+
def _open_connection(self):
268+
try:
269+
driver_dialect = self._plugin_service.driver_dialect
270+
if self._monitoring_conn is None or driver_dialect.is_closed(self._monitoring_conn):
271+
monitoring_conn_properties: Properties = copy(self._properties)
272+
for key, value in self._properties.items():
273+
if key.startswith(self._MONITORING_PROPERTY_PREFIX):
274+
monitoring_conn_properties[key[len(self._MONITORING_PROPERTY_PREFIX):len(key)]] = value
275+
monitoring_conn_properties.pop(key, None)
276+
277+
logger.debug("HostResponseTimeMonitor.OpeningConnection", self._host_info.url)
278+
self._monitoring_conn = self._plugin_service.force_connect(self._host_info, monitoring_conn_properties, None)
279+
logger.debug("HostResponseTimeMonitor.OpenedConnection", self._host_info.url)
280+
281+
except Exception:
282+
if self._monitoring_conn is not None:
283+
try:
284+
self._monitoring_conn.close()
285+
except Exception:
286+
pass # ignore
287+
288+
self._monitoring_conn = None
289+
290+
291+
class HostResponseTimeService:
292+
_CACHE_EXPIRATION_NANOS: int = 6 * 10 ^ 11 # 10 minutes
293+
_CACHE_CLEANUP_NANO: int = 6 * 10 ^ 10 # 1 minute
294+
_lock: Lock = Lock()
295+
_monitoring_nodes: ClassVar[SlidingExpirationCacheWithCleanupThread[str, HostResponseTimeMonitor]] = \
296+
SlidingExpirationCacheWithCleanupThread(_CACHE_CLEANUP_NANO,
297+
should_dispose_func=lambda monitor: True,
298+
item_disposal_func=lambda monitor: HostResponseTimeService._monitor_close(monitor))
299+
300+
def __init__(self, plugin_service: PluginService, props: Properties, interval_ms: int):
301+
self._plugin_service = plugin_service
302+
self._properties = props
303+
self._interval_ms = interval_ms
304+
self._hosts: Tuple[HostInfo, ...] = ()
305+
self._telemetry_factory: TelemetryFactory = self._plugin_service.get_telemetry_factory()
306+
self._host_count_gauge: TelemetryGauge = self._telemetry_factory.create_gauge("frt.nodes.count", lambda: len(self._monitoring_nodes))
307+
308+
@property
309+
def hosts(self) -> Tuple[HostInfo, ...]:
310+
return self._hosts
311+
312+
@hosts.setter
313+
def hosts(self, new_hosts: Tuple[HostInfo, ...]):
314+
self._hosts = new_hosts
315+
316+
@staticmethod
317+
def _monitor_close(monitor: HostResponseTimeMonitor):
318+
try:
319+
monitor.close()
320+
except Exception:
321+
pass
322+
323+
def get_response_time(self, host_info: HostInfo) -> int:
324+
monitor: Optional[HostResponseTimeMonitor] = HostResponseTimeService._monitoring_nodes.get(host_info.url)
325+
if monitor is None:
326+
return MAX_VALUE
327+
return monitor.response_time
328+
329+
def set_hosts(self, new_hosts: Tuple[HostInfo, ...]) -> None:
330+
old_hosts_dict = {x.url: x for x in self.hosts}
331+
self.hosts = new_hosts
332+
333+
for host in self.hosts:
334+
if host.url not in old_hosts_dict:
335+
with self._lock:
336+
self._monitoring_nodes.compute_if_absent(host.url,
337+
lambda _: HostResponseTimeMonitor(
338+
self._plugin_service,
339+
host,
340+
self._properties,
341+
self._interval_ms), self._CACHE_EXPIRATION_NANOS)

aws_advanced_python_wrapper/mysql_driver_dialect.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ def transfer_session_state(self, from_conn: Connection, to_conn: Connection):
148148
isinstance(to_conn, CMySQLConnection) or isinstance(to_conn, MySQLConnection)):
149149
to_conn.autocommit = from_conn.autocommit
150150

151+
def ping(self, conn: Connection) -> bool:
152+
return not self.is_closed(conn)
153+
151154
def prepare_connect_info(self, host_info: HostInfo, original_props: Properties) -> Properties:
152155
driver_props: Properties = Properties(original_props.copy())
153156
PropertiesUtils.remove_wrapper_props(driver_props)

aws_advanced_python_wrapper/plugin_service.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
from typing import TYPE_CHECKING, ClassVar, List, Type
1818

19+
from aws_advanced_python_wrapper.fastest_response_strategy_plugin import \
20+
FastestResponseStrategyPluginFactory
1921
from aws_advanced_python_wrapper.federated_plugin import \
2022
FederatedAuthPluginFactory
2123

@@ -571,6 +573,7 @@ class PluginManager(CanReleaseResources):
571573
"host_monitoring": HostMonitoringPluginFactory,
572574
"failover": FailoverPluginFactory,
573575
"read_write_splitting": ReadWriteSplittingPluginFactory,
576+
"fastest_response_strategy": FastestResponseStrategyPluginFactory,
574577
"stale_dns": StaleDnsPluginFactory,
575578
"connect_time": ConnectTimePluginFactory,
576579
"execute_time": ExecuteTimePluginFactory,
@@ -589,8 +592,9 @@ class PluginManager(CanReleaseResources):
589592
ReadWriteSplittingPluginFactory: 300,
590593
FailoverPluginFactory: 400,
591594
HostMonitoringPluginFactory: 500,
592-
IamAuthPluginFactory: 600,
593-
AwsSecretsManagerPluginFactory: 700,
595+
FastestResponseStrategyPluginFactory: 600,
596+
IamAuthPluginFactory: 700,
597+
AwsSecretsManagerPluginFactory: 800,
594598
ConnectTimePluginFactory: WEIGHT_RELATIVE_TO_PRIOR_PLUGIN,
595599
ExecuteTimePluginFactory: WEIGHT_RELATIVE_TO_PRIOR_PLUGIN,
596600
DeveloperPluginFactory: WEIGHT_RELATIVE_TO_PRIOR_PLUGIN,

0 commit comments

Comments
 (0)