Commit 8bdd819

feat: Handle API key resolution failure (#732)
# Context

The previous PR (#717) deferred API key resolution from the extension's init stage to flush time. However, that PR did not handle the failure case well:

- Before that PR, if resolution failed at the init stage, the extension would run an idle loop.
- After that PR, the extension crashes at flush time, which kills the runtime as well. This is not desired.

# What does this PR do?

1. For traces, defer key resolution from `TraceProcessor.process_traces()` to `TraceFlusher.flush()`. (This should ideally have been part of the previous PR, but since that one was already approved, the change lands in this new PR.)
2. If resolution fails at flush time, make the flush a no-op, so the extension keeps running and consuming events instead of crashing.

# Dependencies

1. DataDog/serverless-components#25
2. DataDog/libdatadog#1140

# Manual Test

## Steps

1. Create a layer in the sandbox.
2. Apply the layer to a Lambda function.
3. Set the env var `DD_API_KEY_SECRET_ARN` to an invalid value.
4. Run the Lambda.
5. Set `DD_API_KEY_SECRET_ARN` to a valid value.
6. Run the Lambda.

## Result

1. With the invalid secret ARN, the function was still successful. (screenshot)
2. The extension printed some error logs instead of crashing. (screenshots)
3. With the valid secret ARN, the Lambda ran successfully and reported to Datadog. (screenshots)

# Automated Test

I didn't add any automated tests because, from what I see in the codebase, existing tests are usually unit tests for short functions, not for the long functions this PR touches. Please let me know if you think I should add automated tests.
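For context on the shape of the fix: every flusher now bails out with a logged error instead of panicking when the key cannot be resolved. Below is a minimal sketch of the pattern using hypothetical stand-in types (the real `ApiKeyFactory` comes from the `dogstatsd` crate in DataDog/serverless-components, and its exact signatures may differ):

```rust
use std::sync::Arc;

// Hypothetical stand-in for dogstatsd's ApiKeyFactory; the real one resolves
// the key asynchronously (e.g. from Secrets Manager) and caches the result.
struct ApiKeyFactory {
    key: Option<String>,
}

impl ApiKeyFactory {
    async fn get_api_key(&self) -> Option<&str> {
        self.key.as_deref()
    }
}

// Hypothetical stand-in for a flusher such as logs::flusher::Flusher.
struct Flusher {
    api_key_factory: Arc<ApiKeyFactory>,
}

impl Flusher {
    async fn flush(&self) -> Vec<String> {
        // On resolution failure, log and make the flush a no-op instead of
        // panicking, so the extension keeps running and consuming events.
        let Some(api_key) = self.api_key_factory.get_api_key().await else {
            eprintln!("Skipping flushing logs: Failed to resolve API key");
            return vec![];
        };
        vec![format!("request authorized with {api_key}")]
    }
}

#[tokio::main]
async fn main() {
    let flusher = Flusher {
        api_key_factory: Arc::new(ApiKeyFactory { key: None }),
    };
    assert!(flusher.flush().await.is_empty()); // no-op, no crash
}
```

The same `let ... else` early return appears in the logs, proxy, stats, and trace flushers in the diffs below.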
Parent: a8d05a1

File tree

10 files changed: +56 −44 lines

bottlecap/Cargo.lock

Lines changed: 3 additions & 3 deletions
(Generated file; diff not rendered by default.)

bottlecap/Cargo.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -57,7 +57,7 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev =
 datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] }
 datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
 datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
-dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "b29bba8b4178fc2089943fe28e853d529826888b", default-features = false }
+dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "985120329d0ba96c1ec8d719cc38e1f7ce11a092", default-features = false }
 datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" }
 datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false }
 axum = { version = "0.8.4", default-features = false, features = ["default"] }
```

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 4 additions & 9 deletions
```diff
@@ -439,11 +439,7 @@ fn create_api_key_factory(
         let aws_config = Arc::clone(&aws_config);
         let aws_credentials = Arc::clone(&aws_credentials);

-        Box::pin(async move {
-            resolve_secrets(config, aws_config, aws_credentials)
-                .await
-                .expect("Failed to resolve API key")
-        })
+        Box::pin(async move { resolve_secrets(config, aws_config, aws_credentials).await })
     })))
 }

@@ -517,7 +513,7 @@ async fn extension_loop_active(
         trace_agent_shutdown_token,
     ) = start_trace_agent(
         config,
-        Arc::clone(&api_key_factory),
+        &api_key_factory,
         &tags_provider,
         Arc::clone(&invocation_processor),
         Arc::clone(&trace_aggregator),
@@ -1038,7 +1034,7 @@ fn start_metrics_flushers(
 #[allow(clippy::type_complexity)]
 fn start_trace_agent(
     config: &Arc<Config>,
-    api_key_factory: Arc<ApiKeyFactory>,
+    api_key_factory: &Arc<ApiKeyFactory>,
     tags_provider: &Arc<TagProvider>,
     invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
     trace_aggregator: Arc<TokioMutex<trace_aggregator::TraceAggregator>>,
@@ -1064,6 +1060,7 @@ fn start_trace_agent(
     let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
         aggregator: trace_aggregator.clone(),
         config: Arc::clone(config),
+        api_key_factory: Arc::clone(api_key_factory),
     });

     let obfuscation_config = obfuscation_config::ObfuscationConfig {
@@ -1077,7 +1074,6 @@ fn start_trace_agent(

     let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
         obfuscation_config: Arc::new(obfuscation_config),
-        api_key_factory: api_key_factory.clone(),
     });

     // Proxy
@@ -1098,7 +1094,6 @@ fn start_trace_agent(
         proxy_aggregator,
         invocation_processor,
         Arc::clone(tags_provider),
-        api_key_factory,
     );
     let trace_agent_channel = trace_agent.get_sender_copy();
     let shutdown_token = trace_agent.shutdown_token();
```
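The key change in `create_api_key_factory` is that the resolver closure no longer calls `.expect()`: it hands back whatever `resolve_secrets` produced, so a failed resolution surfaces later as `None` from `get_api_key()` rather than as a panic that kills the runtime. A minimal sketch of that contract, using a simplified, hypothetical factory (not the real `dogstatsd` API):

```rust
use std::future::Future;
use std::pin::Pin;

type ResolveFut = Pin<Box<dyn Future<Output = Option<String>> + Send>>;

// Simplified, hypothetical factory: runs the resolver lazily and surfaces
// failure as None instead of panicking inside the closure.
struct ApiKeyFactory {
    resolver: Box<dyn Fn() -> ResolveFut + Send + Sync>,
}

impl ApiKeyFactory {
    fn new(resolver: Box<dyn Fn() -> ResolveFut + Send + Sync>) -> Self {
        Self { resolver }
    }

    async fn get_api_key(&self) -> Option<String> {
        // Before this PR the closure ended in .expect("Failed to resolve API
        // key"), so a bad DD_API_KEY_SECRET_ARN crashed at flush time.
        (self.resolver)().await
    }
}

// Stand-in for resolve_secrets(config, aws_config, aws_credentials).
async fn resolve_secrets() -> Option<String> {
    None // e.g. the Secrets Manager lookup failed
}

#[tokio::main]
async fn main() {
    let factory = ApiKeyFactory::new(Box::new(|| Box::pin(resolve_secrets()) as ResolveFut));
    assert_eq!(factory.get_api_key().await, None); // callers degrade to a no-op
}
```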

bottlecap/src/logs/flusher.rs

Lines changed: 4 additions & 2 deletions
```diff
@@ -42,7 +42,6 @@ impl Flusher {
         config: Arc<config::Config>,
     ) -> Self {
         let client = get_client(&config);
-
         Flusher {
             client,
             endpoint,
@@ -54,7 +53,10 @@ impl Flusher {
     }

     pub async fn flush(&self, batches: Option<Arc<Vec<Vec<u8>>>>) -> Vec<reqwest::RequestBuilder> {
-        let api_key = self.api_key_factory.get_api_key().await;
+        let Some(api_key) = self.api_key_factory.get_api_key().await else {
+            error!("Skipping flushing logs: Failed to resolve API key");
+            return vec![];
+        };

         let mut set = JoinSet::new();
```
bottlecap/src/proxy/interceptor.rs

Lines changed: 0 additions & 1 deletion
```diff
@@ -399,7 +399,6 @@ mod tests {
         let metrics_aggregator = Arc::new(Mutex::new(
             MetricsAggregator::new(EMPTY_TAGS, 1024).unwrap(),
         ));
-
         let aws_config = Arc::new(AwsConfig {
             region: "us-east-1".to_string(),
             function_name: "arn:some-function".to_string(),
```

bottlecap/src/traces/proxy_flusher.rs

Lines changed: 7 additions & 5 deletions
```diff
@@ -2,10 +2,9 @@ use dogstatsd::api_key::ApiKeyFactory;
 use reqwest::header::HeaderMap;
 use std::{error::Error, sync::Arc};
 use thiserror::Error as ThisError;
-use tokio::{
-    sync::{Mutex, OnceCell},
-    task::JoinSet,
-};
+use tokio::sync::OnceCell;
+use tokio::{sync::Mutex, task::JoinSet};
+
 use tracing::{debug, error};

 use crate::{
@@ -83,7 +82,10 @@ impl Flusher {
         &self,
         retry_requests: Option<Vec<reqwest::RequestBuilder>>,
     ) -> Option<Vec<reqwest::RequestBuilder>> {
-        let api_key = self.api_key_factory.get_api_key().await;
+        let Some(api_key) = self.api_key_factory.get_api_key().await else {
+            error!("Skipping flush in proxy flusher: Failed to resolve API key");
+            return None;
+        };

         let mut join_set = JoinSet::new();
         let mut requests = Vec::<reqwest::RequestBuilder>::new();
```

bottlecap/src/traces/stats_flusher.rs

Lines changed: 9 additions & 8 deletions
```diff
@@ -60,16 +60,21 @@ impl StatsFlusher for ServerlessStatsFlusher {
             return;
         }

+        let Some(api_key) = self.api_key_factory.get_api_key().await else {
+            error!("Skipping flushing stats: Failed to resolve API key");
+            return;
+        };
+
+        let api_key_clone = api_key.to_string();
         let endpoint = self
             .endpoint
             .get_or_init({
                 move || async move {
-                    let api_key = self.api_key_factory.get_api_key().await.to_string();
                     let stats_url = trace_stats_url(&self.config.site);
                     Endpoint {
                         url: hyper::Uri::from_str(&stats_url)
                             .expect("can't make URI from stats url, exiting"),
-                        api_key: Some(api_key.clone().into()),
+                        api_key: Some(api_key_clone.into()),
                         timeout_ms: self.config.flush_timeout * 1_000,
                         test_token: None,
                     }
@@ -95,12 +100,8 @@ impl StatsFlusher for ServerlessStatsFlusher {

         let start = std::time::Instant::now();

-        let resp = stats_utils::send_stats_payload(
-            serialized_stats_payload,
-            endpoint,
-            self.api_key_factory.get_api_key().await,
-        )
-        .await;
+        let resp =
+            stats_utils::send_stats_payload(serialized_stats_payload, endpoint, api_key).await;
         let elapsed = start.elapsed();
         debug!(
             "Stats request to {} took {}ms",
```

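One wrinkle in the stats flusher: the `Endpoint` is cached in a `tokio::sync::OnceCell`, and `get_or_init` has no failure channel, so the fallible key resolution now happens (and can abort the flush) before the cell is initialized; only a clone of the already-resolved key moves into the init closure. A minimal sketch of that ordering, with a hypothetical `Endpoint` stand-in:

```rust
use tokio::sync::OnceCell;

// Hypothetical stand-in for ddcommon's Endpoint.
struct Endpoint {
    api_key: String,
}

// Stand-in for ApiKeyFactory::get_api_key.
async fn resolve_api_key() -> Option<String> {
    Some("key".to_string())
}

#[tokio::main]
async fn main() {
    let endpoint_cell: OnceCell<Endpoint> = OnceCell::const_new();

    // Resolve first and bail out on failure: get_or_init must produce a
    // value, so the fallible step cannot live inside the init closure.
    let Some(api_key) = resolve_api_key().await else {
        eprintln!("Skipping flushing stats: Failed to resolve API key");
        return;
    };

    let api_key_clone = api_key.clone();
    let endpoint = endpoint_cell
        .get_or_init(move || async move { Endpoint { api_key: api_key_clone } })
        .await;
    assert_eq!(endpoint.api_key, "key");
}
```

A consequence of the `OnceCell`: once initialized, the cached endpoint keeps the key from the first successful flush.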
bottlecap/src/traces/trace_agent.rs

Lines changed: 2 additions & 6 deletions
```diff
@@ -37,7 +37,6 @@ use crate::{
 use datadog_trace_protobuf::pb;
 use datadog_trace_utils::trace_utils::{self};
 use ddcommon::hyper_migration;
-use dogstatsd::api_key::ApiKeyFactory;

 const TRACE_AGENT_PORT: usize = 8126;

@@ -86,7 +85,6 @@ pub struct StatsState {
 pub struct ProxyState {
     pub config: Arc<config::Config>,
     pub proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
-    pub api_key_factory: Arc<ApiKeyFactory>,
 }

 pub struct TraceAgent {
@@ -99,7 +97,6 @@ pub struct TraceAgent {
     invocation_processor: Arc<Mutex<InvocationProcessor>>,
     shutdown_token: CancellationToken,
     tx: Sender<SendDataBuilderInfo>,
-    api_key_factory: Arc<ApiKeyFactory>,
 }

 #[derive(Clone, Copy)]
@@ -120,7 +117,6 @@ impl TraceAgent {
         proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
         invocation_processor: Arc<Mutex<InvocationProcessor>>,
         tags_provider: Arc<provider::Provider>,
-        api_key_factory: Arc<ApiKeyFactory>,
     ) -> TraceAgent {
         // setup a channel to send processed traces to our flusher. tx is passed through each
         // endpoint_handler to the trace processor, which uses it to send de-serialized
@@ -147,7 +143,6 @@ impl TraceAgent {
             invocation_processor,
             tags_provider,
             tx: trace_tx,
-            api_key_factory,
             shutdown_token: CancellationToken::new(),
         }
     }
@@ -207,7 +202,6 @@ impl TraceAgent {
         let proxy_state = ProxyState {
             config: Arc::clone(&self.config),
             proxy_aggregator: Arc::clone(&self.proxy_aggregator),
-            api_key_factory: Arc::clone(&self.api_key_factory),
         };

         let trace_router = Router::new()
@@ -395,6 +389,8 @@ impl TraceAgent {
         (StatusCode::OK, response_json.to_string()).into_response()
     }

+    #[allow(clippy::too_many_arguments)]
+    #[allow(clippy::too_many_lines)]
     async fn handle_traces(
         config: Arc<config::Config>,
         request: Request,
```

bottlecap/src/traces/trace_flusher.rs

Lines changed: 24 additions & 3 deletions
```diff
@@ -10,13 +10,18 @@ use datadog_trace_utils::{
     send_data::SendDataBuilder,
     trace_utils::{self, SendData},
 };
+use dogstatsd::api_key::ApiKeyFactory;

 use crate::config::Config;
 use crate::traces::trace_aggregator::TraceAggregator;

 #[async_trait]
 pub trait TraceFlusher {
-    fn new(aggregator: Arc<Mutex<TraceAggregator>>, config: Arc<Config>) -> Self
+    fn new(
+        aggregator: Arc<Mutex<TraceAggregator>>,
+        config: Arc<Config>,
+        api_key_factory: Arc<ApiKeyFactory>,
+    ) -> Self
     where
         Self: Sized;
     /// Given a `Vec<SendData>`, a tracer payload, send it to the Datadog intake endpoint.
@@ -34,15 +39,29 @@ pub trait TraceFlusher {
 pub struct ServerlessTraceFlusher {
     pub aggregator: Arc<Mutex<TraceAggregator>>,
     pub config: Arc<Config>,
+    pub api_key_factory: Arc<ApiKeyFactory>,
 }

 #[async_trait]
 impl TraceFlusher for ServerlessTraceFlusher {
-    fn new(aggregator: Arc<Mutex<TraceAggregator>>, config: Arc<Config>) -> Self {
-        ServerlessTraceFlusher { aggregator, config }
+    fn new(
+        aggregator: Arc<Mutex<TraceAggregator>>,
+        config: Arc<Config>,
+        api_key_factory: Arc<ApiKeyFactory>,
+    ) -> Self {
+        ServerlessTraceFlusher {
+            aggregator,
+            config,
+            api_key_factory,
+        }
     }

     async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
+        let Some(api_key) = self.api_key_factory.get_api_key().await else {
+            error!("Skipping flushing traces: Failed to resolve API key");
+            return None;
+        };
+
         let mut failed_batch: Option<Vec<SendData>> = None;

         if let Some(traces) = failed_traces {
@@ -64,6 +83,8 @@ impl TraceFlusher for ServerlessTraceFlusher {
         while !trace_builders.is_empty() {
             let traces: Vec<_> = trace_builders
                 .into_iter()
+                // Lazily set the API key
+                .map(|builder| builder.with_api_key(api_key))
                 .map(SendDataBuilder::build)
                 .collect();
             if let Some(failed) = self.send(traces).await {
```
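This is where item 1 of the PR description lands: the processor queues builders with no API key (see the `api_key: None` change in trace_processor.rs below), and the flusher stamps the resolved key onto each builder just before building. A minimal sketch of that handoff, with a hypothetical `PayloadBuilder` standing in for `datadog-trace-utils`' `SendDataBuilder` (whose real `with_api_key` may differ):

```rust
// Hypothetical builder standing in for SendDataBuilder.
struct PayloadBuilder {
    api_key: Option<String>,
    body: String,
}

impl PayloadBuilder {
    fn with_api_key(mut self, api_key: &str) -> Self {
        self.api_key = Some(api_key.to_string());
        self
    }

    fn build(self) -> String {
        // By flush time the key is guaranteed to be present.
        format!("{}:{}", self.api_key.expect("key set at flush time"), self.body)
    }
}

fn flush(builders: Vec<PayloadBuilder>, api_key: &str) -> Vec<String> {
    builders
        .into_iter()
        // Lazily set the API key, mirroring the trace flusher above.
        .map(|b| b.with_api_key(api_key))
        .map(PayloadBuilder::build)
        .collect()
}

fn main() {
    // The processor queued this with api_key: None.
    let queued = vec![PayloadBuilder { api_key: None, body: "trace-payload".into() }];
    assert_eq!(flush(queued, "dd-api-key"), vec!["dd-api-key:trace-payload".to_string()]);
}
```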

bottlecap/src/traces/trace_processor.rs

Lines changed: 2 additions & 6 deletions
```diff
@@ -20,7 +20,6 @@ use datadog_trace_utils::trace_utils::{self};
 use datadog_trace_utils::tracer_header_tags;
 use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TracerPayloadCollection};
 use ddcommon::Endpoint;
-use dogstatsd::api_key::ApiKeyFactory;
 use std::str::FromStr;
 use std::sync::Arc;
 use tracing::error;
@@ -31,7 +30,6 @@ use super::trace_aggregator::SendDataBuilderInfo;
 #[allow(clippy::module_name_repetitions)]
 pub struct ServerlessTraceProcessor {
     pub obfuscation_config: Arc<obfuscation_config::ObfuscationConfig>,
-    pub api_key_factory: Arc<ApiKeyFactory>,
 }

 struct ChunkProcessor {
@@ -164,11 +162,11 @@ impl TraceProcessor for ServerlessTraceProcessor {
                 tracer_payload.tags.extend(tags.clone());
             }
         }
-        let api_key = self.api_key_factory.get_api_key().await.to_string();
         let endpoint = Endpoint {
             url: hyper::Uri::from_str(&config.apm_dd_url)
                 .expect("can't parse trace intake URL, exiting"),
-            api_key: Some(api_key.into()),
+            // Will be set at flush time
+            api_key: None,
             timeout_ms: config.flush_timeout * 1_000,
             test_token: None,
         };
@@ -194,7 +192,6 @@ mod tests {
     };

     use datadog_trace_obfuscation::obfuscation_config::ObfuscationConfig;
-    use dogstatsd::api_key::ApiKeyFactory;

     use crate::{config::Config, tags::provider::Provider, LAMBDA_RUNTIME_SLUG};

@@ -299,7 +296,6 @@ mod tests {
         };

         let trace_processor = ServerlessTraceProcessor {
-            api_key_factory: Arc::new(ApiKeyFactory::new("test-api-key")),
             obfuscation_config: Arc::new(ObfuscationConfig::new().unwrap()),
         };
         let config = create_test_config();
```
