Skip to content

Commit f37bb89

Browse files
authored
chore: pass SendDataBuilderInfo instead of SendData until flush time (#745)
# Background Right now `SendData` is passed around across channels. # This PR Instead of passing `SendData`, pass `SendDataBuilderInfo`, which bundles `SendDataBuilder` and payload size. Just before flush, call `SendDataBuilder.build()` to build `SendData`. # Motivation DataDog/libdatadog#1140 (comment) It is suggested that the function `set_api_key()` shouldn't be added on `SendData`, but should be added on `SendDataBuilder`. Because need to call `set_api_key()` just before flush, we need to make sure the object is `SendDataBuilder` instead of `SendData` until flush time. And because we need payload size in Trace Aggregator, and `SendDataBuilder` doesn't expose this field, we need to pass it explicitly along with `SendDataBuilder`. # Next steps Update #717 #732 so that `get_api_key()` is called just before flush. # Dependency DataDog/libdatadog#1140
1 parent 84328a7 commit f37bb89

File tree

9 files changed

+139
-113
lines changed

9 files changed

+139
-113
lines changed

bottlecap/Cargo.lock

Lines changed: 25 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ rustls-native-certs = { version = "0.8.1", optional = true }
5252
# be found in the clippy.toml file adjacent to this Cargo.toml.
5353
datadog-protos = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/", rev = "c89b58e5784b985819baf11f13f7d35876741222"}
5454
ddsketch-agent = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/" }
55-
ddcommon = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" }
56-
datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" }
57-
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" , features = ["mini_agent"] }
58-
datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" }
59-
datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" }
55+
ddcommon = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
56+
datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
57+
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] }
58+
datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
59+
datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
6060
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false }
6161
datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" }
6262
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false }

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ use bottlecap::{
5050
proxy_flusher::Flusher as ProxyFlusher,
5151
stats_aggregator::StatsAggregator,
5252
stats_flusher::{self, StatsFlusher},
53-
stats_processor, trace_agent, trace_aggregator,
53+
stats_processor, trace_agent,
54+
trace_aggregator::{self, SendDataBuilderInfo},
5455
trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher},
5556
trace_processor,
5657
},
@@ -802,7 +803,7 @@ async fn handle_event_bus_event(
802803
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
803804
tags_provider: Arc<TagProvider>,
804805
trace_processor: Arc<trace_processor::ServerlessTraceProcessor>,
805-
trace_agent_channel: Sender<datadog_trace_utils::send_data::SendData>,
806+
trace_agent_channel: Sender<SendDataBuilderInfo>,
806807
) -> Option<TelemetryEvent> {
807808
match event {
808809
Event::Metric(event) => {
@@ -1024,7 +1025,7 @@ fn start_trace_agent(
10241025
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
10251026
trace_aggregator: Arc<TokioMutex<trace_aggregator::TraceAggregator>>,
10261027
) -> (
1027-
Sender<datadog_trace_utils::send_data::SendData>,
1028+
Sender<SendDataBuilderInfo>,
10281029
Arc<trace_flusher::ServerlessTraceFlusher>,
10291030
Arc<trace_processor::ServerlessTraceProcessor>,
10301031
Arc<stats_flusher::ServerlessStatsFlusher>,
@@ -1146,7 +1147,7 @@ fn start_otlp_agent(
11461147
config: &Arc<Config>,
11471148
tags_provider: Arc<TagProvider>,
11481149
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
1149-
trace_tx: Sender<SendData>,
1150+
trace_tx: Sender<SendDataBuilderInfo>,
11501151
) -> Option<CancellationToken> {
11511152
if !should_enable_otlp_agent(config) {
11521153
return None;

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66

77
use chrono::{DateTime, Utc};
88
use datadog_trace_protobuf::pb::Span;
9-
use datadog_trace_utils::{send_data::SendData, tracer_header_tags};
9+
use datadog_trace_utils::tracer_header_tags;
1010
use dogstatsd::aggregator::Aggregator as MetricsAggregator;
1111
use serde_json::{json, Value};
1212
use tokio::sync::{mpsc::Sender, watch};
@@ -35,6 +35,7 @@ use crate::{
3535
},
3636
DatadogCompositePropagator, Propagator,
3737
},
38+
trace_aggregator::SendDataBuilderInfo,
3839
trace_processor::{self, TraceProcessor},
3940
},
4041
};
@@ -284,7 +285,7 @@ impl Processor {
284285
status: Status,
285286
tags_provider: Arc<provider::Provider>,
286287
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
287-
trace_agent_tx: Sender<SendData>,
288+
trace_agent_tx: Sender<SendDataBuilderInfo>,
288289
timestamp: i64,
289290
) {
290291
// Set the runtime duration metric
@@ -328,7 +329,7 @@ impl Processor {
328329
status: Status,
329330
tags_provider: Arc<provider::Provider>,
330331
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
331-
trace_agent_tx: Sender<SendData>,
332+
trace_agent_tx: Sender<SendDataBuilderInfo>,
332333
) {
333334
let context = self.enrich_ctx_at_platform_done(request_id, status);
334335

@@ -415,7 +416,7 @@ impl Processor {
415416
&mut self,
416417
tags_provider: &Arc<provider::Provider>,
417418
trace_processor: &Arc<dyn TraceProcessor + Send + Sync>,
418-
trace_agent_tx: &Sender<SendData>,
419+
trace_agent_tx: &Sender<SendDataBuilderInfo>,
419420
context: Context,
420421
) {
421422
let mut body_size = std::mem::size_of_val(&context.invocation_span);
@@ -467,7 +468,7 @@ impl Processor {
467468
&mut self,
468469
tags_provider: &Arc<provider::Provider>,
469470
trace_processor: &Arc<dyn TraceProcessor + Send + Sync>,
470-
trace_agent_tx: &Sender<SendData>,
471+
trace_agent_tx: &Sender<SendDataBuilderInfo>,
471472
) {
472473
if let Some(cold_start_context) = self.context_buffer.get_context_with_cold_start() {
473474
if let Some(cold_start_span) = &mut cold_start_context.cold_start_span {
@@ -500,7 +501,7 @@ impl Processor {
500501
body_size: usize,
501502
tags_provider: &Arc<provider::Provider>,
502503
trace_processor: &Arc<dyn TraceProcessor + Send + Sync>,
503-
trace_agent_tx: &Sender<SendData>,
504+
trace_agent_tx: &Sender<SendDataBuilderInfo>,
504505
) {
505506
// todo: figure out what to do here
506507
let header_tags = tracer_header_tags::TracerHeaderTags {
@@ -516,7 +517,7 @@ impl Processor {
516517
dropped_p0_spans: 0,
517518
};
518519

519-
let send_data: SendData = trace_processor.process_traces(
520+
let send_data_builder_info: SendDataBuilderInfo = trace_processor.process_traces(
520521
self.config.clone(),
521522
tags_provider.clone(),
522523
header_tags,
@@ -525,7 +526,7 @@ impl Processor {
525526
self.inferrer.span_pointers.clone(),
526527
);
527528

528-
if let Err(e) = trace_agent_tx.send(send_data).await {
529+
if let Err(e) = trace_agent_tx.send(send_data_builder_info).await {
529530
debug!("Failed to send context spans to agent: {e}");
530531
}
531532
}

bottlecap/src/otlp/agent.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use axum::{
55
routing::post,
66
Router,
77
};
8-
use datadog_trace_utils::send_data::SendData;
98
use datadog_trace_utils::trace_utils::TracerHeaderTags as DatadogTracerHeaderTags;
109
use serde_json::json;
1110
use std::net::SocketAddr;
@@ -19,7 +18,7 @@ use crate::{
1918
http::{extract_request_body, handler_not_found},
2019
otlp::processor::Processor as OtlpProcessor,
2120
tags::provider,
22-
traces::trace_processor::TraceProcessor,
21+
traces::{trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor},
2322
};
2423

2524
const OTLP_AGENT_HTTP_PORT: u16 = 4318;
@@ -29,15 +28,15 @@ type AgentState = (
2928
Arc<provider::Provider>,
3029
OtlpProcessor,
3130
Arc<dyn TraceProcessor + Send + Sync>,
32-
Sender<SendData>,
31+
Sender<SendDataBuilderInfo>,
3332
);
3433

3534
pub struct Agent {
3635
config: Arc<Config>,
3736
tags_provider: Arc<provider::Provider>,
3837
processor: OtlpProcessor,
3938
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
40-
trace_tx: Sender<SendData>,
39+
trace_tx: Sender<SendDataBuilderInfo>,
4140
port: u16,
4241
cancel_token: CancellationToken,
4342
}
@@ -47,7 +46,7 @@ impl Agent {
4746
config: Arc<Config>,
4847
tags_provider: Arc<provider::Provider>,
4948
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
50-
trace_tx: Sender<SendData>,
49+
trace_tx: Sender<SendDataBuilderInfo>,
5150
) -> Self {
5251
let port = Self::parse_port(
5352
&config.otlp_config_receiver_protocols_http_endpoint,
@@ -157,16 +156,7 @@ impl Agent {
157156

158157
let tracer_header_tags: DatadogTracerHeaderTags = (&parts.headers).into();
159158
let body_size = size_of_val(&traces);
160-
let send_data = trace_processor.process_traces(
161-
config,
162-
tags_provider,
163-
tracer_header_tags,
164-
traces,
165-
body_size,
166-
None,
167-
);
168-
169-
if send_data.is_empty() {
159+
if body_size == 0 {
170160
return (
171161
StatusCode::INTERNAL_SERVER_ERROR,
172162
json!({ "message": "Not sending traces, processor returned empty data" })
@@ -175,7 +165,16 @@ impl Agent {
175165
.into_response();
176166
}
177167

178-
match trace_tx.send(send_data).await {
168+
let send_data_builder = trace_processor.process_traces(
169+
config,
170+
tags_provider,
171+
tracer_header_tags,
172+
traces,
173+
body_size,
174+
None,
175+
);
176+
177+
match trace_tx.send(send_data_builder).await {
179178
Ok(()) => {
180179
debug!("OTLP | Successfully buffered traces to be flushed.");
181180
(

bottlecap/src/traces/trace_agent.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ use crate::{
2929
tags::provider,
3030
traces::{
3131
proxy_aggregator::{self, ProxyRequest},
32-
stats_aggregator, stats_processor, trace_aggregator, trace_processor,
33-
INVOCATION_SPAN_RESOURCE,
32+
stats_aggregator, stats_processor,
33+
trace_aggregator::{self, SendDataBuilderInfo},
34+
trace_processor, INVOCATION_SPAN_RESOURCE,
3435
},
3536
};
3637
use datadog_trace_protobuf::pb;
37-
use datadog_trace_utils::trace_utils::{self, SendData};
38+
use datadog_trace_utils::trace_utils::{self};
3839
use ddcommon::hyper_migration;
3940

4041
const TRACE_AGENT_PORT: usize = 8126;
@@ -69,7 +70,7 @@ const LAMBDA_LOAD_SPAN: &str = "aws.lambda.load";
6970
pub struct TraceState {
7071
pub config: Arc<config::Config>,
7172
pub trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
72-
pub trace_tx: Sender<SendData>,
73+
pub trace_tx: Sender<SendDataBuilderInfo>,
7374
pub invocation_processor: Arc<Mutex<InvocationProcessor>>,
7475
pub tags_provider: Arc<provider::Provider>,
7576
}
@@ -94,7 +95,7 @@ pub struct TraceAgent {
9495
pub proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
9596
pub tags_provider: Arc<provider::Provider>,
9697
invocation_processor: Arc<Mutex<InvocationProcessor>>,
97-
tx: Sender<SendData>,
98+
tx: Sender<SendDataBuilderInfo>,
9899
shutdown_token: CancellationToken,
99100
}
100101

@@ -120,16 +121,16 @@ impl TraceAgent {
120121
// setup a channel to send processed traces to our flusher. tx is passed through each
121122
// endpoint_handler to the trace processor, which uses it to send de-serialized
122123
// processed trace payloads to our trace flusher.
123-
let (trace_tx, mut trace_rx): (Sender<SendData>, Receiver<SendData>) =
124+
let (trace_tx, mut trace_rx): (Sender<SendDataBuilderInfo>, Receiver<SendDataBuilderInfo>) =
124125
mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE);
125126

126127
// start our trace flusher. receives trace payloads and handles buffering + deciding when to
127128
// flush to backend.
128129

129130
tokio::spawn(async move {
130-
while let Some(tracer_payload) = trace_rx.recv().await {
131+
while let Some(tracer_payload_info) = trace_rx.recv().await {
131132
let mut aggregator = trace_aggregator.lock().await;
132-
aggregator.add(tracer_payload);
133+
aggregator.add(tracer_payload_info);
133134
}
134135
});
135136

@@ -392,7 +393,7 @@ impl TraceAgent {
392393
config: Arc<config::Config>,
393394
request: Request,
394395
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
395-
trace_tx: Sender<SendData>,
396+
trace_tx: Sender<SendDataBuilderInfo>,
396397
invocation_processor: Arc<Mutex<InvocationProcessor>>,
397398
tags_provider: Arc<provider::Provider>,
398399
version: ApiVersion,
@@ -537,7 +538,7 @@ impl TraceAgent {
537538
}
538539

539540
#[must_use]
540-
pub fn get_sender_copy(&self) -> Sender<SendData> {
541+
pub fn get_sender_copy(&self) -> Sender<SendDataBuilderInfo> {
541542
self.tx.clone()
542543
}
543544

0 commit comments

Comments
 (0)