feat: Lazily resolve api key #717

Open · wants to merge 1 commit into base: main
17 changes: 14 additions & 3 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion bottlecap/Cargo.toml
@@ -57,7 +57,7 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev =
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] }
datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" }
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false }
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "b29bba8b4178fc2089943fe28e853d529826888b", default-features = false }
datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" }
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false }
axum = { version = "0.8.4", default-features = false, features = ["default"] }
112 changes: 65 additions & 47 deletions bottlecap/src/bin/bottlecap/main.rs
@@ -66,6 +66,7 @@ use datadog_trace_utils::send_data::SendData;
use decrypt::resolve_secrets;
use dogstatsd::{
aggregator::Aggregator as MetricsAggregator,
api_key::ApiKeyFactory,
constants::CONTEXTS,
datadog::{
DdDdUrl, DdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride,
@@ -88,7 +89,7 @@ use std::{
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use tokio::{sync::mpsc::Sender, sync::Mutex as TokioMutex, task::JoinHandle};
use tokio::{sync::mpsc::Sender, sync::Mutex as TokioMutex, sync::RwLock, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use tracing_subscriber::EnvFilter;
@@ -333,7 +334,7 @@ async fn register(client: &Client) -> Result<RegisterResponse> {
#[tokio::main]
async fn main() -> Result<()> {
let start_time = Instant::now();
let (aws_config, mut aws_credentials, config) = load_configs(start_time);
let (aws_config, aws_credentials, config) = load_configs(start_time);

enable_logging_subsystem(&config);
log_fips_status(&aws_config.region);
@@ -360,33 +361,27 @@ async fn main() -> Result<()> {
.await
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;

if let Some(resolved_api_key) =
resolve_secrets(Arc::clone(&config), &aws_config, &mut aws_credentials).await
let aws_config = Arc::new(aws_config);
let api_key_factory = create_api_key_factory(&config, &aws_config, aws_credentials);

match extension_loop_active(
Arc::clone(&aws_config),
&config,
&client,
&r,
Arc::clone(&api_key_factory),
start_time,
)
.await
{
match extension_loop_active(
&aws_config,
&config,
&client,
&r,
resolved_api_key,
start_time,
)
.await
{
Ok(()) => {
debug!("Extension loop completed successfully");
Ok(())
}
Err(e) => {
error!(
"Extension loop failed: {e:?}, Calling /next without Datadog instrumentation"
);
extension_loop_idle(&client, &r).await
}
Ok(()) => {
debug!("Extension loop completed successfully");
Ok(())
}
Err(e) => {
error!("Extension loop failed: {e:?}, Calling /next without Datadog instrumentation");
extension_loop_idle(&client, &r).await
}
} else {
error!("Failed to resolve secrets, Datadog extension will be idle");
extension_loop_idle(&client, &r).await
}
}

@@ -430,6 +425,28 @@ fn enable_logging_subsystem(config: &Arc<Config>) {
debug!("Logging subsystem enabled");
}

fn create_api_key_factory(
config: &Arc<Config>,
aws_config: &Arc<AwsConfig>,
aws_credentials: AwsCredentials,
) -> Arc<ApiKeyFactory> {
let config = Arc::clone(config);
let aws_config = Arc::clone(aws_config);
let aws_credentials = Arc::new(RwLock::new(aws_credentials));

Arc::new(ApiKeyFactory::new_from_resolver(Arc::new(move || {
let config = Arc::clone(&config);
let aws_config = Arc::clone(&aws_config);
let aws_credentials = Arc::clone(&aws_credentials);

Box::pin(async move {
resolve_secrets(config, aws_config, aws_credentials)
.await
.expect("Failed to resolve API key")
})
})))
}

async fn extension_loop_idle(client: &Client, r: &RegisterResponse) -> Result<()> {
loop {
match next_event(client, &r.extension_id).await {
@@ -446,11 +463,11 @@ async fn extension_loop_idle(client: &Client, r: &RegisterResponse) -> Result<()

#[allow(clippy::too_many_lines)]
async fn extension_loop_active(
aws_config: &AwsConfig,
aws_config: Arc<AwsConfig>,
Contributor Author (review comment):
Changing it to Arc so it can be passed to ApiKeyFactory and shared across threads.
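A minimal sketch of the sharing this enables (the AwsConfig stand-in and its region field are illustrative, not the extension's real types): cloning an Arc hands the resolver its own reference-counted handle to the same config, so it can be moved into another thread or an async closure without taking ownership away from the caller.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the extension's AwsConfig; for illustration only.
struct AwsConfig {
    region: String,
}

fn main() {
    let aws_config = Arc::new(AwsConfig {
        region: "us-east-1".to_string(),
    });

    // Arc::clone copies the handle, not the data: both handles point at the
    // same AwsConfig, and the clone can be moved into another thread (or an
    // async resolver closure) while the caller keeps using its own handle.
    let for_resolver = Arc::clone(&aws_config);
    let resolver = thread::spawn(move || {
        format!("resolving API key for region {}", for_resolver.region)
    });

    println!("{}", resolver.join().unwrap());
    println!("caller still sees region {}", aws_config.region);
}
```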

config: &Arc<Config>,
client: &Client,
r: &RegisterResponse,
resolved_api_key: String,
api_key_factory: Arc<ApiKeyFactory>,
start_time: Instant,
) -> Result<()> {
let mut event_bus = EventBus::run();
@@ -460,11 +477,11 @@ async fn extension_loop_active(
.as_ref()
.unwrap_or(&"none".to_string())
.to_string();
let tags_provider = setup_tag_provider(aws_config, config, &account_id);
let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id);

let (logs_agent_channel, logs_flusher) = start_logs_agent(
config,
resolved_api_key.clone(),
Arc::clone(&api_key_factory),
&tags_provider,
event_bus.get_sender_copy(),
);
@@ -478,15 +495,15 @@ ));
));

let metrics_flushers = Arc::new(TokioMutex::new(start_metrics_flushers(
resolved_api_key.clone(),
Arc::clone(&api_key_factory),
&metrics_aggr,
config,
)));
// Lifecycle Invocation Processor
let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
Arc::clone(&tags_provider),
Arc::clone(config),
aws_config,
Arc::clone(&aws_config),
Arc::clone(&metrics_aggr),
)));

@@ -500,7 +517,7 @@
trace_agent_shutdown_token,
) = start_trace_agent(
config,
resolved_api_key.clone(),
Arc::clone(&api_key_factory),
&tags_provider,
Arc::clone(&invocation_processor),
Arc::clone(&trace_aggregator),
@@ -919,7 +936,7 @@ async fn handle_next_invocation(
}

fn setup_tag_provider(
aws_config: &AwsConfig,
aws_config: &Arc<AwsConfig>,
config: &Arc<Config>,
account_id: &str,
) -> Arc<TagProvider> {
@@ -938,14 +955,14 @@

fn start_logs_agent(
config: &Arc<Config>,
resolved_api_key: String,
api_key_factory: Arc<ApiKeyFactory>,
tags_provider: &Arc<TagProvider>,
event_bus: Sender<Event>,
) -> (Sender<TelemetryEvent>, LogsFlusher) {
let mut logs_agent = LogsAgent::new(Arc::clone(tags_provider), Arc::clone(config), event_bus);
let logs_agent_channel = logs_agent.get_sender_copy();
let logs_flusher = LogsFlusher::new(
resolved_api_key,
api_key_factory,
Arc::clone(&logs_agent.aggregator),
config.clone(),
);
@@ -956,7 +973,7 @@ }
}

fn start_metrics_flushers(
resolved_api_key: String,
api_key_factory: Arc<ApiKeyFactory>,
metrics_aggr: &Arc<Mutex<MetricsAggregator>>,
config: &Arc<Config>,
) -> Vec<MetricsFlusher> {
@@ -979,7 +996,7 @@ };
};

let flusher_config = MetricsFlusherConfig {
api_key: resolved_api_key,
api_key_factory,
aggregator: Arc::clone(metrics_aggr),
metrics_intake_url_prefix: metrics_intake_url.expect("can't parse site or override"),
https_proxy: config.proxy_https.clone(),
@@ -1003,8 +1020,9 @@

// Create a flusher for each endpoint URL and API key pair
for api_key in api_keys {
let additional_api_key_factory = Arc::new(ApiKeyFactory::new(api_key));
let additional_flusher_config = MetricsFlusherConfig {
api_key: api_key.clone(),
api_key_factory: additional_api_key_factory,
aggregator: metrics_aggr.clone(),
metrics_intake_url_prefix: metrics_intake_url.clone(),
https_proxy: config.proxy_https.clone(),
@@ -1020,7 +1038,7 @@
#[allow(clippy::type_complexity)]
fn start_trace_agent(
config: &Arc<Config>,
resolved_api_key: String,
api_key_factory: Arc<ApiKeyFactory>,
tags_provider: &Arc<TagProvider>,
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
trace_aggregator: Arc<TokioMutex<trace_aggregator::TraceAggregator>>,
@@ -1035,7 +1053,7 @@
// Stats
let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::default()));
let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
resolved_api_key.clone(),
api_key_factory.clone(),
stats_aggregator.clone(),
Arc::clone(config),
));
@@ -1059,13 +1077,13 @@

let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
obfuscation_config: Arc::new(obfuscation_config),
resolved_api_key: resolved_api_key.clone(),
api_key_factory: api_key_factory.clone(),
});

// Proxy
let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::Aggregator::default()));
let proxy_flusher = Arc::new(ProxyFlusher::new(
resolved_api_key,
api_key_factory.clone(),
Arc::clone(&proxy_aggregator),
Arc::clone(tags_provider),
Arc::clone(config),
@@ -1080,6 +1098,7 @@
proxy_aggregator,
invocation_processor,
Arc::clone(tags_provider),
api_key_factory,
);
let trace_agent_channel = trace_agent.get_sender_copy();
let shutdown_token = trace_agent.shutdown_token();
@@ -1166,15 +1185,14 @@ fn start_otlp_agent(

fn start_api_runtime_proxy(
config: &Arc<Config>,
aws_config: &AwsConfig,
aws_config: Arc<AwsConfig>,
invocation_processor: &Arc<TokioMutex<InvocationProcessor>>,
) -> Option<CancellationToken> {
if !should_start_proxy(config, aws_config) {
if !should_start_proxy(config, Arc::clone(&aws_config)) {
debug!("Skipping API runtime proxy, no LWA proxy or datadog wrapper found");
return None;
}

let aws_config = aws_config.clone();
let invocation_processor = invocation_processor.clone();
interceptor::start(aws_config, invocation_processor).ok()
}