Skip to content

refactor(tracing): break span-sampling cycle #13061

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions ddtrace/_trace/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
from ddtrace._trace.span import _get_64_highest_order_bits_as_hex
from ddtrace.constants import _APM_ENABLED_METRIC_KEY as MK_APM_ENABLED
from ddtrace.constants import _SAMPLING_PRIORITY_KEY
from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MECHANISM
from ddtrace.constants import USER_KEEP
from ddtrace.internal import gitmetadata
from ddtrace.internal import telemetry
from ddtrace.internal.constants import HIGHER_ORDER_TRACE_ID_BITS
from ddtrace.internal.constants import LAST_DD_PARENT_ID_KEY
from ddtrace.internal.constants import MAX_UINT_64BITS
from ddtrace.internal.constants import SamplingMechanism
from ddtrace.internal.dogstatsd import get_dogstatsd_client
from ddtrace.internal.logger import get_logger
from ddtrace.internal.sampling import SpanSamplingRule
from ddtrace.internal.sampling import get_span_sampling_rules
from ddtrace.internal.sampling import is_single_span_sampled
from ddtrace.internal.serverless import has_aws_lambda_agent_extension
from ddtrace.internal.serverless import in_aws_lambda
from ddtrace.internal.serverless import in_azure_function
Expand Down Expand Up @@ -161,7 +162,12 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
if priority is not None and priority <= 0:
# When any span is marked as keep by a single span sampling
# decision then we still send all and only those spans.
single_spans = [_ for _ in trace if is_single_span_sampled(_)]

single_spans = [
_
for _ in trace
if _.get_metric(_SINGLE_SPAN_SAMPLING_MECHANISM) == SamplingMechanism.SPAN_SAMPLING_RULE
]

return single_spans or None

Expand All @@ -170,8 +176,8 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
for span in trace:
if span.context.sampling_priority is not None and span.context.sampling_priority <= 0:
for rule in self.single_span_rules:
if rule.match(span):
rule.sample(span)
if rule.match(span.name, span.service):
span.sample(rule)
# If stats computation is enabled, we won't send all spans to the agent.
# In order to ensure that the agent does not update priority sampling rates
# due to single spans sampling, we set all of these spans to manual keep.
Expand Down
16 changes: 11 additions & 5 deletions ddtrace/_trace/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
from ..internal.constants import SamplingMechanism
from ..internal.logger import get_logger
from ..internal.rate_limiter import RateLimiter
from ..internal.sampling import _get_highest_precedence_rule_matching
from ..internal.sampling import _set_sampling_tags
from .sampling_rule import SamplingRule


Expand Down Expand Up @@ -168,9 +166,18 @@ def set_sampling_rules(self, rules: str) -> None:
# Sort the sampling_rules list using a lambda function as the key
self.rules = sorted(sampling_rules, key=lambda rule: PROVENANCE_ORDER.index(rule.provenance))

def _get_highest_precedence_rule_matching(self, span: Span) -> Optional[SamplingRule]:
if not (rules := self.rules):
return None

for rule in rules:
if span.matches(rule):
return rule
return None

def sample(self, span: Span) -> bool:
span._update_tags_from_context()
matched_rule = _get_highest_precedence_rule_matching(span, self.rules)
matched_rule = self._get_highest_precedence_rule_matching(span)
# Default sampling
agent_service_based = False
sampled = True
Expand All @@ -196,8 +203,7 @@ def sample(self, span: Span) -> bool:
span.set_metric(_SAMPLING_LIMIT_DECISION, self.limiter.effective_rate)

