Skip to content

Commit 43d9dd5

Browse files
committed
sliding expiration cache fix
1 parent a82c2a1 commit 43d9dd5

File tree

3 files changed

+21
-10
lines changed

3 files changed

+21
-10
lines changed

aws_advanced_python_wrapper/fastest_response_strategy_plugin.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,25 @@ def get_host_info_by_strategy(self, role: HostRole, strategy: str) -> HostInfo:
105105

106106
fastest_response_host: Optional[HostInfo] = self._cached_fastest_response_host_by_role.get(role.name)
107107
if fastest_response_host is not None:
108+
108109
# Found a fastest host. Let's find it in the the latest topology.
109110
for host in self._plugin_service.hosts:
110-
if host is not None:
111+
if host == fastest_response_host:
111112
# found the fastest host in the topology
112113
return host
113114
# It seems that the fastest cached host isn't in the latest topology.
114115
# Let's ignore cached results and find the fastest host.
115116

116117
# Cached result isn't available. Need to find the fastest response time host.
117-
calculated_fastest_response_host = None
118+
host_dict = {}
119+
for host in self._plugin_service.hosts:
120+
if role == host.role and self._host_response_time_service is not None:
121+
response_time_tuple = FastestResponseStrategyPlugin.ResponseTimeTuple(host,
122+
self._host_response_time_service.get_response_time(host))
123+
host_dict[response_time_tuple.host_info] = response_time_tuple.response_time
124+
# sort by response time then retrieve the first host
125+
sorted_host_dict = dict(sorted(host_dict.items(), key=lambda item: item[1]))
126+
calculated_fastest_response_host: HostInfo = next(iter(sorted_host_dict.keys()))
118127
if calculated_fastest_response_host is None:
119128
return self._random_host_selector.get_host(self._plugin_service.hosts, role, self._properties)
120129

@@ -219,7 +228,7 @@ def run(self):
219228
logger.debug("NodeResponseTimeMonitor.InterruptedExceptionDuringMonitoring", self._host_info.host)
220229
except Exception as e:
221230
# this should not be reached; log and exit thread
222-
logger.debug("NodeResponseTimeMonitor.RxceptionDuringMonitoringStop",
231+
logger.debug("NodeResponseTimeMonitor.ExceptionDuringMonitoringStop",
223232
self._host_info.host,
224233
e) # print full trace stack of the exception.
225234
finally:
@@ -243,7 +252,7 @@ def _open_connection(self):
243252

244253
logger.debug("NodeResponseTimeMonitor.OpeningConnection", self._host_info.url)
245254
self._monitoring_conn = self._plugin_service.force_connect(self._host_info, monitoring_conn_properties, None)
246-
logger.debug("NodeResponseTimeMonitor.openedConnection", self._host_info.url)
255+
logger.debug("NodeResponseTimeMonitor.OpenedConnection", self._host_info.url)
247256

248257
except Exception:
249258
if self._monitoring_conn is not None:
@@ -258,10 +267,11 @@ def _open_connection(self):
258267
class HostResponseTimeService:
259268
"""
260269
Return a response time in milliseconds to the host.
261-
Return _MAX_VALUE if response time is not available.
270+
Return HostResponseTimeService._MAX_VALUE if response time is not available.
262271
263272
@param hostSpec the host details
264-
@return response time in milliseconds for a desired host. It should return _MAX_VALUE if response time couldn't be measured.
273+
@return response time in milliseconds for a desired host. It should return HostResponseTimeService._MAX_VALUE
274+
if response time couldn't be measured.
265275
"""
266276

267277
@abstractmethod
@@ -280,8 +290,8 @@ class HostResponseTimeServiceImpl(HostResponseTimeService):
280290
_lock: Lock = Lock()
281291
_monitoring_nodes: ClassVar[SlidingExpirationCacheWithCleanupThread[str, NodeResponseTimeMonitor]] = \
282292
SlidingExpirationCacheWithCleanupThread(_CACHE_CLEANUP_NANO,
283-
should_dispose_func=lambda queue_pool: queue_pool.checkedout() == 0,
284-
item_disposal_func=lambda queue_pool: queue_pool.dispose())
293+
should_dispose_func=lambda monitor: True,
294+
item_disposal_func=lambda monitor: monitor.dispose())
285295

286296
def __init__(self, plugin_service: PluginService, props: Properties, interval_ms: int):
287297
self._plugin_service = plugin_service
@@ -309,4 +319,3 @@ def set_hosts(self, hosts: Tuple[HostInfo, ...]) -> None:
309319
new_host,
310320
self._properties,
311321
self._interval_ms), self._CACHE_EXPIRATION_NANO)
312-

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,8 @@ RoundRobinHostSelector.ClusterInfoNone=[RoundRobinHostSelector] The round robin
250250
RoundRobinHostSelector.RoundRobinInvalidDefaultWeight=[RoundRobinHostSelector] The provided default weight value is not valid. Weight values must be an integer greater than or equal to 1.
251251
RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs= [RoundRobinHostSelector] The provided host weight pairs have not been configured correctly. Please ensure the provided host weight pairs is a comma separated list of pairs, each pair in the format of <host>:<weight>. Weight values must be an integer greater than or equal to the default weight value of 1.
252252

253+
SlidingExpirationCache.CleaningUp=[SlidingExpirationCache] Cleaning up...
254+
253255
SqlAlchemyPooledConnectionProvider.PoolNone=[SqlAlchemyPooledConnectionProvider] Attempted to find or create a pool for '{}' but the result of the attempt evaluated to None.
254256
SqlAlchemyPooledConnectionProvider.UnableToCreateDefaultKey=[SqlAlchemyPooledConnectionProvider] Unable to create a default key for internal connection pools. By default, the user parameter is used, but the given user evaluated to None or the empty string (""). Please ensure you have passed a valid user in the connection properties.
255257

aws_advanced_python_wrapper/utils/sliding_expiration_cache.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def _cleanup(self):
100100
self._remove_if_expired(key)
101101

102102

103-
class SlidingExpirationCacheWithCleanupThread(SlidingExpirationCache):
103+
class SlidingExpirationCacheWithCleanupThread(SlidingExpirationCache, Generic[K, V]):
104104

105105
_executor: ClassVar[Executor] = ThreadPoolExecutor(thread_name_prefix="SlidingExpirationCacheWithCleanupThreadExecutor")
106106

0 commit comments

Comments
 (0)