Skip to content

fix: re-entrant Collector threads on musl-based systems. #737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 40 additions & 44 deletions src/instana/collector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@
# (c) Copyright Instana Inc. 2020

"""
A Collector launches a background thread and continually collects & reports data. The data
can be any combination of metrics, snapshot data and spans.
A Collector launches a background thread and continually collects & reports data.
The data can be any combination of metrics, snapshot data and spans.
"""

import queue # pylint: disable=import-error
import threading
import time
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, List, Type

from instana.log import logger
from instana.util import DictionaryOfStan, every
from instana.util import DictionaryOfStan

if TYPE_CHECKING:
from instana.agent.base import BaseAgent
from instana.span.readable_span import ReadableSpan


class BaseCollector(object):
Expand All @@ -19,7 +25,7 @@ class BaseCollector(object):
This class launches a background thread to do this work.
"""

def __init__(self, agent):
def __init__(self, agent: Type["BaseAgent"]) -> None:
# The agent for this process. Can be Standard, AWSLambda or Fargate
self.agent = agent

Expand Down Expand Up @@ -60,7 +66,7 @@ def __init__(self, agent):
# Start time of fetching metadata
self.fetching_start_time = 0

def is_reporting_thread_running(self):
def is_reporting_thread_running(self) -> bool:
"""
Indicates if there is a thread running with the name self.THREAD_NAME
"""
Expand All @@ -69,29 +75,31 @@ def is_reporting_thread_running(self):
return True
return False

def start(self):
def start(self) -> None:
"""
Starts the collector and starts reporting as long as the agent is in a ready state.
@return: None
"""
if self.is_reporting_thread_running():
if self.thread_shutdown.is_set():
# Shutdown still in progress; Reschedule this start in 5 seconds from now
# Force a restart.
self.thread_shutdown.clear()
# Reschedule this start in 5 seconds from now
timer = threading.Timer(5, self.start)
timer.daemon = True
timer.name = "Collector Timed Start"
timer.start()
return
logger.debug(
"BaseCollector.start non-fatal: call but thread already running (started: %s)",
self.started,
f"BaseCollector.start non-fatal: call but thread already running (started: {self.started})"
)
return

if self.agent.can_send():
logger.debug("BaseCollector.start: launching collection thread")
self.thread_shutdown.clear()
self.reporting_thread = threading.Thread(target=self.thread_loop, args=())
self.reporting_thread = threading.Thread(
target=self.background_report, args=()
)
self.reporting_thread.daemon = True
self.reporting_thread.name = self.THREAD_NAME
self.reporting_thread.start()
Expand All @@ -101,7 +109,7 @@ def start(self):
"BaseCollector.start: the agent tells us we can't send anything out"
)

def shutdown(self, report_final=True):
def shutdown(self, report_final: bool = True) -> None:
"""
Shuts down the collector and reports any final data (if possible).
e.g. If the host agent disappeared, we won't be able to report final data.
Expand All @@ -113,39 +121,27 @@ def shutdown(self, report_final=True):
self.prepare_and_report_data()
self.started = False

def thread_loop(self):
"""
Just a loop that is run in the background thread.
@return: None
"""
every(
self.report_interval,
self.background_report,
"Instana Collector: prepare_and_report_data",
)

def background_report(self):
def background_report(self) -> None:
"""
The main work-horse method to report data in the background thread.
@return: Boolean
"""
if self.thread_shutdown.is_set():
logger.debug(
"Thread shutdown signal is active: Shutting down reporting thread"
)
return False

self.prepare_and_report_data()
This method runs indefinitely, preparing and reporting data at regular
intervals.
It checks for a shutdown signal and stops execution if it's set.

if self.thread_shutdown.is_set():
logger.debug(
"Thread shutdown signal is active: Shutting down reporting thread"
)
return False
@return: None
"""
while True: # pragma: no cover
if self.thread_shutdown.is_set():
logger.debug(
"Thread shutdown signal is active: Shutting down reporting thread"
)
break

return True
self.prepare_and_report_data()
time.sleep(self.report_interval)

def prepare_and_report_data(self):
def prepare_and_report_data(self) -> bool:
"""
Prepare and report the data payload.
@return: Boolean
Expand All @@ -155,26 +151,26 @@ def prepare_and_report_data(self):
self.agent.report_data_payload(payload)
return True

def prepare_payload(self):
def prepare_payload(self) -> DefaultDict[str, Any]:
"""
Method to prepare the data to be reported.
@return: DictionaryOfStan()
"""
logger.debug("BaseCollector: prepare_payload needs to be overridden")
return DictionaryOfStan()

def should_send_snapshot_data(self):
def should_send_snapshot_data(self) -> bool:
"""
Determines if snapshot data should be sent
@return: Boolean
"""
logger.debug("BaseCollector: should_send_snapshot_data needs to be overridden")
return False

def collect_snapshot(self, *argv, **kwargs):
def collect_snapshot(self, *argv, **kwargs) -> None:
logger.debug("BaseCollector: collect_snapshot needs to be overridden")

def queued_spans(self):
def queued_spans(self) -> List["ReadableSpan"]:
"""
Get all of the queued spans
@return: list
Expand All @@ -189,7 +185,7 @@ def queued_spans(self):
spans.append(span)
return spans

def queued_profiles(self):
def queued_profiles(self) -> List[Dict[str, Any]]:
"""
Get all of the queued profiles
@return: list
Expand Down
110 changes: 56 additions & 54 deletions src/instana/util/__init__.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,45 @@
# (c) Copyright IBM Corp. 2021
# (c) Copyright Instana Inc. 2020

