2 changes: 1 addition & 1 deletion dynatrace_extension/__about__.py
@@ -3,4 +3,4 @@
# SPDX-License-Identifier: MIT


__version__ = "1.7.2"
__version__ = "1.7.3"
13 changes: 1 addition & 12 deletions dynatrace_extension/sdk/callback.py
@@ -33,7 +33,7 @@ def __init__(
self.interval: timedelta = interval
self.logger = logger
self.running: bool = False
-        self.status = Status(StatusValue.OK)
+        self.status = IgnoreStatus()
self.executions_total = 0 # global counter
self.executions_per_interval = 0 # counter per interval = 1 min by default
self.duration = 0 # global counter
@@ -45,7 +45,6 @@ def __init__(
self.ok_count = 0 # counter per interval = 1 min by default
self.timeouts_count = 0 # counter per interval = 1 min by default
self.exception_count = 0 # counter per interval = 1 min by default
-        self.iterations = 0  # how many times we ran the callback iterator for this callback

def get_current_time_with_cluster_diff(self):
return datetime.now() + timedelta(milliseconds=self.cluster_time_diff)
@@ -142,13 +141,3 @@ def clear_sfm_metrics(self):
self.duration_interval_total = 0
self.exception_count = 0
self.executions_per_interval = 0

-    def get_next_execution_timestamp(self) -> float:
-        """
-        Get the timestamp for the next execution of the callback
-        This is done using execution total, the interval and the start timestamp
-        :return: datetime
-        """
-        return (
-            self.start_timestamp + timedelta(seconds=self.interval.total_seconds() * (self.iterations or 1))
-        ).timestamp()
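The removed `get_next_execution_timestamp()` computed each run by hand as `start_timestamp + interval * iterations`. After this change that bookkeeping belongs to APScheduler's `IntervalTrigger`: the scheduler feeds each fire time back into the trigger to derive the next one. A minimal standalone sketch of that mechanism (it uses `pytz` because APScheduler 3.x expects pytz timezones; this is not SDK code):

```python
from datetime import datetime, timedelta

import pytz
from apscheduler.triggers.interval import IntervalTrigger

now = datetime.now(pytz.utc)
# First fire one minute from now, then every minute after that.
trigger = IntervalTrigger(seconds=60, start_date=now + timedelta(seconds=60), timezone=pytz.utc)

# The scheduler passes the previous fire time back in to get the next one,
# replacing the manual start + interval * iterations arithmetic.
fire_time = None
for _ in range(3):
    fire_time = trigger.get_next_fire_time(fire_time, now)
    print(fire_time)  # now+60s, then now+120s, then now+180s
```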
139 changes: 75 additions & 64 deletions dynatrace_extension/sdk/extension.py
@@ -3,21 +3,23 @@
# SPDX-License-Identifier: MIT

import logging
-import sched
import signal
import sys
import threading
import time
from argparse import ArgumentParser
from collections.abc import Callable
-from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
+from enum import Enum
from itertools import chain
from pathlib import Path
from threading import Lock, RLock, active_count
from typing import Any, ClassVar, NamedTuple

+from apscheduler.executors.pool import ThreadPoolExecutor  # type: ignore
+from apscheduler.schedulers.background import BackgroundScheduler  # type: ignore
+from apscheduler.triggers.interval import IntervalTrigger  # type: ignore

from .activation import ActivationConfig, ActivationType
from .callback import WrappedCallback
from .communication import CommunicationClient, DebugClient, HttpClient
@@ -172,6 +174,12 @@ def _add_sfm_metric(metric: Metric, sfm_metrics: list[Metric] | None = None):
sfm_metrics.append(metric)


+class ExecutorType(str, Enum):
+    CALLBACKS = "callbacks"
+    INTERNAL = "internal"
+    HEARTBEAT = "heartbeat"
+
+
class Extension:
"""Base class for Python extensions.

@@ -240,21 +248,14 @@ def __init__(self, name: str = "") -> None:
self._running_callbacks: dict[int, WrappedCallback] = {}
self._running_callbacks_lock: Lock = Lock()

-        self._scheduler = sched.scheduler(time.time, time.sleep)
-
-        # Timestamps for scheduling of internal callbacks
-        self._next_internal_callbacks_timestamps: dict[str, datetime] = {
-            "timediff": datetime.now() + TIME_DIFF_INTERVAL,
-            "heartbeat": datetime.now() + HEARTBEAT_INTERVAL,
-            "metrics": datetime.now() + METRIC_SENDING_INTERVAL,
-            "events": datetime.now() + METRIC_SENDING_INTERVAL,
-            "sfm_metrics": datetime.now() + SFM_METRIC_SENDING_INTERVAL,
-        }
-
-        # Executors for the callbacks and internal methods
-        self._callbacks_executor = ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE)
-        self._internal_executor = ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE)
-        self._heartbeat_executor = ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE)
+        # Scheduler and executors for the callbacks and internal methods
+        self._scheduler = BackgroundScheduler(
+            executors={
+                ExecutorType.CALLBACKS: ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE),
+                ExecutorType.INTERNAL: ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE),
+                ExecutorType.HEARTBEAT: ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE),
+            }
+        )

# Extension metrics
self._metrics_lock = RLock()
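A condensed, standalone sketch of the wiring above (pool sizes are illustrative; the real constants live elsewhere in the SDK). Because `ExecutorType` subclasses `str`, its members double as APScheduler executor aliases, so `add_job(..., executor=ExecutorType.INTERNAL)` routes a job to the matching pool:

```python
import time

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler(
    executors={
        "callbacks": ThreadPoolExecutor(max_workers=10),  # user callbacks
        "internal": ThreadPoolExecutor(max_workers=5),    # metrics, events, time sync
        "heartbeat": ThreadPoolExecutor(max_workers=1),   # isolated from slow callbacks
    }
)
scheduler.start()

# Jobs name the pool they should run in.
scheduler.add_job(lambda: print("runs in the internal pool"), executor="internal")

time.sleep(1)
scheduler.shutdown()
```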
@@ -376,7 +377,14 @@ def _schedule_callback(self, callback: WrappedCallback):
callback.cluster_time_diff = self._cluster_time_diff
callback.running_in_sim = self._running_in_sim
self._scheduled_callbacks.append(callback)
-        self._scheduler.enter(callback.initial_wait_time(), 1, self._callback_iteration, (callback,))
+
+        self._scheduler.add_job(
+            self._run_callback,
+            args=[callback],
+            executor=ExecutorType.CALLBACKS,
+            trigger=IntervalTrigger(seconds=callback.interval.total_seconds()),
+            next_run_time=datetime.now() + timedelta(seconds=callback.initial_wait_time()),
+        )

def schedule(
self,
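With `sched`, `enter(initial_wait_time, ...)` delayed the first run of each callback; an `IntervalTrigger` on its own would first fire one full interval after registration. Passing `next_run_time` restores the initial-wait behaviour. A standalone sketch of the pattern (names and durations are illustrative):

```python
import time
from datetime import datetime, timedelta

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

def tick():
    print("tick at", datetime.now())

scheduler = BackgroundScheduler()
scheduler.add_job(
    tick,
    trigger=IntervalTrigger(seconds=10),                  # steady-state cadence
    next_run_time=datetime.now() + timedelta(seconds=2),  # initial wait, as with sched.enter()
)
scheduler.start()
time.sleep(13)  # first tick after ~2 s, second after ~12 s
scheduler.shutdown()
```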
@@ -809,7 +817,15 @@ def _parse_args(self):

if not self._is_fastcheck:
try:
-            self._heartbeat_iteration()
+            # TODO: Is it safe to schedule the heartbeat this way? Originally it was scheduled in the very same
+            # scheduler, which would starve the heartbeat if any callback took too long.
+            # On the other hand, those callbacks submitted their potentially risky jobs to different executors, so it should be okay?
+            # Why did the heartbeat have a different priority (higher or lower)?
+            self._scheduler.add_job(
+                self._heartbeat,
+                executor=ExecutorType.HEARTBEAT,
+                trigger=IntervalTrigger(seconds=HEARTBEAT_INTERVAL.total_seconds()),
+            )
self.initialize()
if not self.is_helper:
self.schedule(self.query, timedelta(minutes=1))
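On the priority question in the TODO: `sched` runs events scheduled for the same time in ascending priority order, so the old priority 2 gave the heartbeat *lower* priority than the priority-1 internal iterations. As for starvation, the dedicated single-worker executor means slow callbacks cannot delay the heartbeat, which a small standalone experiment can confirm (pool names and durations are illustrative, not SDK code):

```python
import time

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

scheduler = BackgroundScheduler(
    executors={
        "callbacks": ThreadPoolExecutor(max_workers=2),
        "heartbeat": ThreadPoolExecutor(max_workers=1),
    }
)

def slow_callback():
    time.sleep(30)  # hog a callbacks worker

def heartbeat():
    print("heartbeat at", time.strftime("%X"))

# Saturate both callbacks workers, then watch the heartbeat keep firing.
for _ in range(2):
    scheduler.add_job(slow_callback, executor="callbacks")
scheduler.add_job(heartbeat, executor="heartbeat", trigger=IntervalTrigger(seconds=5))

scheduler.start()
time.sleep(20)  # heartbeat prints every 5 s despite the busy callbacks pool
scheduler.shutdown()
```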
@@ -863,48 +879,48 @@ def _run_callback(self, callback: WrappedCallback):
with self._running_callbacks_lock:
self._running_callbacks.pop(current_thread_id, None)

-    def _callback_iteration(self, callback: WrappedCallback):
-        self._callbacks_executor.submit(self._run_callback, callback)
-        callback.iterations += 1
-        next_timestamp = callback.get_next_execution_timestamp()
-        self._scheduler.enterabs(next_timestamp, 1, self._callback_iteration, (callback,))
-
def _start_extension_loop(self):
api_logger.debug(f"Starting main loop for monitoring configuration: '{self.monitoring_config_name}'")

# These were scheduled before the extension started, schedule them now
for callback in self._scheduled_callbacks_before_run:
self._schedule_callback(callback)
-        self._metrics_iteration()
-        self._events_iteration()
-        self._sfm_metrics_iteration()
-        self._timediff_iteration()
-        self._scheduler.run()
-
-    def _timediff_iteration(self):
-        self._internal_executor.submit(self._update_cluster_time_diff)
-        next_timestamp = self._get_and_set_next_internal_callback_timestamp("timediff", TIME_DIFF_INTERVAL)
-        self._scheduler.enterabs(next_timestamp, 1, self._timediff_iteration)
-
-    def _heartbeat_iteration(self):
-        self._heartbeat_executor.submit(self._heartbeat)
-        next_timestamp = self._get_and_set_next_internal_callback_timestamp("heartbeat", HEARTBEAT_INTERVAL)
-        self._scheduler.enterabs(next_timestamp, 2, self._heartbeat_iteration)
-
-    def _metrics_iteration(self):
-        self._internal_executor.submit(self._send_metrics)
-        next_timestamp = self._get_and_set_next_internal_callback_timestamp("metrics", METRIC_SENDING_INTERVAL)
-        self._scheduler.enterabs(next_timestamp, 1, self._metrics_iteration)
-
-    def _events_iteration(self):
-        self._internal_executor.submit(self._send_buffered_events)
-        next_timestamp = self._get_and_set_next_internal_callback_timestamp("events", METRIC_SENDING_INTERVAL)
-        self._scheduler.enterabs(next_timestamp, 1, self._events_iteration)
-
-    def _sfm_metrics_iteration(self):
-        self._internal_executor.submit(self._send_sfm_metrics)
-        next_timestamp = self._get_and_set_next_internal_callback_timestamp("sfm_metrics", SFM_METRIC_SENDING_INTERVAL)
-        self._scheduler.enterabs(next_timestamp, 1, self._sfm_metrics_iteration)
+
+        self._scheduler.add_job(
+            self._send_metrics,
+            executor=ExecutorType.INTERNAL,
+            trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()),
+            next_run_time=datetime.now(),
+        )
+
+        self._scheduler.add_job(
+            self._send_buffered_events,
+            executor=ExecutorType.INTERNAL,
+            trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()),
+            next_run_time=datetime.now(),
+        )
+
+        self._scheduler.add_job(
+            self._send_sfm_metrics,
+            executor=ExecutorType.INTERNAL,
+            trigger=IntervalTrigger(seconds=SFM_METRIC_SENDING_INTERVAL.total_seconds()),
+            next_run_time=datetime.now(),
+        )
+
+        self._scheduler.add_job(
+            self._update_cluster_time_diff,
+            executor=ExecutorType.INTERNAL,
+            trigger=IntervalTrigger(seconds=TIME_DIFF_INTERVAL.total_seconds()),
+            next_run_time=datetime.now(),
+        )
+
+        self._scheduler.start()
+
+        try:
+            while self._scheduler.running:
+                time.sleep(1)
+        except Exception:
+            self._scheduler.shutdown()

def _send_metrics(self):
with self._metrics_lock:
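`BackgroundScheduler` runs jobs from daemon threads and `start()` returns immediately, so the `while self._scheduler.running` loop replaces the old blocking `sched.scheduler.run()` call by keeping the main thread alive. The bare pattern, as a standalone sketch:

```python
import time

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.add_job(lambda: print("work"), trigger="interval", seconds=2)
scheduler.start()  # returns immediately; jobs run on daemon threads

try:
    while scheduler.running:  # keep the main thread alive
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()
```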
@@ -1105,8 +1121,8 @@ def _heartbeat(self):
api_logger.error(f"Heartbeat failed because {e}, response {response}", exc_info=True)

def __del__(self):
-        self._callbacks_executor.shutdown()
-        self._internal_executor.shutdown()
+        if self._scheduler.running:
+            self._scheduler.shutdown()

def _add_metric(self, metric: Metric):
metric.validate()
@@ -1150,7 +1166,7 @@ def _send_events_internal(self, events: dict | list[dict]):

def _send_events(self, events: dict | list[dict], send_immediately: bool = False):
if send_immediately:
-            self._internal_executor.submit(self._send_events_internal, events)
+            self._scheduler.add_job(self._send_events_internal, args=[events], executor=ExecutorType.INTERNAL)
return
with self._logs_lock:
if isinstance(events, dict):
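`add_job` without a trigger schedules a one-shot run as soon as a worker in the target pool is free, which makes it a drop-in replacement for the removed `ThreadPoolExecutor.submit()`. A standalone sketch (the payload is hypothetical):

```python
import time

from apscheduler.schedulers.background import BackgroundScheduler

def send(payload):
    print("sending", payload)

scheduler = BackgroundScheduler()
scheduler.start()

# No trigger: runs once, immediately, like executor.submit(send, payload).
scheduler.add_job(send, args=[{"event": "example"}])

time.sleep(1)
scheduler.shutdown()
```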
@@ -1169,11 +1185,6 @@ def _send_buffered_events(self):
def _send_dt_event(self, event: dict[str, str | int | dict[str, str]]):
self._client.send_dt_event(event)

-    def _get_and_set_next_internal_callback_timestamp(self, callback_name: str, interval: timedelta):
-        next_timestamp = self._next_internal_callbacks_timestamps[callback_name]
-        self._next_internal_callbacks_timestamps[callback_name] += interval
-        return next_timestamp.timestamp()
-
def get_version(self) -> str:
"""Return the extension version."""
return self.activation_config.version
@@ -1247,4 +1258,4 @@ def _send_sfm_logs(self, logs: dict | list[dict]):
log["dt.extension.config.label"] = self.monitoring_config_name
log.pop("monitoring.configuration", None)

-        self._internal_executor.submit(self._send_sfm_logs_internal, logs)
+        self._scheduler.add_job(self._send_sfm_logs_internal, args=[logs], executor=ExecutorType.INTERNAL)
5 changes: 3 additions & 2 deletions dynatrace_extension/sdk/status.py
@@ -159,6 +159,7 @@ def __init__(self, send_sfm_logs_function: Callable) -> None:
self._ep_records: dict[str, EndpointStatusRecord] = {}
self._send_sfm_logs_function = send_sfm_logs_function
self._logs_to_send: list[str] = []
+        self._datetime_now = datetime.now  # Mockable datetime function

def contains_any_status(self) -> bool:
return len(self._ep_records) > 0
@@ -190,7 +191,7 @@ def send_ep_logs(self):
ep_record.ep_status.message,
)
)
-                ep_record.last_sent = datetime.now()
+                ep_record.last_sent = self._datetime_now()
ep_record.state = StatusState.ONGOING

if logs_to_send:
@@ -202,7 +203,7 @@ def _should_be_reported(self, ep_record: EndpointStatusRecord):
elif ep_record.state in (StatusState.INITIAL, StatusState.NEW):
return True
elif ep_record.state == StatusState.ONGOING and (
-            ep_record.last_sent is None or datetime.now() - ep_record.last_sent >= self.RESENDING_INTERVAL
+            ep_record.last_sent is None or self._datetime_now() - ep_record.last_sent >= self.RESENDING_INTERVAL
):
return True
else:
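The injectable `_datetime_now` above is what lets the test environments drop `freezegun` (see the `pyproject.toml` changes below): tests freeze and advance time by swapping in a plain function instead of patching the `datetime` module. A minimal sketch of the pattern with a stand-in class (not the SDK's real types):

```python
from datetime import datetime, timedelta

class Reporter:
    RESENDING_INTERVAL = timedelta(minutes=5)

    def __init__(self):
        self._datetime_now = datetime.now  # mockable clock, as in the PR
        self.last_sent = None

    def should_resend(self) -> bool:
        return self.last_sent is None or self._datetime_now() - self.last_sent >= self.RESENDING_INTERVAL

# In a test: freeze the clock, then advance it.
reporter = Reporter()
fake_now = datetime(2024, 1, 1, 12, 0)
reporter._datetime_now = lambda: fake_now

reporter.last_sent = fake_now
assert not reporter.should_resend()

fake_now += timedelta(minutes=5)
assert reporter.should_resend()
```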
9 changes: 4 additions & 5 deletions pyproject.toml
@@ -20,7 +20,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
-dependencies = []
+dependencies = ["apscheduler"]

[project.optional-dependencies]
cli = [ "dt-cli>=1.6.13", "typer[all]", "pyyaml", "ruff"]
@@ -47,8 +47,7 @@ dependencies = [
"pytest",
"typer[all]",
"pyyaml",
"dt-cli>=1.6.13",
"freezegun"
"dt-cli>=1.6.13"
]

[tool.hatch.envs.default.scripts]
@@ -74,8 +73,8 @@ dependencies = [
"ruff>=0.9.10",
"typer[all]",
"pyyaml",
"pytest",
"freezegun"
"pytest",
"apscheduler"
]

[tool.hatch.envs.lint.scripts]