Skip to content

Commit 2142a7f

Browse files
sergiyvamzjusting-bqsergiyv-improving
authored
telemetry (#414)
Co-authored-by: justing-bq <[email protected]> Co-authored-by: sergiyv-bitquill <[email protected]>
1 parent a924568 commit 2142a7f

23 files changed

+1612
-245
lines changed

aws_advanced_python_wrapper/aws_secrets_manager_plugin.py

+42-28
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
from aws_advanced_python_wrapper.utils.messages import Messages
3636
from aws_advanced_python_wrapper.utils.properties import (Properties,
3737
WrapperProperties)
38+
from aws_advanced_python_wrapper.utils.telemetry.telemetry import \
39+
TelemetryTraceLevel
3840

3941
logger = Logger(__name__)
4042

@@ -66,6 +68,9 @@ def __init__(self, plugin_service: PluginService, props: Properties, session: Op
6668
secrets_endpoint = WrapperProperties.SECRETS_MANAGER_ENDPOINT.get(props)
6769
self._secret_key: Tuple = (secret_id, region, secrets_endpoint)
6870

71+
telemetry_factory = self._plugin_service.get_telemetry_factory()
72+
self._fetch_credentials_counter = telemetry_factory.create_counter("secrets_manager.fetch_credentials.count")
73+
6974
def connect(
7075
self,
7176
target_driver_func: Callable,
@@ -116,35 +121,44 @@ def _update_secret(self, force_refetch: bool = False) -> bool:
116121
:param force_refetch: Allows ignoring cached credentials and force fetches the latest credentials from the service.
117122
:return: `True`, if credentials were fetched from the service.
118123
"""
119-
fetched: bool = False
120-
121-
self._secret: Optional[SimpleNamespace] = AwsSecretsManagerPlugin._secrets_cache.get(self._secret_key)
122-
endpoint = self._secret_key[2]
123-
124-
if not self._secret or force_refetch:
125-
try:
126-
self._secret = self._fetch_latest_credentials()
127-
if self._secret:
128-
AwsSecretsManagerPlugin._secrets_cache[self._secret_key] = self._secret
129-
fetched = True
130-
except (ClientError, AttributeError) as e:
131-
logger.debug("AwsSecretsManagerPlugin.FailedToFetchDbCredentials", e)
132-
raise AwsWrapperError(
133-
Messages.get_formatted("AwsSecretsManagerPlugin.FailedToFetchDbCredentials", e)) from e
134-
except JSONDecodeError as e:
135-
logger.debug("AwsSecretsManagerPlugin.JsonDecodeError", e)
136-
raise AwsWrapperError(
137-
Messages.get_formatted("AwsSecretsManagerPlugin.JsonDecodeError", e))
138-
except EndpointConnectionError:
139-
logger.debug("AwsSecretsManagerPlugin.EndpointOverrideInvalidConnection", endpoint)
140-
raise AwsWrapperError(
141-
Messages.get_formatted("AwsSecretsManagerPlugin.EndpointOverrideInvalidConnection", endpoint))
142-
except ValueError:
143-
logger.debug("AwsSecretsManagerPlugin.EndpointOverrideMisconfigured", endpoint)
144-
raise AwsWrapperError(
145-
Messages.get_formatted("AwsSecretsManagerPlugin.EndpointOverrideMisconfigured", endpoint))
124+
telemetry_factory = self._plugin_service.get_telemetry_factory()
125+
context = telemetry_factory.open_telemetry_context("fetch credentials", TelemetryTraceLevel.NESTED)
126+
self._fetch_credentials_counter.inc()
146127

147-
return fetched
128+
try:
129+
fetched: bool = False
130+
self._secret: Optional[SimpleNamespace] = AwsSecretsManagerPlugin._secrets_cache.get(self._secret_key)
131+
endpoint = self._secret_key[2]
132+
if not self._secret or force_refetch:
133+
try:
134+
self._secret = self._fetch_latest_credentials()
135+
if self._secret:
136+
AwsSecretsManagerPlugin._secrets_cache[self._secret_key] = self._secret
137+
fetched = True
138+
except (ClientError, AttributeError) as e:
139+
logger.debug("AwsSecretsManagerPlugin.FailedToFetchDbCredentials", e)
140+
raise AwsWrapperError(
141+
Messages.get_formatted("AwsSecretsManagerPlugin.FailedToFetchDbCredentials", e)) from e
142+
except JSONDecodeError as e:
143+
logger.debug("AwsSecretsManagerPlugin.JsonDecodeError", e)
144+
raise AwsWrapperError(
145+
Messages.get_formatted("AwsSecretsManagerPlugin.JsonDecodeError", e))
146+
except EndpointConnectionError:
147+
logger.debug("AwsSecretsManagerPlugin.EndpointOverrideInvalidConnection", endpoint)
148+
raise AwsWrapperError(
149+
Messages.get_formatted("AwsSecretsManagerPlugin.EndpointOverrideInvalidConnection", endpoint))
150+
except ValueError:
151+
logger.debug("AwsSecretsManagerPlugin.EndpointOverrideMisconfigured", endpoint)
152+
raise AwsWrapperError(
153+
Messages.get_formatted("AwsSecretsManagerPlugin.EndpointOverrideMisconfigured", endpoint))
154+
155+
return fetched
156+
except Exception as ex:
157+
context.set_success(False)
158+
context.set_exception(ex)
159+
raise ex
160+
finally:
161+
context.close_context()
148162

149163
def _fetch_latest_credentials(self):
150164
"""

aws_advanced_python_wrapper/default_plugin.py

+22-3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
3434
from aws_advanced_python_wrapper.plugin import Plugin
3535
from aws_advanced_python_wrapper.utils.messages import Messages
36+
from aws_advanced_python_wrapper.utils.telemetry.telemetry import \
37+
TelemetryTraceLevel
3638

3739

3840
class DefaultPlugin(Plugin):
@@ -64,11 +66,20 @@ def _connect(
6466
host_info: HostInfo,
6567
props: Properties,
6668
conn_provider: ConnectionProvider) -> Connection:
67-
database_dialect = self._plugin_service.database_dialect
68-
conn = conn_provider.connect(target_func, driver_dialect, database_dialect, host_info, props)
69+
telemetry_factory = self._plugin_service.get_telemetry_factory()
70+
context = telemetry_factory.open_telemetry_context(driver_dialect.driver_name, TelemetryTraceLevel.NESTED)
71+
72+
conn: Connection
73+
try:
74+
database_dialect = self._plugin_service.database_dialect
75+
conn = conn_provider.connect(target_func, driver_dialect, database_dialect, host_info, props)
76+
finally:
77+
context.close_context()
78+
6979
self._plugin_service.set_availability(host_info.all_aliases, HostAvailability.AVAILABLE)
7080
self._plugin_service.update_driver_dialect(conn_provider)
7181
self._plugin_service.update_dialect(conn)
82+
7283
return conn
7384

7485
def force_connect(
@@ -88,7 +99,15 @@ def force_connect(
8899
self._connection_provider_manager.default_provider)
89100

90101
def execute(self, target: object, method_name: str, execute_func: Callable, *args: Any, **kwargs: Any) -> Any:
91-
result = self._plugin_service.driver_dialect.execute(method_name, execute_func, *args, **kwargs)
102+
telemetry_factory = self._plugin_service.get_telemetry_factory()
103+
context = telemetry_factory.open_telemetry_context(
104+
self._plugin_service.driver_dialect.driver_name, TelemetryTraceLevel.NESTED)
105+
106+
try:
107+
result = self._plugin_service.driver_dialect.execute(method_name, execute_func, *args, **kwargs)
108+
finally:
109+
context.close_context()
110+
92111
if method_name != DefaultPlugin._CLOSE_METHOD and self._plugin_service.current_connection is not None:
93112
self._plugin_service.update_in_transaction()
94113

aws_advanced_python_wrapper/failover_plugin.py

+101-37
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
WrapperProperties)
4444
from aws_advanced_python_wrapper.utils.rds_url_type import RdsUrlType
4545
from aws_advanced_python_wrapper.utils.rdsutils import RdsUtils
46+
from aws_advanced_python_wrapper.utils.telemetry.telemetry import \
47+
TelemetryTraceLevel
4648
from aws_advanced_python_wrapper.writer_failover_handler import (
4749
WriterFailoverHandler, WriterFailoverHandlerImpl)
4850

@@ -84,7 +86,10 @@ def __init__(self, plugin_service: PluginService, props: Properties):
8486
self._properties)
8587
self._failover_reader_connect_timeout_sec = WrapperProperties.FAILOVER_READER_CONNECT_TIMEOUT_SEC.get_float(
8688
self._properties)
87-
self._keep_session_state_on_failover = WrapperProperties.KEEP_SESSION_STATE_ON_FAILOVER.get_bool(self._properties)
89+
self._keep_session_state_on_failover = WrapperProperties.KEEP_SESSION_STATE_ON_FAILOVER.get_bool(
90+
self._properties)
91+
self._telemetry_failover_additional_top_trace_setting = (
92+
WrapperProperties.TELEMETRY_FAILOVER_ADDITIONAL_TOP_TRACE.get_bool(self._properties))
8893
self._failover_mode: FailoverMode
8994
self._is_in_transaction: bool = False
9095
self._is_closed: bool = False
@@ -96,6 +101,18 @@ def __init__(self, plugin_service: PluginService, props: Properties):
96101
self._saved_read_only_status: bool = False
97102
self._saved_auto_commit_status: bool = False
98103

104+
telemetry_factory = self._plugin_service.get_telemetry_factory()
105+
self._failover_writer_triggered_counter = telemetry_factory.create_counter("writer_failover.triggered.count")
106+
self._failover_writer_success_counter = telemetry_factory.create_counter(
107+
"writer_failover.completed.success.count")
108+
self._failover_writer_failed_counter = telemetry_factory.create_counter(
109+
"writer_failover.completed.failed.count")
110+
self._failover_reader_triggered_counter = telemetry_factory.create_counter("reader_failover.triggered.count")
111+
self._failover_reader_success_counter = telemetry_factory.create_counter(
112+
"reader_failover.completed.success.count")
113+
self._failover_reader_failed_counter = telemetry_factory.create_counter(
114+
"reader_failover.completed.failed.count")
115+
99116
FailoverPlugin._SUBSCRIBED_METHODS.update(self._plugin_service.network_bound_methods)
100117

101118
def init_host_provider(
@@ -213,10 +230,13 @@ def _connect(
213230
properties: Properties,
214231
is_initial_connection: bool,
215232
connect_func: Callable) -> Connection:
216-
conn: Connection = self._stale_dns_helper.get_verified_connection(is_initial_connection, self._host_list_provider_service, host, properties,
233+
conn: Connection = self._stale_dns_helper.get_verified_connection(is_initial_connection,
234+
self._host_list_provider_service, host,
235+
properties,
217236
connect_func)
218237
if self._keep_session_state_on_failover:
219-
self._saved_read_only_status = False if self._saved_read_only_status == self._plugin_service.driver_dialect.is_read_only(conn) \
238+
self._saved_read_only_status = False if self._saved_read_only_status == self._plugin_service.driver_dialect.is_read_only(
239+
conn) \
220240
else self._saved_read_only_status
221241
self._saved_auto_commit_status = False \
222242
if self._saved_read_only_status == self._plugin_service.driver_dialect.get_autocommit(conn) \
@@ -270,53 +290,96 @@ def _failover(self, failed_host: Optional[HostInfo]):
270290
raise FailoverSuccessError(Messages.get(error_msg))
271291

272292
def _failover_reader(self, failed_host: Optional[HostInfo]):
273-
logger.debug("FailoverPlugin.StartReaderFailover")
293+
telemetry_factory = self._plugin_service.get_telemetry_factory()
294+
context = telemetry_factory.open_telemetry_context("failover to replica", TelemetryTraceLevel.NESTED)
295+
self._failover_reader_triggered_counter.inc()
274296

275-
old_aliases = None
276-
if self._plugin_service.current_host_info is not None:
277-
old_aliases = self._plugin_service.current_host_info.aliases
297+
try:
298+
logger.debug("FailoverPlugin.StartReaderFailover")
278299

279-
if failed_host is not None and failed_host.get_raw_availability() != HostAvailability.AVAILABLE:
280-
failed_host = None
300+
old_aliases = None
301+
if self._plugin_service.current_host_info is not None:
302+
old_aliases = self._plugin_service.current_host_info.aliases
281303

282-
result: ReaderFailoverResult = self._reader_failover_handler.failover(self._plugin_service.hosts, failed_host)
304+
if failed_host is not None and failed_host.get_raw_availability() != HostAvailability.AVAILABLE:
305+
failed_host = None
283306

284-
if result is None or not result.is_connected:
285-
raise FailoverFailedError(Messages.get("FailoverPlugin.UnableToConnectToReader"))
286-
else:
287-
if result.exception is not None:
288-
raise result.exception
289-
if self._keep_session_state_on_failover:
290-
self.restore_session_state(result.connection)
291-
if result.connection is not None and result.new_host is not None:
292-
self._plugin_service.set_current_connection(result.connection, result.new_host)
307+
result: ReaderFailoverResult = self._reader_failover_handler.failover(self._plugin_service.hosts,
308+
failed_host)
293309

294-
if self._plugin_service.current_host_info is not None and old_aliases is not None and len(old_aliases) > 0:
295-
self._plugin_service.current_host_info.remove_alias(old_aliases)
310+
if result is None or not result.is_connected:
311+
raise FailoverFailedError(Messages.get("FailoverPlugin.UnableToConnectToReader"))
312+
else:
313+
if result.exception is not None:
314+
raise result.exception
315+
if self._keep_session_state_on_failover:
316+
self.restore_session_state(result.connection)
317+
if result.connection is not None and result.new_host is not None:
318+
self._plugin_service.set_current_connection(result.connection, result.new_host)
319+
320+
if self._plugin_service.current_host_info is not None and old_aliases is not None and len(old_aliases) > 0:
321+
self._plugin_service.current_host_info.remove_alias(old_aliases)
322+
323+
self._update_topology(True)
324+
325+
logger.debug("FailoverPlugin.EstablishedConnection", self._plugin_service.current_host_info)
326+
327+
self._failover_reader_success_counter.inc()
328+
except FailoverSuccessError as fse:
329+
context.set_success(True)
330+
context.set_exception(fse)
331+
self._failover_reader_success_counter.inc()
332+
raise fse
333+
except Exception as ex:
334+
context.set_success(False)
335+
context.set_exception(ex)
336+
self._failover_reader_failed_counter.inc()
337+
raise ex
338+
finally:
339+
context.close_context()
340+
if self._telemetry_failover_additional_top_trace_setting:
341+
telemetry_factory.post_copy(context, TelemetryTraceLevel.FORCE_TOP_LEVEL)
296342

297-
self._update_topology(True)
343+
def _failover_writer(self):
344+
telemetry_factory = self._plugin_service.get_telemetry_factory()
345+
context = telemetry_factory.open_telemetry_context("failover to writer node", TelemetryTraceLevel.NESTED)
346+
self._failover_writer_triggered_counter.inc()
298347

299-
logger.debug("FailoverPlugin.EstablishedConnection", self._plugin_service.current_host_info)
348+
try:
349+
logger.debug("FailoverPlugin.StartWriterFailover")
300350

301-
def _failover_writer(self):
302-
logger.debug("FailoverPlugin.StartWriterFailover")
351+
result: WriterFailoverResult = self._writer_failover_handler.failover(self._plugin_service.hosts)
303352

304-
result: WriterFailoverResult = self._writer_failover_handler.failover(self._plugin_service.hosts)
353+
if result is not None and result.exception is not None:
354+
raise result.exception
355+
elif result is None or not result.is_connected:
356+
raise FailoverFailedError(Messages.get("FailoverPlugin.UnableToConnectToWriter"))
305357

306-
if result is not None and result.exception is not None:
307-
raise result.exception
308-
elif result is None or not result.is_connected:
309-
raise FailoverFailedError(Messages.get("FailoverPlugin.UnableToConnectToWriter"))
358+
writer_host = self._get_writer(result.topology)
359+
if self._keep_session_state_on_failover:
360+
self.restore_session_state(result.new_connection)
310361

311-
writer_host = self._get_writer(result.topology)
312-
if self._keep_session_state_on_failover:
313-
self.restore_session_state(result.new_connection)
362+
self._plugin_service.set_current_connection(result.new_connection, writer_host)
314363

315-
self._plugin_service.set_current_connection(result.new_connection, writer_host)
364+
logger.debug("FailoverPlugin.EstablishedConnection", self._plugin_service.current_host_info)
316365

317-
logger.debug("FailoverPlugin.EstablishedConnection", self._plugin_service.current_host_info)
366+
self._plugin_service.refresh_host_list()
318367

319-
self._plugin_service.refresh_host_list()
368+
self._failover_writer_success_counter.inc()
369+
except FailoverSuccessError as fse:
370+
context.set_success(True)
371+
context.set_exception(fse)
372+
self._failover_writer_success_counter.inc()
373+
raise fse
374+
except Exception as ex:
375+
context.set_success(False)
376+
context.set_exception(ex)
377+
self._failover_writer_failed_counter.inc()
378+
raise ex
379+
finally:
380+
context.close_context()
381+
if self._telemetry_failover_additional_top_trace_setting:
382+
telemetry_factory.post_copy(context, TelemetryTraceLevel.FORCE_TOP_LEVEL)
320383

321384
def restore_session_state(self, conn: Optional[Connection]):
322385
"""
@@ -401,7 +464,8 @@ def _connect_to(self, host: HostInfo):
401464
logger.debug("FailoverPlugin.EstablishedConnection", host)
402465
except Exception as ex:
403466
if self._plugin_service is not None:
404-
logger.debug("FailoverPlugin.ConnectionToHostFailed", 'writer' if host.role == HostRole.WRITER else 'reader', host.url)
467+
logger.debug("FailoverPlugin.ConnectionToHostFailed",
468+
'writer' if host.role == HostRole.WRITER else 'reader', host.url)
405469
raise ex
406470

407471
def _should_attempt_reader_connection(self) -> bool:

0 commit comments

Comments
 (0)