Skip to content

Commit 666bbe5

Browse files
committed
feat: add parent and trace id from proxy to all spans
1 parent 48561e4 commit 666bbe5

File tree

5 files changed

+234
-133
lines changed

5 files changed

+234
-133
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use bottlecap::{
2323
agent::LogsAgent,
2424
flusher::{build_fqdn_logs, Flusher as LogsFlusher},
2525
},
26+
lwa::proxy::start_lwa_proxy,
2627
metrics::enhanced::lambda::Lambda as enhanced_metrics,
2728
secrets::decrypt,
2829
tags::{lambda, provider::Provider as TagProvider},
@@ -70,7 +71,6 @@ use tokio::sync::Mutex as TokioMutex;
7071
use tokio_util::sync::CancellationToken;
7172
use tracing::{debug, error};
7273
use tracing_subscriber::EnvFilter;
73-
use bottlecap::lwa::proxy::start_lwa_proxy;
7474

7575
#[derive(Clone, Deserialize)]
7676
#[serde(rename_all = "camelCase")]
@@ -301,16 +301,16 @@ async fn extension_loop_active(
301301
Arc::clone(config),
302302
aws_config,
303303
)));
304-
let _ = start_lwa_proxy(Arc::clone(&invocation_processor));
305304

306-
307-
let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
305+
let trace_processor = Arc::new(TokioMutex::new(trace_processor::ServerlessTraceProcessor {
308306
obfuscation_config: Arc::new(
309307
obfuscation_config::ObfuscationConfig::new()
310308
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?,
311309
),
312310
resolved_api_key: resolved_api_key.clone(),
313-
});
311+
override_trace_id: None,
312+
substitute_parent_span_id: None,
313+
}));
314314

315315
let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {
316316
buffer: Arc::new(TokioMutex::new(Vec::new())),
@@ -333,6 +333,12 @@ async fn extension_loop_active(
333333
)
334334
.await,
335335
);
336+
337+
let _ = start_lwa_proxy(
338+
Arc::clone(&invocation_processor),
339+
Arc::clone(&trace_processor),
340+
);
341+
336342
let trace_agent_tx = trace_agent.get_sender_copy();
337343

338344
tokio::spawn(async move {

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,18 @@ use datadog_trace_protobuf::pb::Span;
99
use datadog_trace_utils::{send_data::SendData, tracer_header_tags};
1010
use serde_json::{json, Value};
1111
use tokio::sync::mpsc::Sender;
12+
use tokio::sync::Mutex;
1213
use tracing::debug;
1314

15+
use crate::traces::trace_processor::TraceProcessor;
1416
use crate::{
15-
config::{self, AwsConfig},
17+
config,
18+
config::AwsConfig,
1619
lifecycle::invocation::{context::ContextBuffer, span_inferrer::SpanInferrer},
1720
tags::provider,
1821
traces::{
1922
context::SpanContext,
2023
propagation::{DatadogCompositePropagator, Propagator},
21-
trace_processor,
2224
},
2325
};
2426

@@ -97,7 +99,7 @@ impl Processor {
9799
duration_ms: f64,
98100
config: Arc<config::Config>,
99101
tags_provider: Arc<provider::Provider>,
100-
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
102+
trace_processor: Arc<Mutex<impl TraceProcessor>>,
101103
trace_agent_tx: Sender<SendData>,
102104
) {
103105
self.context_buffer
@@ -143,12 +145,13 @@ impl Processor {
143145
client_computed_stats: false,
144146
};
145147

146-
let send_data = trace_processor.process_traces(
148+
let send_data = trace_processor.lock().await.process_traces(
147149
config.clone(),
148150
tags_provider.clone(),
149151
header_tags,
150152
vec![traces],
151153
body_size,
154+
true,
152155
);
153156

154157
if let Err(e) = trace_agent_tx.send(send_data).await {

0 commit comments

Comments
 (0)