Skip to content

feat: Handle API key resolution failure #732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bottlecap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev =
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] }
datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "b29bba8b4178fc2089943fe28e853d529826888b", default-features = false }
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "985120329d0ba96c1ec8d719cc38e1f7ce11a092", default-features = false }
datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" }
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false }
axum = { version = "0.8.4", default-features = false, features = ["default"] }
Expand Down
13 changes: 4 additions & 9 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,11 +439,7 @@ fn create_api_key_factory(
let aws_config = Arc::clone(&aws_config);
let aws_credentials = Arc::clone(&aws_credentials);

Box::pin(async move {
resolve_secrets(config, aws_config, aws_credentials)
.await
.expect("Failed to resolve API key")
})
Box::pin(async move { resolve_secrets(config, aws_config, aws_credentials).await })
})))
}

Expand Down Expand Up @@ -517,7 +513,7 @@ async fn extension_loop_active(
trace_agent_shutdown_token,
) = start_trace_agent(
config,
Arc::clone(&api_key_factory),
&api_key_factory,
&tags_provider,
Arc::clone(&invocation_processor),
Arc::clone(&trace_aggregator),
Expand Down Expand Up @@ -1038,7 +1034,7 @@ fn start_metrics_flushers(
#[allow(clippy::type_complexity)]
fn start_trace_agent(
config: &Arc<Config>,
api_key_factory: Arc<ApiKeyFactory>,
api_key_factory: &Arc<ApiKeyFactory>,
tags_provider: &Arc<TagProvider>,
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
trace_aggregator: Arc<TokioMutex<trace_aggregator::TraceAggregator>>,
Expand All @@ -1064,6 +1060,7 @@ fn start_trace_agent(
let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
aggregator: trace_aggregator.clone(),
config: Arc::clone(config),
api_key_factory: Arc::clone(api_key_factory),
});

let obfuscation_config = obfuscation_config::ObfuscationConfig {
Expand All @@ -1077,7 +1074,6 @@ fn start_trace_agent(

let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
obfuscation_config: Arc::new(obfuscation_config),
api_key_factory: api_key_factory.clone(),
});

// Proxy
Expand All @@ -1098,7 +1094,6 @@ fn start_trace_agent(
proxy_aggregator,
invocation_processor,
Arc::clone(tags_provider),
api_key_factory,
);
let trace_agent_channel = trace_agent.get_sender_copy();
let shutdown_token = trace_agent.shutdown_token();
Expand Down
6 changes: 4 additions & 2 deletions bottlecap/src/logs/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ impl Flusher {
config: Arc<config::Config>,
) -> Self {
let client = get_client(&config);

Flusher {
client,
endpoint,
Expand All @@ -54,7 +53,10 @@ impl Flusher {
}

pub async fn flush(&self, batches: Option<Arc<Vec<Vec<u8>>>>) -> Vec<reqwest::RequestBuilder> {
let api_key = self.api_key_factory.get_api_key().await;
let Some(api_key) = self.api_key_factory.get_api_key().await else {
error!("Skipping flushing logs: Failed to resolve API key");
return vec![];
};

let mut set = JoinSet::new();

Expand Down
1 change: 0 additions & 1 deletion bottlecap/src/proxy/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,6 @@ mod tests {
let metrics_aggregator = Arc::new(Mutex::new(
MetricsAggregator::new(EMPTY_TAGS, 1024).unwrap(),
));

let aws_config = Arc::new(AwsConfig {
region: "us-east-1".to_string(),
function_name: "arn:some-function".to_string(),
Expand Down
12 changes: 7 additions & 5 deletions bottlecap/src/traces/proxy_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use dogstatsd::api_key::ApiKeyFactory;
use reqwest::header::HeaderMap;
use std::{error::Error, sync::Arc};
use thiserror::Error as ThisError;
use tokio::{
sync::{Mutex, OnceCell},
task::JoinSet,
};
use tokio::sync::OnceCell;
use tokio::{sync::Mutex, task::JoinSet};

use tracing::{debug, error};

use crate::{
Expand Down Expand Up @@ -83,7 +82,10 @@ impl Flusher {
&self,
retry_requests: Option<Vec<reqwest::RequestBuilder>>,
) -> Option<Vec<reqwest::RequestBuilder>> {
let api_key = self.api_key_factory.get_api_key().await;
let Some(api_key) = self.api_key_factory.get_api_key().await else {
error!("Skipping flush in proxy flusher: Failed to resolve API key");
return None;
};

let mut join_set = JoinSet::new();
let mut requests = Vec::<reqwest::RequestBuilder>::new();
Expand Down
17 changes: 9 additions & 8 deletions bottlecap/src/traces/stats_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,21 @@ impl StatsFlusher for ServerlessStatsFlusher {
return;
}

let Some(api_key) = self.api_key_factory.get_api_key().await else {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abort at the beginning of StatsFlusher.send()

error!("Skipping flushing stats: Failed to resolve API key");
return;
};

let api_key_clone = api_key.to_string();
let endpoint = self
.endpoint
.get_or_init({
move || async move {
let api_key = self.api_key_factory.get_api_key().await.to_string();
let stats_url = trace_stats_url(&self.config.site);
Endpoint {
url: hyper::Uri::from_str(&stats_url)
.expect("can't make URI from stats url, exiting"),
api_key: Some(api_key.clone().into()),
api_key: Some(api_key_clone.into()),
timeout_ms: self.config.flush_timeout * 1_000,
test_token: None,
}
Expand All @@ -95,12 +100,8 @@ impl StatsFlusher for ServerlessStatsFlusher {

let start = std::time::Instant::now();

let resp = stats_utils::send_stats_payload(
serialized_stats_payload,
endpoint,
self.api_key_factory.get_api_key().await,
)
.await;
let resp =
stats_utils::send_stats_payload(serialized_stats_payload, endpoint, api_key).await;
let elapsed = start.elapsed();
debug!(
"Stats request to {} took {}ms",
Expand Down
8 changes: 2 additions & 6 deletions bottlecap/src/traces/trace_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::{
use datadog_trace_protobuf::pb;
use datadog_trace_utils::trace_utils::{self};
use ddcommon::hyper_migration;
use dogstatsd::api_key::ApiKeyFactory;

const TRACE_AGENT_PORT: usize = 8126;

Expand Down Expand Up @@ -86,7 +85,6 @@ pub struct StatsState {
pub struct ProxyState {
pub config: Arc<config::Config>,
pub proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
pub api_key_factory: Arc<ApiKeyFactory>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to trace_flusher.

}

pub struct TraceAgent {
Expand All @@ -99,7 +97,6 @@ pub struct TraceAgent {
invocation_processor: Arc<Mutex<InvocationProcessor>>,
shutdown_token: CancellationToken,
tx: Sender<SendDataBuilderInfo>,
api_key_factory: Arc<ApiKeyFactory>,
}

#[derive(Clone, Copy)]
Expand All @@ -120,7 +117,6 @@ impl TraceAgent {
proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
invocation_processor: Arc<Mutex<InvocationProcessor>>,
tags_provider: Arc<provider::Provider>,
api_key_factory: Arc<ApiKeyFactory>,
) -> TraceAgent {
// setup a channel to send processed traces to our flusher. tx is passed through each
// endpoint_handler to the trace processor, which uses it to send de-serialized
Expand All @@ -147,7 +143,6 @@ impl TraceAgent {
invocation_processor,
tags_provider,
tx: trace_tx,
api_key_factory,
shutdown_token: CancellationToken::new(),
}
}
Expand Down Expand Up @@ -207,7 +202,6 @@ impl TraceAgent {
let proxy_state = ProxyState {
config: Arc::clone(&self.config),
proxy_aggregator: Arc::clone(&self.proxy_aggregator),
api_key_factory: Arc::clone(&self.api_key_factory),
};

let trace_router = Router::new()
Expand Down Expand Up @@ -395,6 +389,8 @@ impl TraceAgent {
(StatusCode::OK, response_json.to_string()).into_response()
}

#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_lines)]
async fn handle_traces(
config: Arc<config::Config>,
request: Request,
Expand Down
27 changes: 24 additions & 3 deletions bottlecap/src/traces/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ use datadog_trace_utils::{
send_data::SendDataBuilder,
trace_utils::{self, SendData},
};
use dogstatsd::api_key::ApiKeyFactory;

use crate::config::Config;
use crate::traces::trace_aggregator::TraceAggregator;

#[async_trait]
pub trait TraceFlusher {
fn new(aggregator: Arc<Mutex<TraceAggregator>>, config: Arc<Config>) -> Self
fn new(
aggregator: Arc<Mutex<TraceAggregator>>,
config: Arc<Config>,
api_key_factory: Arc<ApiKeyFactory>,
) -> Self
where
Self: Sized;
/// Given a `Vec<SendData>`, a tracer payload, send it to the Datadog intake endpoint.
Expand All @@ -34,15 +39,29 @@ pub trait TraceFlusher {
pub struct ServerlessTraceFlusher {
pub aggregator: Arc<Mutex<TraceAggregator>>,
pub config: Arc<Config>,
pub api_key_factory: Arc<ApiKeyFactory>,
}

#[async_trait]
impl TraceFlusher for ServerlessTraceFlusher {
fn new(aggregator: Arc<Mutex<TraceAggregator>>, config: Arc<Config>) -> Self {
ServerlessTraceFlusher { aggregator, config }
fn new(
aggregator: Arc<Mutex<TraceAggregator>>,
config: Arc<Config>,
api_key_factory: Arc<ApiKeyFactory>,
) -> Self {
ServerlessTraceFlusher {
aggregator,
config,
api_key_factory,
}
}

async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
let Some(api_key) = self.api_key_factory.get_api_key().await else {
error!("Skipping flushing traces: Failed to resolve API key");
return None;
};

let mut failed_batch: Option<Vec<SendData>> = None;

if let Some(traces) = failed_traces {
Expand All @@ -64,6 +83,8 @@ impl TraceFlusher for ServerlessTraceFlusher {
while !trace_builders.is_empty() {
let traces: Vec<_> = trace_builders
.into_iter()
// Lazily set the API key
.map(|builder| builder.with_api_key(api_key))
.map(SendDataBuilder::build)
.collect();
if let Some(failed) = self.send(traces).await {
Expand Down
8 changes: 2 additions & 6 deletions bottlecap/src/traces/trace_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use datadog_trace_utils::trace_utils::{self};
use datadog_trace_utils::tracer_header_tags;
use datadog_trace_utils::tracer_payload::{TraceChunkProcessor, TracerPayloadCollection};
use ddcommon::Endpoint;
use dogstatsd::api_key::ApiKeyFactory;
use std::str::FromStr;
use std::sync::Arc;
use tracing::error;
Expand All @@ -31,7 +30,6 @@ use super::trace_aggregator::SendDataBuilderInfo;
#[allow(clippy::module_name_repetitions)]
pub struct ServerlessTraceProcessor {
pub obfuscation_config: Arc<obfuscation_config::ObfuscationConfig>,
pub api_key_factory: Arc<ApiKeyFactory>,
}

struct ChunkProcessor {
Expand Down Expand Up @@ -164,11 +162,11 @@ impl TraceProcessor for ServerlessTraceProcessor {
tracer_payload.tags.extend(tags.clone());
}
}
let api_key = self.api_key_factory.get_api_key().await.to_string();
let endpoint = Endpoint {
url: hyper::Uri::from_str(&config.apm_dd_url)
.expect("can't parse trace intake URL, exiting"),
api_key: Some(api_key.into()),
// Will be set at flush time
api_key: None,
timeout_ms: config.flush_timeout * 1_000,
test_token: None,
};
Expand All @@ -194,7 +192,6 @@ mod tests {
};

use datadog_trace_obfuscation::obfuscation_config::ObfuscationConfig;
use dogstatsd::api_key::ApiKeyFactory;

use crate::{config::Config, tags::provider::Provider, LAMBDA_RUNTIME_SLUG};

Expand Down Expand Up @@ -299,7 +296,6 @@ mod tests {
};

let trace_processor = ServerlessTraceProcessor {
api_key_factory: Arc::new(ApiKeyFactory::new("test-api-key")),
obfuscation_config: Arc::new(ObfuscationConfig::new().unwrap()),
};
let config = create_test_config();
Expand Down
Loading