Skip to content

chore: pass SendDataBuilderInfo instead of SendData until flush time #745

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions bottlecap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ rustls-native-certs = { version = "0.8.1", optional = true }
# be found in the clippy.toml file adjacent to this Cargo.toml.
datadog-protos = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/", rev = "c89b58e5784b985819baf11f13f7d35876741222"}
ddsketch-agent = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/" }
ddcommon = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" }
datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" }
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" , features = ["mini_agent"] }
datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" }
datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" }
ddcommon = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] }
datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false }
datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" }
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false }
Expand Down
9 changes: 5 additions & 4 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ use bottlecap::{
proxy_flusher::Flusher as ProxyFlusher,
stats_aggregator::StatsAggregator,
stats_flusher::{self, StatsFlusher},
stats_processor, trace_agent, trace_aggregator,
stats_processor, trace_agent,
trace_aggregator::{self, SendDataBuilderInfo},
trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher},
trace_processor,
},
Expand Down Expand Up @@ -802,7 +803,7 @@ async fn handle_event_bus_event(
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
tags_provider: Arc<TagProvider>,
trace_processor: Arc<trace_processor::ServerlessTraceProcessor>,
trace_agent_channel: Sender<datadog_trace_utils::send_data::SendData>,
trace_agent_channel: Sender<SendDataBuilderInfo>,
) -> Option<TelemetryEvent> {
match event {
Event::Metric(event) => {
Expand Down Expand Up @@ -1024,7 +1025,7 @@ fn start_trace_agent(
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
trace_aggregator: Arc<TokioMutex<trace_aggregator::TraceAggregator>>,
) -> (
Sender<datadog_trace_utils::send_data::SendData>,
Sender<SendDataBuilderInfo>,
Arc<trace_flusher::ServerlessTraceFlusher>,
Arc<trace_processor::ServerlessTraceProcessor>,
Arc<stats_flusher::ServerlessStatsFlusher>,
Expand Down Expand Up @@ -1146,7 +1147,7 @@ fn start_otlp_agent(
config: &Arc<Config>,
tags_provider: Arc<TagProvider>,
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
trace_tx: Sender<SendData>,
trace_tx: Sender<SendDataBuilderInfo>,
) -> Option<CancellationToken> {
if !should_enable_otlp_agent(config) {
return None;
Expand Down
17 changes: 9 additions & 8 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use chrono::{DateTime, Utc};
use datadog_trace_protobuf::pb::Span;
use datadog_trace_utils::{send_data::SendData, tracer_header_tags};
use datadog_trace_utils::tracer_header_tags;
use dogstatsd::aggregator::Aggregator as MetricsAggregator;
use serde_json::{json, Value};
use tokio::sync::{mpsc::Sender, watch};
Expand Down Expand Up @@ -35,6 +35,7 @@ use crate::{
},
DatadogCompositePropagator, Propagator,
},
trace_aggregator::SendDataBuilderInfo,
trace_processor::{self, TraceProcessor},
},
};
Expand Down Expand Up @@ -284,7 +285,7 @@ impl Processor {
status: Status,
tags_provider: Arc<provider::Provider>,
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
trace_agent_tx: Sender<SendData>,
trace_agent_tx: Sender<SendDataBuilderInfo>,
timestamp: i64,
) {
// Set the runtime duration metric
Expand Down Expand Up @@ -328,7 +329,7 @@ impl Processor {
status: Status,
tags_provider: Arc<provider::Provider>,
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
trace_agent_tx: Sender<SendData>,
trace_agent_tx: Sender<SendDataBuilderInfo>,
) {
let context = self.enrich_ctx_at_platform_done(request_id, status);

Expand Down Expand Up @@ -415,7 +416,7 @@ impl Processor {
&mut self,
tags_provider: &Arc<provider::Provider>,
trace_processor: &Arc<dyn TraceProcessor + Send + Sync>,
trace_agent_tx: &Sender<SendData>,
trace_agent_tx: &Sender<SendDataBuilderInfo>,
context: Context,
) {
let mut body_size = std::mem::size_of_val(&context.invocation_span);
Expand Down Expand Up @@ -467,7 +468,7 @@ impl Processor {
&mut self,
tags_provider: &Arc<provider::Provider>,
trace_processor: &Arc<dyn TraceProcessor + Send + Sync>,
trace_agent_tx: &Sender<SendData>,
trace_agent_tx: &Sender<SendDataBuilderInfo>,
) {
if let Some(cold_start_context) = self.context_buffer.get_context_with_cold_start() {
if let Some(cold_start_span) = &mut cold_start_context.cold_start_span {
Expand Down Expand Up @@ -500,7 +501,7 @@ impl Processor {
body_size: usize,
tags_provider: &Arc<provider::Provider>,
trace_processor: &Arc<dyn TraceProcessor + Send + Sync>,
trace_agent_tx: &Sender<SendData>,
trace_agent_tx: &Sender<SendDataBuilderInfo>,
) {
// todo: figure out what to do here
let header_tags = tracer_header_tags::TracerHeaderTags {
Expand All @@ -516,7 +517,7 @@ impl Processor {
dropped_p0_spans: 0,
};

let send_data: SendData = trace_processor.process_traces(
let send_data_builder_info: SendDataBuilderInfo = trace_processor.process_traces(
self.config.clone(),
tags_provider.clone(),
header_tags,
Expand All @@ -525,7 +526,7 @@ impl Processor {
self.inferrer.span_pointers.clone(),
);

if let Err(e) = trace_agent_tx.send(send_data).await {
if let Err(e) = trace_agent_tx.send(send_data_builder_info).await {
debug!("Failed to send context spans to agent: {e}");
}
}
Expand Down
31 changes: 15 additions & 16 deletions bottlecap/src/otlp/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use axum::{
routing::post,
Router,
};
use datadog_trace_utils::send_data::SendData;
use datadog_trace_utils::trace_utils::TracerHeaderTags as DatadogTracerHeaderTags;
use serde_json::json;
use std::net::SocketAddr;
Expand All @@ -19,7 +18,7 @@ use crate::{
http::{extract_request_body, handler_not_found},
otlp::processor::Processor as OtlpProcessor,
tags::provider,
traces::trace_processor::TraceProcessor,
traces::{trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor},
};

const OTLP_AGENT_HTTP_PORT: u16 = 4318;
Expand All @@ -29,15 +28,15 @@ type AgentState = (
Arc<provider::Provider>,
OtlpProcessor,
Arc<dyn TraceProcessor + Send + Sync>,
Sender<SendData>,
Sender<SendDataBuilderInfo>,
);

pub struct Agent {
config: Arc<Config>,
tags_provider: Arc<provider::Provider>,
processor: OtlpProcessor,
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
trace_tx: Sender<SendData>,
trace_tx: Sender<SendDataBuilderInfo>,
port: u16,
cancel_token: CancellationToken,
}
Expand All @@ -47,7 +46,7 @@ impl Agent {
config: Arc<Config>,
tags_provider: Arc<provider::Provider>,
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
trace_tx: Sender<SendData>,
trace_tx: Sender<SendDataBuilderInfo>,
) -> Self {
let port = Self::parse_port(
&config.otlp_config_receiver_protocols_http_endpoint,
Expand Down Expand Up @@ -157,16 +156,7 @@ impl Agent {

let tracer_header_tags: DatadogTracerHeaderTags = (&parts.headers).into();
let body_size = size_of_val(&traces);
let send_data = trace_processor.process_traces(
config,
tags_provider,
tracer_header_tags,
traces,
body_size,
None,
);

if send_data.is_empty() {
if body_size == 0 {
return (
StatusCode::INTERNAL_SERVER_ERROR,
json!({ "message": "Not sending traces, processor returned empty data" })
Expand All @@ -175,7 +165,16 @@ impl Agent {
.into_response();
}

match trace_tx.send(send_data).await {
let send_data_builder = trace_processor.process_traces(
config,
tags_provider,
tracer_header_tags,
traces,
body_size,
None,
);

match trace_tx.send(send_data_builder).await {
Ok(()) => {
debug!("OTLP | Successfully buffered traces to be flushed.");
(
Expand Down
21 changes: 11 additions & 10 deletions bottlecap/src/traces/trace_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ use crate::{
tags::provider,
traces::{
proxy_aggregator::{self, ProxyRequest},
stats_aggregator, stats_processor, trace_aggregator, trace_processor,
INVOCATION_SPAN_RESOURCE,
stats_aggregator, stats_processor,
trace_aggregator::{self, SendDataBuilderInfo},
trace_processor, INVOCATION_SPAN_RESOURCE,
},
};
use datadog_trace_protobuf::pb;
use datadog_trace_utils::trace_utils::{self, SendData};
use datadog_trace_utils::trace_utils::{self};
use ddcommon::hyper_migration;

const TRACE_AGENT_PORT: usize = 8126;
Expand Down Expand Up @@ -69,7 +70,7 @@ const LAMBDA_LOAD_SPAN: &str = "aws.lambda.load";
pub struct TraceState {
pub config: Arc<config::Config>,
pub trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
pub trace_tx: Sender<SendData>,
pub trace_tx: Sender<SendDataBuilderInfo>,
pub invocation_processor: Arc<Mutex<InvocationProcessor>>,
pub tags_provider: Arc<provider::Provider>,
}
Expand All @@ -94,7 +95,7 @@ pub struct TraceAgent {
pub proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
pub tags_provider: Arc<provider::Provider>,
invocation_processor: Arc<Mutex<InvocationProcessor>>,
tx: Sender<SendData>,
tx: Sender<SendDataBuilderInfo>,
shutdown_token: CancellationToken,
}

Expand All @@ -120,16 +121,16 @@ impl TraceAgent {
// setup a channel to send processed traces to our flusher. tx is passed through each
// endpoint_handler to the trace processor, which uses it to send de-serialized
// processed trace payloads to our trace flusher.
let (trace_tx, mut trace_rx): (Sender<SendData>, Receiver<SendData>) =
let (trace_tx, mut trace_rx): (Sender<SendDataBuilderInfo>, Receiver<SendDataBuilderInfo>) =
mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE);

// start our trace flusher. receives trace payloads and handles buffering + deciding when to
// flush to backend.

tokio::spawn(async move {
while let Some(tracer_payload) = trace_rx.recv().await {
while let Some(tracer_payload_info) = trace_rx.recv().await {
let mut aggregator = trace_aggregator.lock().await;
aggregator.add(tracer_payload);
aggregator.add(tracer_payload_info);
}
});

Expand Down Expand Up @@ -392,7 +393,7 @@ impl TraceAgent {
config: Arc<config::Config>,
request: Request,
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
trace_tx: Sender<SendData>,
trace_tx: Sender<SendDataBuilderInfo>,
invocation_processor: Arc<Mutex<InvocationProcessor>>,
tags_provider: Arc<provider::Provider>,
version: ApiVersion,
Expand Down Expand Up @@ -537,7 +538,7 @@ impl TraceAgent {
}

#[must_use]
pub fn get_sender_copy(&self) -> Sender<SendData> {
pub fn get_sender_copy(&self) -> Sender<SendDataBuilderInfo> {
self.tx.clone()
}

Expand Down
Loading
Loading