bug: fix stale tags behavior in origin enrichment #435

Draft · wants to merge 2 commits into main
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -145,7 +145,7 @@ http-serde-ext = { version = "1", default-features = false }
tikv-jemalloc-ctl = "0.6"
tikv-jemallocator = { version = "0.6", default-features = false }
axum-extra = { version = "0.9", default-features = false }
papaya = { version = "0.1.7", default-features = false }
papaya = { version = "0.1.8", default-features = false }

[profile.release]
lto = "thin"
2 changes: 1 addition & 1 deletion bin/agent-data-plane/src/components/remapper/mod.rs
@@ -280,7 +280,7 @@ mod tests {
#[test]
fn test_remap_object_pool_metrics() {
let mut remapper = AgentTelemetryRemapper {
context_resolver: ContextResolverBuilder::for_tests(),
context_resolver: ContextResolverBuilder::for_tests().build(),
rules: get_datadog_agent_remappings(),
};

2 changes: 1 addition & 1 deletion bin/agent-data-plane/src/main.rs
@@ -175,7 +175,7 @@ fn create_topology(
// and a Datadog Metrics destination that forwards aggregated buckets to the Datadog Platform.
let dsd_config = DogStatsDConfiguration::from_configuration(configuration)
.error_context("Failed to configure DogStatsD source.")?
.with_workload_provider(env_provider.workload().clone());
.with_origin_tags_resolver(env_provider.workload().clone());
let dsd_agg_config = AggregateConfiguration::from_configuration(configuration)
.error_context("Failed to configure aggregate transform.")?;
let dsd_prefix_filter_configuration = DogstatsDPrefixFilterConfiguration::from_configuration(configuration)?;
@@ -3,7 +3,8 @@ use std::{io, num::NonZeroU64};
use datadog_protos::metrics::{self as proto, Resource};
use ddsketch_agent::DDSketch;
use http::{uri::PathAndQuery, HeaderValue, Method, Request, Uri};
use protobuf::CodedOutputStream;
use protobuf::{Chars, CodedOutputStream};
use saluki_context::tags::Tagged as _;
use saluki_core::pooling::ObjectPool;
use saluki_event::metric::*;
use saluki_io::{
@@ -397,29 +398,37 @@ fn encode_series_metric(metric: &Metric) -> proto::MetricSeries {
let mut series = proto::MetricSeries::new();
series.set_metric(metric.context().name().clone().into());

// Set our tags.
//
// This involves extracting some specific tags first that have to be set on dedicated fields (host, resources, etc)
// and then setting the rest as generic tags.
let mut tags = metric.context().tags().clone();

let mut host_resource = Resource::new();
host_resource.set_type("host".to_string().into());
host_resource.set_name(metric.metadata().hostname().map(|h| h.into()).unwrap_or_default());
series.mut_resources().push(host_resource);

if let Some(ir_tags) = tags.remove_tags("dd.internal.resource") {
for ir_tag in ir_tags {
if let Some((resource_type, resource_name)) = ir_tag.value().and_then(|s| s.split_once(':')) {
// Collect and handle all of our tags.
//
// We extract some specific tags and use them to add resource entries, such that they aren't sent as actual tags.
let mut tags = Vec::new();

let metric_tags = metric.context().tags();
Member Author commented: Woops, I forgot to update this area to use the Tagged impl I added to Context.

for metric_tag in metric_tags {
// If this is an "internal resource" tag, we materialize it as a bonafide resource entry instead.
if metric_tag.name() == "dd.internal.resource" {
if let Some((resource_type, resource_name)) = metric_tag.value().and_then(|s| s.split_once(':')) {
let mut resource = Resource::new();
resource.set_type(resource_type.into());
resource.set_name(resource_name.into());
series.mut_resources().push(resource);
}
} else {
// Just a regular metric tag.
tags.push(metric_tag.as_str().into());
}
}

series.set_tags(tags.into_iter().map(|tag| tag.into_inner().into()).collect());
metric.context().origin_tags().visit_tags(|tag| {
tags.push(tag.as_str().into());
});

series.set_tags(tags);

// Set the origin metadata, if it exists.
if let Some(origin) = metric.metadata().origin() {
@@ -483,15 +492,20 @@ fn encode_sketch_metric(metric: &Metric) -> proto::Sketch {
let mut sketch = proto::Sketch::new();
sketch.set_metric(metric.context().name().into());
sketch.set_host(metric.metadata().hostname().map(|h| h.into()).unwrap_or_default());
sketch.set_tags(
metric
.context()
.tags()
.into_iter()
.cloned()
.map(|tag| tag.into_inner().into())
.collect(),
);

// Collect and handle all of our tags.
let mut tags = metric
Member Author commented: Same thing here re: not using the Tagged impl on Context.

.context()
.tags()
.into_iter()
.map(|tag| tag.as_str().into())
.collect::<Vec<Chars>>();

metric.context().origin_tags().visit_tags(|tag| {
tags.push(tag.as_str().into());
});

sketch.set_tags(tags);

// Set the origin metadata, if it exists.
if let Some(MetricOrigin::OriginMetadata {
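Both review comments above point at the same follow-up: rather than iterating `context().tags()` and `context().origin_tags()` separately, the encoders could lean on the `Tagged` impl this PR adds to `Context` (the Prometheus destination below already does so via `context.visit_tags`). A rough sketch of what the series path might look like, assuming `Context`'s `visit_tags` visits both the native tags and the origin tags and hands the closure the same tag type used above; this is an illustration, not part of the PR:

```rust
// Sketch only: relies on `metric`, `series`, and `Resource` from the surrounding
// `encode_series_metric` function, and assumes `Context: Tagged` visits native and
// origin tags alike.
use protobuf::Chars;
use saluki_context::tags::Tagged as _;

let mut tags: Vec<Chars> = Vec::new();

metric.context().visit_tags(|tag| {
    // Materialize "internal resource" tags as resource entries instead of plain tags.
    if tag.name() == "dd.internal.resource" {
        if let Some((resource_type, resource_name)) = tag.value().and_then(|s| s.split_once(':')) {
            let mut resource = Resource::new();
            resource.set_type(resource_type.into());
            resource.set_name(resource_name.into());
            series.mut_resources().push(resource);
        }
    } else {
        // Just a regular metric tag.
        tags.push(tag.as_str().into());
    }
});

series.set_tags(tags);
```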
27 changes: 19 additions & 8 deletions lib/saluki-components/src/destinations/prometheus/mod.rs
@@ -7,7 +7,7 @@ use hyper::{body::Incoming, service::service_fn};
use indexmap::IndexMap;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_config::GenericConfiguration;
use saluki_context::{tags::TagSet, Context};
use saluki_context::{tags::Tagged as _, Context};
use saluki_core::components::{destinations::*, ComponentContext};
use saluki_error::GenericError;
use saluki_event::{
@@ -269,7 +269,7 @@ fn write_metrics(
tags_buffer.clear();

// Format/encode the tags.
if !format_tags(tags_buffer, context.tags()) {
if !format_tags(tags_buffer, context) {
return false;
}

@@ -320,10 +320,16 @@ fn write_metrics(
true
}

fn format_tags(tags_buffer: &mut String, tags: &TagSet) -> bool {
fn format_tags(tags_buffer: &mut String, context: &Context) -> bool {
let mut has_tags = false;
let mut exceeded = false;

context.visit_tags(|tag| {
// If we exceeded the tags buffer size limit, we can't write any more tags.
if exceeded {
return;
}

for tag in tags {
// If we're not the first tag to be written, add a comma to separate the tags.
if has_tags {
tags_buffer.push(',');
@@ -334,7 +340,7 @@ fn format_tags(tags_buffer: &mut String, tags: &TagSet) -> bool {
Some(value) => value,
None => {
debug!("Skipping bare tag.");
continue;
return;
}
};

@@ -343,11 +349,16 @@ fn format_tags(tags_buffer: &mut String, tags: &TagSet) -> bool {
// Can't exceed the tags buffer size limit: we calculate the addition as tag name/value length plus three bytes
// to account for having to format it as `name="value",`.
if tags_buffer.len() + tag_name.len() + tag_value.len() + 4 > TAGS_BUFFER_SIZE_LIMIT_BYTES {
debug!("Tags for metric would exceed tags buffer size limit.");
return false;
exceeded = true;
} else {
write!(tags_buffer, "{}=\"{}\"", tag_name, tag_value).unwrap();
}
});

write!(tags_buffer, "{}=\"{}\"", tag_name, tag_value).unwrap();
// We would have exceeded the tag buffer limit if we rendered all of the tags.
if exceeded {
debug!("Tags for metric would exceed tags buffer size limit.");
return false;
}

true
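One detail of the `format_tags` rewrite above is worth spelling out: `visit_tags` drives a closure rather than yielding an iterator, so the function can no longer return early the moment the buffer limit is hit. Instead it sets an `exceeded` flag, turns the remaining callbacks into no-ops, and reports the failure once the visit has finished. A small self-contained sketch of that pattern (generic code, not the saluki API):

```rust
/// Joins items with commas, refusing to produce output longer than `limit` bytes.
///
/// Mirrors the shape of `format_tags` above: the visit is callback-driven, so instead of
/// breaking out of a loop we flip an `exceeded` flag, skip the remaining callbacks, and
/// report the failure after the visit completes.
fn join_with_limit(items: &[&str], limit: usize) -> Option<String> {
    let mut out = String::new();
    let mut exceeded = false;

    items.iter().for_each(|&item| {
        // Once the limit has been blown, the remaining callbacks do nothing.
        if exceeded {
            return;
        }

        // Conservatively reserve one byte for the separating comma.
        if out.len() + item.len() + 1 > limit {
            exceeded = true;
        } else {
            if !out.is_empty() {
                out.push(',');
            }
            out.push_str(item);
        }
    });

    if exceeded {
        None
    } else {
        Some(out)
    }
}

fn main() {
    let tags = ["env:prod", "service:web", "version:1.2.3"];
    assert_eq!(
        join_with_limit(&tags, 64).as_deref(),
        Some("env:prod,service:web,version:1.2.3")
    );
    assert_eq!(join_with_limit(&tags, 16), None);
}
```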
68 changes: 46 additions & 22 deletions lib/saluki-components/src/sources/dogstatsd/mod.rs
@@ -7,7 +7,10 @@ use bytesize::ByteSize;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use metrics::{Counter, Gauge, Histogram};
use saluki_config::GenericConfiguration;
use saluki_context::{ContextResolver, ContextResolverBuilder};
use saluki_context::{
origin::{OriginInfo, OriginTagsResolver},
ContextResolver, ContextResolverBuilder,
};
use saluki_core::{
components::{sources::*, ComponentContext},
observability::ComponentMetricsExt as _,
@@ -19,7 +22,6 @@ use saluki_core::{
OutputDefinition,
},
};
use saluki_env::WorkloadProvider;
use saluki_error::{generic_error, GenericError};
use saluki_event::metric::{MetricMetadata, MetricOrigin};
use saluki_event::{metric::Metric, DataType, Event};
@@ -49,9 +51,6 @@ use tracing::{debug, error, info, trace, warn};
mod framer;
use self::framer::{get_framer, DsdFramer};

mod origin;
use self::origin::{origin_info_from_metric_packet, OriginEnrichmentConfiguration};

#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
enum Error {
@@ -97,12 +96,10 @@ const fn default_dogstatsd_permissive_decoding() -> bool {
///
/// Accepts metrics over TCP, UDP, or Unix Domain Sockets in the StatsD/DogStatsD format.
#[derive(Deserialize)]
pub struct DogStatsDConfiguration {
/// Origin enrichment configuration.
///
/// See [`OriginEnrichmentConfiguration`] for more details.
#[serde(default)]
origin_enrichment: OriginEnrichmentConfiguration,
pub struct DogStatsDConfiguration<R = ()> {
/// Origin tags resolver to use for resolving contexts.
#[serde(skip, default)]
origin_tags_resolver: R,

/// The size of the buffer used to receive messages into, in bytes.
///
@@ -212,17 +209,28 @@ impl DogStatsDConfiguration {
pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
Ok(config.as_typed()?)
}
}

/// Sets the workload provider for this configuration.
impl<R> DogStatsDConfiguration<R> {
/// Sets the origin tags resolver for this configuration.
///
/// A workload provider must be set in order for origin enrichment to function.
pub fn with_workload_provider<W>(self, workload_provider: W) -> DogStatsDConfiguration
/// An origin tags resolver must be set in order for origin enrichment to function.
pub fn with_origin_tags_resolver<R2>(self, origin_tags_resolver: R2) -> DogStatsDConfiguration<R2>
where
W: WorkloadProvider + Send + Sync + Clone + 'static,
R2: OriginTagsResolver + Clone + 'static,
{
Self {
origin_enrichment: self.origin_enrichment.with_workload_provider(workload_provider),
..self
DogStatsDConfiguration {
origin_tags_resolver,
buffer_size: self.buffer_size,
buffer_count: self.buffer_count,
port: self.port,
socket_path: self.socket_path,
socket_stream_path: self.socket_stream_path,
non_local_traffic: self.non_local_traffic,
allow_context_heap_allocations: self.allow_context_heap_allocations,
no_aggregation_pipeline_support: self.no_aggregation_pipeline_support,
context_string_interner_bytes: self.context_string_interner_bytes,
permissive_decoding: self.permissive_decoding,
}
}

@@ -267,7 +275,10 @@ impl DogStatsDConfiguration {
}

#[async_trait]
impl SourceBuilder for DogStatsDConfiguration {
impl<R> SourceBuilder for DogStatsDConfiguration<R>
where
R: OriginTagsResolver + Clone + 'static,
{
async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
let listeners = self.build_listeners().await?;
if listeners.is_empty() {
@@ -282,7 +293,7 @@ impl SourceBuilder for DogStatsDConfiguration {
.with_idle_context_expiration(Duration::from_secs(30))
.with_expiration_interval(Duration::from_secs(1))
.with_heap_allocations(self.allow_context_heap_allocations)
.with_origin_enricher(self.origin_enrichment.build())
.with_origin_tags_resolver(self.origin_tags_resolver.clone())
.build();

let codec_config = DogstatsdCodecConfiguration::default()
@@ -314,7 +325,10 @@ impl SourceBuilder for DogStatsDConfiguration {
}
}

impl MemoryBounds for DogStatsDConfiguration {
impl<R> MemoryBounds for DogStatsDConfiguration<R>
where
R: OriginTagsResolver,
{
fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
builder
.minimum()
@@ -909,6 +923,16 @@ const fn get_adjusted_buffer_size(buffer_size: usize) -> usize {
buffer_size + 4
}

/// Builds an `OriginInfo` object from the given metric packet.
fn origin_info_from_metric_packet<'packet>(packet: &MetricPacket<'packet>) -> OriginInfo<'packet> {
let mut origin_info = OriginInfo::default();
origin_info.set_pod_uid(packet.pod_uid);
origin_info.set_container_id(packet.container_id);
origin_info.set_external_data(packet.external_data);
origin_info.set_cardinality(packet.cardinality);
origin_info
}

#[cfg(test)]
mod tests {
use std::net::SocketAddr;
Expand All @@ -931,7 +955,7 @@ mod tests {
// We set our metric name to be longer than 31 bytes (the inlining limit) to ensure this.

let codec = DogstatsdCodec::from_configuration(DogstatsdCodecConfiguration::default());
let mut context_resolver = ContextResolverBuilder::for_tests().with_heap_allocations(false);
let mut context_resolver = ContextResolverBuilder::for_tests().with_heap_allocations(false).build();
let peer_addr = ConnectionAddress::from("1.1.1.1:1234".parse::<SocketAddr>().unwrap());

let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3";