sampling_mechanism = self._get_sampling_mechanism(matched_rule, agent_service_based)
_set_sampling_tags(
span,
span._set_sampling_tags(
sampled,
sample_rate,
sampling_mechanism,
Expand Down
23 changes: 0 additions & 23 deletions ddtrace/_trace/sampling_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
from ddtrace.internal.utils.cache import cachedmethod


if TYPE_CHECKING: # pragma: no cover
from ddtrace._trace.span import Span # noqa:F401

log = get_logger(__name__)


Expand Down Expand Up @@ -126,26 +123,6 @@ def _matches(self, key: Tuple[Optional[str], str, Optional[str]]) -> bool:
else:
return True

def matches(self, span):
# type: (Span) -> bool
"""
Return if this span matches this rule

:param span: The span to match against
:type span: :class:`ddtrace._trace.span.Span`
:returns: Whether this span matches or not
:rtype: :obj:`bool`
"""
tags_match = self.tags_match(span)
return tags_match and self._matches((span.service, span.name, span.resource))

def tags_match(self, span):
# type: (Span) -> bool
tag_match = True
if self._tag_value_matchers:
tag_match = self.check_tags(span.get_tags(), span.get_metrics())
return tag_match

def check_tags(self, meta, metrics):
if meta is None and metrics is None:
return False
Expand Down
60 changes: 59 additions & 1 deletion ddtrace/_trace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ddtrace._trace._span_pointer import _SpanPointer
from ddtrace._trace._span_pointer import _SpanPointerDirection
from ddtrace._trace.context import Context
from ddtrace._trace.sampling_rule import SamplingRule
from ddtrace._trace.types import _AttributeValueType
from ddtrace._trace.types import _MetaDictType
from ddtrace._trace.types import _MetricDictType
Expand All @@ -29,6 +30,10 @@
from ddtrace.constants import _SAMPLING_AGENT_DECISION
from ddtrace.constants import _SAMPLING_LIMIT_DECISION
from ddtrace.constants import _SAMPLING_RULE_DECISION
from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MAX_PER_SEC
from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MAX_PER_SEC_NO_LIMIT
from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MECHANISM
from ddtrace.constants import _SINGLE_SPAN_SAMPLING_RATE
from ddtrace.constants import _SPAN_MEASURED_KEY
from ddtrace.constants import ERROR_MSG
from ddtrace.constants import ERROR_STACK
Expand All @@ -48,10 +53,14 @@
from ddtrace.internal.compat import NumericType
from ddtrace.internal.compat import ensure_text
from ddtrace.internal.compat import is_integer
from ddtrace.internal.constants import _KEEP_PRIORITY_INDEX
from ddtrace.internal.constants import _REJECT_PRIORITY_INDEX
from ddtrace.internal.constants import MAX_UINT_64BITS as _MAX_UINT_64BITS
from ddtrace.internal.constants import SAMPLING_MECHANISM_TO_PRIORITIES
from ddtrace.internal.constants import SPAN_API_DATADOG
from ddtrace.internal.constants import SamplingMechanism
from ddtrace.internal.logger import get_logger
from ddtrace.internal.sampling import SamplingMechanism
from ddtrace.internal.sampling import SpanSamplingRule
from ddtrace.internal.sampling import set_sampling_decision_maker
from ddtrace.settings._config import config

Expand Down Expand Up @@ -334,6 +343,55 @@ def _override_sampling_decision(self, decision: Optional[NumericType]):
if key in self._local_root._metrics:
del self._local_root._metrics[key]

def tags_match(self, rule: SamplingRule) -> bool:
tag_match = True
if rule._tag_value_matchers:
tag_match = rule.check_tags(self.get_tags(), self.get_metrics())
return tag_match

def matches(self, rule: SamplingRule) -> bool:
"""
Return if this span matches this rule

:param span: The span to match against
:type span: :class:`ddtrace._trace.span.Span`
:returns: Whether this span matches or not
:rtype: :obj:`bool`
"""
tags_match = self.tags_match(rule)
return tags_match and rule._matches((self.service, self.name, self.resource))

def apply_span_sampling_tags(self, rule: SpanSamplingRule) -> None:
self.set_metric(_SINGLE_SPAN_SAMPLING_MECHANISM, SamplingMechanism.SPAN_SAMPLING_RULE)
self.set_metric(_SINGLE_SPAN_SAMPLING_RATE, rule._sample_rate)
# Only set this tag if it's not the default -1
if rule._max_per_second != _SINGLE_SPAN_SAMPLING_MAX_PER_SEC_NO_LIMIT:
self.set_metric(_SINGLE_SPAN_SAMPLING_MAX_PER_SEC, rule._max_per_second)

def sample(self, rule: SpanSamplingRule) -> bool:
if rule._sample(self.span_id):
if rule._limiter.is_allowed():
self.apply_span_sampling_tags(rule)
return True
return False

def _set_sampling_tags(self, sampled: bool, sample_rate: float, mechanism: int) -> None:
# Set the sampling mechanism
set_sampling_decision_maker(self.context, mechanism)
# Set the sampling psr rate
if mechanism in (
SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE,
SamplingMechanism.REMOTE_USER_TRACE_SAMPLING_RULE,
SamplingMechanism.REMOTE_DYNAMIC_TRACE_SAMPLING_RULE,
):
self.set_metric(_SAMPLING_RULE_DECISION, sample_rate)
elif mechanism == SamplingMechanism.AGENT_RATE_BY_SERVICE:
self.set_metric(_SAMPLING_AGENT_DECISION, sample_rate)
# Set the sampling priority
priorities = SAMPLING_MECHANISM_TO_PRIORITIES[mechanism]
priority_index = _KEEP_PRIORITY_INDEX if sampled else _REJECT_PRIORITY_INDEX
self.context.sampling_priority = priorities[priority_index]

def set_tag(self, key: _TagNameType, value: Any = None) -> None:
"""Set a tag key/value pair on the span.

Expand Down
4 changes: 2 additions & 2 deletions ddtrace/appsec/_trace_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

def _asm_manual_keep(span: Span) -> None:
from ddtrace.internal.constants import SAMPLING_DECISION_TRACE_TAG_KEY
from ddtrace.internal.sampling import SamplingMechanism
from ddtrace.internal.constants import SamplingMechanism

span.set_tag(constants.MANUAL_KEEP_KEY)
# set decision maker to ASM = -5
Expand Down Expand Up @@ -295,7 +295,7 @@ def block_request() -> None:
meaning that if you capture the exception the request blocking could not work.
"""
if not asm_config._asm_enabled:
log.warning("block_request() is disabled. To use this feature please enable" "Application Security Monitoring")
log.warning("block_request() is disabled. To use this feature please enableApplication Security Monitoring")
return

_asm_request_context.block_request()
Expand Down
3 changes: 1 addition & 2 deletions ddtrace/internal/ci_visibility/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from ddtrace.ext import SpanTypes
from ddtrace.ext import ci
from ddtrace.internal.constants import SamplingMechanism
from ddtrace.internal.sampling import _set_sampling_tags
from ddtrace.trace import TraceFilter


Expand All @@ -32,7 +31,7 @@ def process_trace(self, trace):
return None

local_root.context.dd_origin = ci.CI_APP_TEST_ORIGIN
_set_sampling_tags(local_root, True, 1.0, SamplingMechanism.DEFAULT)
local_root._set_sampling_tags(True, 1.0, SamplingMechanism.DEFAULT)
for span in trace:
span.set_tags(self._tags)
span.set_tag_str(ci.LIBRARY_VERSION, ddtrace.__version__)
Expand Down
75 changes: 3 additions & 72 deletions ddtrace/internal/sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,11 @@
except ImportError:
from typing_extensions import TypedDict

from ddtrace._trace.sampling_rule import SamplingRule # noqa:F401
from ddtrace.constants import _SAMPLING_AGENT_DECISION
from ddtrace.constants import _SAMPLING_RULE_DECISION
from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MAX_PER_SEC
from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MAX_PER_SEC_NO_LIMIT
from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MECHANISM
from ddtrace.constants import _SINGLE_SPAN_SAMPLING_RATE
from ddtrace.internal.constants import _KEEP_PRIORITY_INDEX
from ddtrace.internal.constants import _REJECT_PRIORITY_INDEX
from ddtrace.internal.constants import MAX_UINT_64BITS
from ddtrace.internal.constants import SAMPLING_DECISION_TRACE_TAG_KEY
from ddtrace.internal.constants import SAMPLING_HASH_MODULO
from ddtrace.internal.constants import SAMPLING_KNUTH_FACTOR
from ddtrace.internal.constants import SAMPLING_MECHANISM_TO_PRIORITIES
from ddtrace.internal.constants import SamplingMechanism
from ddtrace.internal.glob_matching import GlobMatcher
from ddtrace.internal.logger import get_logger
from ddtrace.settings._config import config
Expand All @@ -46,7 +36,6 @@

if TYPE_CHECKING: # pragma: no cover
from ddtrace._trace.context import Context # noqa:F401
from ddtrace._trace.span import Span # noqa:F401


class PriorityCategory(object):
Expand Down Expand Up @@ -125,28 +114,16 @@ def __init__(
self._service_matcher = GlobMatcher(service) if service is not None else None
self._name_matcher = GlobMatcher(name) if name is not None else None

def sample(self, span):
# type: (Span) -> bool
if self._sample(span):
if self._limiter.is_allowed():
self.apply_span_sampling_tags(span)
return True
return False

def _sample(self, span):
# type: (Span) -> bool
def _sample(self, span_id: int) -> bool:
if self._sample_rate == 1:
return True
elif self._sample_rate == 0:
return False

return ((span.span_id * SAMPLING_KNUTH_FACTOR) % SAMPLING_HASH_MODULO) <= self._sampling_id_threshold
return ((span_id * SAMPLING_KNUTH_FACTOR) % SAMPLING_HASH_MODULO) <= self._sampling_id_threshold

def match(self, span):
# type: (Span) -> bool
def match(self, name: Optional[str], service: Optional[str]) -> bool:
"""Determines if the span's service and name match the configured patterns"""
name = span.name
service = span.service
# If a span lacks a name and service, we can't match on it
if service is None and name is None:
return False
Expand All @@ -168,14 +145,6 @@ def match(self, span):
name_match = self._name_matcher.match(name)
return service_match and name_match

def apply_span_sampling_tags(self, span):
# type: (Span) -> None
span.set_metric(_SINGLE_SPAN_SAMPLING_MECHANISM, SamplingMechanism.SPAN_SAMPLING_RULE)
span.set_metric(_SINGLE_SPAN_SAMPLING_RATE, self._sample_rate)
# Only set this tag if it's not the default -1
if self._max_per_second != _SINGLE_SPAN_SAMPLING_MAX_PER_SEC_NO_LIMIT:
span.set_metric(_SINGLE_SPAN_SAMPLING_MAX_PER_SEC, self._max_per_second)


def get_span_sampling_rules() -> List[SpanSamplingRule]:
json_rules = _get_span_sampling_json()
Expand Down Expand Up @@ -257,41 +226,3 @@ def _check_unsupported_pattern(string: str) -> None:
for char in string:
if char in unsupported_chars and config._raise:
raise ValueError("Unsupported Glob pattern found, character:%r is not supported" % char)


def is_single_span_sampled(span):
# type: (Span) -> bool
return span.get_metric(_SINGLE_SPAN_SAMPLING_MECHANISM) == SamplingMechanism.SPAN_SAMPLING_RULE


def _set_sampling_tags(span, sampled, sample_rate, mechanism):
# type: (Span, bool, float, int) -> None
# Set the sampling mechanism once but never overwrite an existing tag
if not span.context._meta.get(SAMPLING_DECISION_TRACE_TAG_KEY):
set_sampling_decision_maker(span.context, mechanism)

# Set the sampling psr rate
if mechanism in (
SamplingMechanism.LOCAL_USER_TRACE_SAMPLING_RULE,
SamplingMechanism.REMOTE_USER_TRACE_SAMPLING_RULE,
SamplingMechanism.REMOTE_DYNAMIC_TRACE_SAMPLING_RULE,
):
span.set_metric(_SAMPLING_RULE_DECISION, sample_rate)
elif mechanism == SamplingMechanism.AGENT_RATE_BY_SERVICE:
span.set_metric(_SAMPLING_AGENT_DECISION, sample_rate)
# Set the sampling priority
priorities = SAMPLING_MECHANISM_TO_PRIORITIES[mechanism]
priority_index = _KEEP_PRIORITY_INDEX if sampled else _REJECT_PRIORITY_INDEX

span.context.sampling_priority = priorities[priority_index]


def _get_highest_precedence_rule_matching(span, rules):
# type: (Span, List[SamplingRule]) -> Optional[SamplingRule]
if not rules:
return None

for rule in rules:
if rule.matches(span):
return rule
return None
2 changes: 1 addition & 1 deletion ddtrace/propagation/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ddtrace._trace.span import _MetaDictType
from ddtrace.appsec._constants import APPSEC
from ddtrace.internal import core
from ddtrace.internal.constants import SamplingMechanism
from ddtrace.settings._config import config
from ddtrace.settings.asm import config as asm_config

Expand Down Expand Up @@ -47,7 +48,6 @@
from ..internal.constants import W3C_TRACESTATE_KEY
from ..internal.logger import get_logger
from ..internal.sampling import SAMPLING_DECISION_TRACE_TAG_KEY
from ..internal.sampling import SamplingMechanism
from ..internal.sampling import validate_sampling_decision
from ..internal.utils.http import w3c_tracestate_add_p
from ._utils import get_wsgi_header
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def test_rate_limiter_on_spans(tracer):
Ensure that the rate limiter is applied to spans
"""
from ddtrace._trace.sampler import DatadogSampler
from ddtrace.internal.sampling import SamplingRule
from ddtrace._trace.sampling_rule import SamplingRule
from ddtrace.trace import tracer

# Rate limit is only applied if a sample rate or trace sample rule is set
Expand Down
Loading
Loading