Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog and this project adheres to Semantic Versioning.

## [0.1.88] - 2026-05-20

- Auto instrument subprocess module to automatically set current context as traceparent in sub-process environment whenever a new sub-process is created.
- Update `Netra.init` to automatically activate context from traceparent if traceparent is found in current environment.


## [0.1.87] - 2026-05-20

- **Prioritize input and output attributes explicitly set by user over attributes from instrumentation.**
Expand Down
25 changes: 24 additions & 1 deletion netra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Netra:
_init_lock = threading.RLock()
_root_span = None
_root_ctx_token = None
_subprocess_ctx_token = None
_metrics_enabled = False

@classmethod
Expand Down Expand Up @@ -147,6 +148,14 @@ def init(
# Initialize tracer (OTLP exporter, span processor, resource)
Tracer(cfg, root_instrument_names=resolved_root)

# Restore parent trace context when running as a subprocess.
try:
from netra.instrumentation.subprocess.utils import extract_subprocess_context

cls._subprocess_ctx_token = extract_subprocess_context()
except Exception as e:
logger.error("Failed to restore parent process trace context: %s", e)

# Initialize metrics pipeline when explicitly enabled
if cfg.enable_metrics:
try:
Expand Down Expand Up @@ -225,15 +234,29 @@ def init(

@classmethod
def shutdown(cls) -> None:
"""Flush all pending telemetry and release SDK resources."""
"""Flush all pending telemetry and release SDK resources.

Context tokens are detached in LIFO order (root first, subprocess
second) to match the attachment order in :meth:`init`. All logging is
guarded against closed streams that may occur during ``atexit``
teardown.
"""
with cls._init_lock:
# Detach in LIFO order: root was attached last, so detach it first.
if cls._root_ctx_token is not None:
try:
context_api.detach(cls._root_ctx_token)
except Exception:
pass
finally:
cls._root_ctx_token = None
if cls._subprocess_ctx_token is not None:
try:
context_api.detach(cls._subprocess_ctx_token)
except Exception:
pass
finally:
cls._subprocess_ctx_token = None
if cls._root_span is not None:
try:
cls._root_span.end()
Expand Down
22 changes: 22 additions & 0 deletions netra/instrumentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@ def init_instrumentations(
if CustomInstruments.CLAUDE_AGENT_SDK in netra_custom_instruments:
init_claude_agent_sdk_instrumentation()

# Subprocess instrumentation always enabled for
# context propagation in subprocess
init_subprocess_instrumentation()


def init_groq_instrumentation() -> bool:
"""Initialize Groq instrumentation."""
Expand Down Expand Up @@ -1461,3 +1465,21 @@ def init_claude_agent_sdk_instrumentation() -> bool:
logging.error(f"Error initializing Claude Agent SDK instrumentor: {e}")
Telemetry().log_exception(e)
return False


def init_subprocess_instrumentation() -> bool:
"""Initialize subprocess context propagation instrumentation.

Returns:
bool: True if initialization was successful, False otherwise.
"""
try:
from netra.instrumentation.subprocess import NetraSubprocessInstrumentor

instrumentor = NetraSubprocessInstrumentor()
if not instrumentor.is_instrumented_by_opentelemetry:
instrumentor.instrument()
return True
except Exception as e:
logging.error(f"Error initializing subprocess instrumentor: {e}")
return False
52 changes: 52 additions & 0 deletions netra/instrumentation/subprocess/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import inspect
import logging
import subprocess
from typing import Any, Callable, Collection, Dict, Tuple

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from wrapt import wrap_function_wrapper

from netra.instrumentation.subprocess.utils import inject_subprocess_context

logger = logging.getLogger(__name__)

_instruments: tuple[()] = ()

_POPEN_INIT_SIG = inspect.signature(subprocess.Popen.__init__)


def _popen_init_wrapper(
wrapped: Callable[..., Any],
instance: Any,
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
) -> Any:
"""Inject OTel trace context into the ``env`` argument of ``subprocess.Popen``.

Uses :func:`inspect.signature` to bind positional and keyword arguments so
that ``env`` is handled correctly regardless of how the caller supplied it.
"""
bound = _POPEN_INIT_SIG.bind_partial(instance, *args, **kwargs)
bound.arguments["env"] = inject_subprocess_context(bound.arguments.get("env"))
normalized_kwargs = {k: v for k, v in bound.arguments.items() if k != "self"}
return wrapped(**normalized_kwargs)


class NetraSubprocessInstrumentor(BaseInstrumentor): # type: ignore[misc]
"""Instruments subprocess.Popen to propagate OTel trace context to child processes."""

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs: Any) -> None:
try:
wrap_function_wrapper("subprocess", "Popen.__init__", _popen_init_wrapper)
except Exception as e:
logger.error("Failed to instrument subprocess: %s", e)

def _uninstrument(self, **kwargs: Any) -> None:
try:
unwrap("subprocess.Popen", "__init__")
except (AttributeError, ModuleNotFoundError) as e:
logger.error("Failed to uninstrument subprocess: %s", e)
104 changes: 104 additions & 0 deletions netra/instrumentation/subprocess/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import logging
import os
from typing import Any, Dict, Mapping, Optional

from opentelemetry import context as context_api
from opentelemetry import propagate, trace

logger = logging.getLogger(__name__)


def _log_traceparent(traceparent: Optional[str]) -> None:
"""Log whether a traceparent header was injected."""
if traceparent:
logger.debug("Injecting traceparent into subprocess env: %s", traceparent)
else:
logger.debug("No active span; subprocess env forwarded without traceparent.")


def inject_subprocess_context(
env: Optional[Mapping[Any, Any]] = None,
) -> Dict[Any, Any]:
"""Return a copy of *env* (or ``os.environ``) with the current OTel trace context injected.

Thread-safe: reads the calling thread's ContextVar, writes into a fresh dict.

Handles both ``str``-keyed and ``bytes``-keyed env mappings. When the
original mapping uses ``bytes`` keys the injected W3C trace headers are
encoded to ``bytes`` as well so that the subprocess env stays type-consistent.

Args:
env: The environment mapping passed to ``subprocess.Popen``. When
``None``, a copy of ``os.environ`` is used so the caller's env is
not mutated.

Returns:
A new dict containing all entries from *env* (or ``os.environ``) plus
the W3C ``traceparent`` (and ``tracestate``, if present) for the
currently active span. Returns the dict unchanged if there is no
active span.
"""
if env is None:
carrier: Dict[str, str] = dict(os.environ)
propagate.inject(carrier)
_log_traceparent(carrier.get("traceparent"))
return carrier

uses_bytes = any(isinstance(k, bytes) for k in env)

if uses_bytes:
str_carrier: Dict[str, str] = {}
propagate.inject(str_carrier)
result: Dict[Any, Any] = dict(env)
for key, value in str_carrier.items():
result[key.encode()] = value.encode()
_log_traceparent(str_carrier.get("traceparent"))
return result

carrier_copy: Dict[Any, Any] = dict(env)
propagate.inject(carrier_copy)
_log_traceparent(carrier_copy.get("traceparent"))
return carrier_copy


def extract_subprocess_context() -> Any:
"""Extract the W3C trace context from ``os.environ`` and attach it as the current context.

Intended to be called once during SDK initialisation in a child process.
Reads the ``traceparent`` (and ``tracestate``) values written by the
parent's :func:`inject_subprocess_context` and attaches the recovered
context so that all subsequent spans become children of the parent's
active span.

Returns:
An opaque OTel context token (as returned by
:func:`opentelemetry.context.attach`), or ``None`` if no
``traceparent`` was found in the environment or the extracted context
was invalid.
"""
try:
raw = os.environ.get("traceparent")
if not raw:
return None

carrier = {k.lower(): v for k, v in os.environ.items()}
ctx = propagate.extract(carrier)
span_ctx = trace.get_current_span(ctx).get_span_context()

if not span_ctx.is_valid:
logger.warning(
"Found traceparent in environment (%s) but extracted context is invalid; ignoring.",
raw,
)
return None

token = context_api.attach(ctx)
logger.debug(
"Restored parent process trace context — trace_id=%s span_id=%s",
format(span_ctx.trace_id, "032x"),
format(span_ctx.span_id, "016x"),
)
return token
except Exception as e:
logger.error("Failed to extract parent process trace context from environment: %s", e)
return None
2 changes: 1 addition & 1 deletion netra/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.1.87"
__version__ = "0.1.88dev"
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[project]
name = "netra-sdk"
version = "0.1.87"
version = "0.1.88dev"
description = "A Python SDK for AI application observability that provides OpenTelemetry-based monitoring, tracing, and PII protection for LLM and vector database applications. Enables easy instrumentation, session tracking, and privacy-focused data collection for AI systems in production environments."
authors = [
{name = "Sooraj Thomas",email = "sooraj@keyvalue.systems"}
Expand Down