34
34
WrapperProperties )
35
35
from aws_advanced_python_wrapper .utils .sliding_expiration_cache import \
36
36
SlidingExpirationCacheWithCleanupThread
37
+ from aws_advanced_python_wrapper .utils .telemetry .telemetry import (
38
+ TelemetryContext , TelemetryCounter , TelemetryFactory , TelemetryGauge ,
39
+ TelemetryTraceLevel )
37
40
38
41
if TYPE_CHECKING :
39
42
from aws_advanced_python_wrapper .driver_dialect import DriverDialect
@@ -160,6 +163,7 @@ def __init__(self, plugin_service: PluginService, host_info: HostInfo, props: Pr
160
163
self ._properties = props
161
164
self ._interval_ms = interval_ms
162
165
166
+ self ._telemetry_factory = self ._plugin_service .get_telemetry_factory ()
163
167
self ._response_time : int = 0
164
168
self ._check_timestamp = datetime .now ()
165
169
self ._lock : Lock = Lock ()
@@ -172,6 +176,11 @@ def __init__(self, plugin_service: PluginService, host_info: HostInfo, props: Pr
172
176
173
177
self ._executor : Executor = ThreadPoolExecutor (thread_name_prefix = "HostResponseTimeMonitorExecutor" )
174
178
179
+ # Report current response time (in milliseconds) to telemetry engine.
180
+ # Report -1 if response time couldn't be measured.
181
+ self ._response_time_gauge = self ._telemetry_factory .create_gauge ("frt.response.time." + self ._host_id ,
182
+ lambda : self ._response_time if self ._response_time != 2 ^ 31 else - 1 )
183
+
175
184
@property
176
185
def response_time (self ):
177
186
return self ._response_time
@@ -206,6 +215,9 @@ def _get_current_time(self):
206
215
return time ()
207
216
208
217
def run (self ):
218
+ context = self ._telemetry_factory .open_telemetry_context (
219
+ "node response time thread" , TelemetryTraceLevel .TOP_LEVEL )
220
+ context .set_attribute ("url" , self ._host_info .url )
209
221
try :
210
222
while not self .is_stopped :
211
223
try :
@@ -249,6 +261,8 @@ def run(self):
249
261
except Exception :
250
262
# Do nothing
251
263
pass
264
+ if context is not None :
265
+ context .close_context ()
252
266
253
267
def _open_connection (self ):
254
268
try :
@@ -282,8 +296,8 @@ def get_response_time(self, host_info: HostInfo) -> int:
282
296
Return a response time in milliseconds to the host.
283
297
Return 2 ^ 31 if response time is not available.
284
298
285
- @ param hostSpec the host details
286
- @ return response time in milliseconds for a desired host. It should return 2 ^ 31
299
+ : param host_info: the host details
300
+ : return: response time in milliseconds for a desired host. It should return 2 ^ 31
287
301
if response time couldn't be measured.
288
302
"""
289
303
...
@@ -307,6 +321,8 @@ def __init__(self, plugin_service: PluginService, props: Properties, interval_ms
307
321
self ._properties = props
308
322
self ._interval_ms = interval_ms
309
323
self ._hosts : Tuple [HostInfo , ...] = ()
324
+ self ._telemetry_factory = self ._plugin_service .get_telemetry_factory ()
325
+ self ._node_count_gauge = self ._telemetry_factory .create_gauge ("frt.nodes.count" , lambda : len (self ._monitoring_nodes ))
310
326
311
327
def get_response_time (self , host_info : HostInfo ) -> int :
312
328
monitor : Optional [HostResponseTimeMonitor ] = HostResponseTimeServiceImpl ._monitoring_nodes .get (host_info .url )
0 commit comments