import importlib.metadata
import json
import time
from collections import defaultdict
from typing import Any, DefaultDict
from urllib import parse

import importlib.metadata

from ..log import logger
from instana.log import logger


def nested_dictionary():
def nested_dictionary() -> DefaultDict[str, Any]:
return defaultdict(DictionaryOfStan)


# Simple implementation of a nested dictionary.
DictionaryOfStan = nested_dictionary
DictionaryOfStan: DefaultDict[str, Any] = nested_dictionary


def to_json(obj):
# Assisted by watsonx Code Assistant
def to_json(obj: Any) -> bytes:
"""
Convert obj to json. Used mostly to convert the classes in json_span.py until we switch to nested
dicts (or something better)
Convert the given object to a JSON binary string.

:param obj: the object to serialize to json
:return: json string
This function is primarily used to serialize objects from `json_span.py`
until a switch to nested dictionaries (or a better solution) is made.

:param obj: The object to serialize to JSON.
:return: The JSON string encoded as bytes.
"""
try:

def extractor(o):
def extractor(o: Any) -> dict:
"""
Extract dictionary-like attributes from an object.

:param o: The object to extract attributes from.
:return: A dictionary containing the object's attributes.
"""
if not hasattr(o, "__dict__"):
logger.debug("Couldn't serialize non dict type: %s", type(o))
logger.debug(f"Couldn't serialize non dict type: {type(o)}")
return {}
else:
return {k.lower(): v for k, v in o.__dict__.items() if v is not None}
Expand All @@ -43,9 +51,12 @@ def extractor(o):
logger.debug("to_json non-fatal encoding issue: ", exc_info=True)


def to_pretty_json(obj):
# Assisted by watsonx Code Assistant
def to_pretty_json(obj: Any) -> str:
"""
Convert obj to pretty json. Used mostly in logging/debugging.
Convert an object to a pretty-printed JSON string.

This function is primarily used for logging and debugging purposes.

:param obj: the object to serialize to json
:return: json string
Expand All @@ -66,26 +77,40 @@ def extractor(o):
logger.debug("to_pretty_json non-fatal encoding issue: ", exc_info=True)


def package_version():
# Assisted by watsonx Code Assistant
def package_version() -> str:
"""
Determine the version of this package.
Determine the version of the 'instana' package.

This function uses the `importlib.metadata` module to fetch the version of
the 'instana' package.
If the package is not found, it returns 'unknown'.

:return: String representing known version
:return: A string representing the version of the 'instana' package.
"""
version = ""
try:
version = importlib.metadata.version("instana")
except importlib.metadata.PackageNotFoundError:
logger.debug("Not able to identify the Instana package version.")
version = "unknown"

return version


def get_default_gateway():
# Assisted by watsonx Code Assistant
def get_default_gateway() -> str:
"""
Attempts to read /proc/self/net/route to determine the default gateway in use.

:return: String - the IP address of the default gateway or None if not found/possible/non-existent
This function reads the /proc/self/net/route file, which contains network
routing information for the current process.
It specifically looks for the line where the Destination is 00000000,
indicating the default route.
The Gateway IP is encoded backwards in hex, which this function decodes and
converts to a standard IP address format.

:return: String - the IP address of the default gateway or None if not
found/possible/non-existent
"""
try:
hip = None
Expand All @@ -100,52 +125,29 @@ def get_default_gateway():

if hip is not None and len(hip) == 8:
# Reverse order, convert hex to int
return "%i.%i.%i.%i" % (
int(hip[6:8], 16),
int(hip[4:6], 16),
int(hip[2:4], 16),
int(hip[0:2], 16),
)
return f"{int(hip[6:8], 16)}.{int(hip[4:6], 16)}.{int(hip[2:4], 16)}.{int(hip[0:2], 16)}"

except Exception:
logger.warning("get_default_gateway: ", exc_info=True)


def every(delay, task, name):
"""
Executes a task every `delay` seconds

:param delay: the delay in seconds
:param task: the method to run. The method should return False if you want the loop to stop.
:return: None
# Assisted by watsonx Code Assistant
def validate_url(url: str) -> bool:
"""
next_time = time.time() + delay
Validate if the provided <url> is a valid URL.

while True:
time.sleep(max(0, next_time - time.time()))
try:
if task() is False:
break
except Exception:
logger.debug(
"Problem while executing repetitive task: %s", name, exc_info=True
)

# skip tasks if we are behind schedule:
next_time += (time.time() - next_time) // delay * delay + delay


def validate_url(url):
"""
Validate if <url> is a valid url
This function checks if the given string is a valid URL by attempting to
parse it using the `urlparse` function from the `urllib.parse` module.
A URL is considered valid if it has both a scheme (like 'http' or 'https')
and a network location (netloc).

Examples:
- "http://localhost:5000" - valid
- "http://localhost:5000/path" - valid
- "sandwich" - invalid

@param url: string
@return: Boolean
@param url: A string representing the URL to validate.
@return: A boolean value. Returns `True` if the URL is valid, otherwise `False`.
"""
try:
result = parse.urlparse(url)
Expand Down
5 changes: 0 additions & 5 deletions tests/collector/test_base_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ def test_shutdown(
assert "Collector.shutdown: Reporting final data." in caplog.messages
assert not self.collector.started

def test_background_report(self) -> None:
assert self.collector.background_report()
self.collector.thread_shutdown.set()
assert not self.collector.background_report()

def test_should_send_snapshot_data(self, caplog: LogCaptureFixture) -> None:
caplog.set_level(logging.DEBUG, logger="instana")
self.collector.should_send_snapshot_data()
Expand Down
Loading