@@ -66,6 +66,7 @@ use datadog_trace_utils::send_data::SendData;
66
66
use decrypt:: resolve_secrets;
67
67
use dogstatsd:: {
68
68
aggregator:: Aggregator as MetricsAggregator ,
69
+ api_key:: ApiKeyFactory ,
69
70
constants:: CONTEXTS ,
70
71
datadog:: {
71
72
DdDdUrl , DdUrl , MetricsIntakeUrlPrefix , MetricsIntakeUrlPrefixOverride ,
@@ -88,7 +89,7 @@ use std::{
88
89
sync:: { Arc , Mutex } ,
89
90
time:: { Duration , Instant } ,
90
91
} ;
91
- use tokio:: { sync:: mpsc:: Sender , sync:: Mutex as TokioMutex , task:: JoinHandle } ;
92
+ use tokio:: { sync:: mpsc:: Sender , sync:: Mutex as TokioMutex , sync :: RwLock , task:: JoinHandle } ;
92
93
use tokio_util:: sync:: CancellationToken ;
93
94
use tracing:: { debug, error} ;
94
95
use tracing_subscriber:: EnvFilter ;
@@ -333,7 +334,7 @@ async fn register(client: &Client) -> Result<RegisterResponse> {
333
334
#[ tokio:: main]
334
335
async fn main ( ) -> Result < ( ) > {
335
336
let start_time = Instant :: now ( ) ;
336
- let ( aws_config, mut aws_credentials, config) = load_configs ( start_time) ;
337
+ let ( aws_config, aws_credentials, config) = load_configs ( start_time) ;
337
338
338
339
enable_logging_subsystem ( & config) ;
339
340
log_fips_status ( & aws_config. region ) ;
@@ -360,33 +361,27 @@ async fn main() -> Result<()> {
360
361
. await
361
362
. map_err ( |e| Error :: new ( std:: io:: ErrorKind :: InvalidData , e. to_string ( ) ) ) ?;
362
363
363
- if let Some ( resolved_api_key) =
364
- resolve_secrets ( Arc :: clone ( & config) , & aws_config, & mut aws_credentials) . await
364
+ let aws_config = Arc :: new ( aws_config) ;
365
+ let api_key_factory = create_api_key_factory ( & config, & aws_config, aws_credentials) ;
366
+
367
+ match extension_loop_active (
368
+ Arc :: clone ( & aws_config) ,
369
+ & config,
370
+ & client,
371
+ & r,
372
+ Arc :: clone ( & api_key_factory) ,
373
+ start_time,
374
+ )
375
+ . await
365
376
{
366
- match extension_loop_active (
367
- & aws_config,
368
- & config,
369
- & client,
370
- & r,
371
- resolved_api_key,
372
- start_time,
373
- )
374
- . await
375
- {
376
- Ok ( ( ) ) => {
377
- debug ! ( "Extension loop completed successfully" ) ;
378
- Ok ( ( ) )
379
- }
380
- Err ( e) => {
381
- error ! (
382
- "Extension loop failed: {e:?}, Calling /next without Datadog instrumentation"
383
- ) ;
384
- extension_loop_idle ( & client, & r) . await
385
- }
377
+ Ok ( ( ) ) => {
378
+ debug ! ( "Extension loop completed successfully" ) ;
379
+ Ok ( ( ) )
380
+ }
381
+ Err ( e) => {
382
+ error ! ( "Extension loop failed: {e:?}, Calling /next without Datadog instrumentation" ) ;
383
+ extension_loop_idle ( & client, & r) . await
386
384
}
387
- } else {
388
- error ! ( "Failed to resolve secrets, Datadog extension will be idle" ) ;
389
- extension_loop_idle ( & client, & r) . await
390
385
}
391
386
}
392
387
@@ -430,6 +425,28 @@ fn enable_logging_subsystem(config: &Arc<Config>) {
430
425
debug ! ( "Logging subsystem enabled" ) ;
431
426
}
432
427
428
+ fn create_api_key_factory (
429
+ config : & Arc < Config > ,
430
+ aws_config : & Arc < AwsConfig > ,
431
+ aws_credentials : AwsCredentials ,
432
+ ) -> Arc < ApiKeyFactory > {
433
+ let config = Arc :: clone ( config) ;
434
+ let aws_config = Arc :: clone ( aws_config) ;
435
+ let aws_credentials = Arc :: new ( RwLock :: new ( aws_credentials) ) ;
436
+
437
+ Arc :: new ( ApiKeyFactory :: new_from_resolver ( Arc :: new ( move || {
438
+ let config = Arc :: clone ( & config) ;
439
+ let aws_config = Arc :: clone ( & aws_config) ;
440
+ let aws_credentials = Arc :: clone ( & aws_credentials) ;
441
+
442
+ Box :: pin ( async move {
443
+ resolve_secrets ( config, aws_config, aws_credentials)
444
+ . await
445
+ . expect ( "Failed to resolve API key" )
446
+ } )
447
+ } ) ) )
448
+ }
449
+
433
450
async fn extension_loop_idle ( client : & Client , r : & RegisterResponse ) -> Result < ( ) > {
434
451
loop {
435
452
match next_event ( client, & r. extension_id ) . await {
@@ -446,11 +463,11 @@ async fn extension_loop_idle(client: &Client, r: &RegisterResponse) -> Result<()
446
463
447
464
#[ allow( clippy:: too_many_lines) ]
448
465
async fn extension_loop_active (
449
- aws_config : & AwsConfig ,
466
+ aws_config : Arc < AwsConfig > ,
450
467
config : & Arc < Config > ,
451
468
client : & Client ,
452
469
r : & RegisterResponse ,
453
- resolved_api_key : String ,
470
+ api_key_factory : Arc < ApiKeyFactory > ,
454
471
start_time : Instant ,
455
472
) -> Result < ( ) > {
456
473
let mut event_bus = EventBus :: run ( ) ;
@@ -460,11 +477,11 @@ async fn extension_loop_active(
460
477
. as_ref ( )
461
478
. unwrap_or ( & "none" . to_string ( ) )
462
479
. to_string ( ) ;
463
- let tags_provider = setup_tag_provider ( aws_config, config, & account_id) ;
480
+ let tags_provider = setup_tag_provider ( & Arc :: clone ( & aws_config) , config, & account_id) ;
464
481
465
482
let ( logs_agent_channel, logs_flusher) = start_logs_agent (
466
483
config,
467
- resolved_api_key . clone ( ) ,
484
+ Arc :: clone ( & api_key_factory ) ,
468
485
& tags_provider,
469
486
event_bus. get_sender_copy ( ) ,
470
487
) ;
@@ -478,15 +495,15 @@ async fn extension_loop_active(
478
495
) ) ;
479
496
480
497
let metrics_flushers = Arc :: new ( TokioMutex :: new ( start_metrics_flushers (
481
- resolved_api_key . clone ( ) ,
498
+ Arc :: clone ( & api_key_factory ) ,
482
499
& metrics_aggr,
483
500
config,
484
501
) ) ) ;
485
502
// Lifecycle Invocation Processor
486
503
let invocation_processor = Arc :: new ( TokioMutex :: new ( InvocationProcessor :: new (
487
504
Arc :: clone ( & tags_provider) ,
488
505
Arc :: clone ( config) ,
489
- aws_config,
506
+ Arc :: clone ( & aws_config) ,
490
507
Arc :: clone ( & metrics_aggr) ,
491
508
) ) ) ;
492
509
@@ -500,7 +517,7 @@ async fn extension_loop_active(
500
517
trace_agent_shutdown_token,
501
518
) = start_trace_agent (
502
519
config,
503
- resolved_api_key . clone ( ) ,
520
+ Arc :: clone ( & api_key_factory ) ,
504
521
& tags_provider,
505
522
Arc :: clone ( & invocation_processor) ,
506
523
Arc :: clone ( & trace_aggregator) ,
@@ -919,7 +936,7 @@ async fn handle_next_invocation(
919
936
}
920
937
921
938
fn setup_tag_provider (
922
- aws_config : & AwsConfig ,
939
+ aws_config : & Arc < AwsConfig > ,
923
940
config : & Arc < Config > ,
924
941
account_id : & str ,
925
942
) -> Arc < TagProvider > {
@@ -938,14 +955,14 @@ fn setup_tag_provider(
938
955
939
956
fn start_logs_agent (
940
957
config : & Arc < Config > ,
941
- resolved_api_key : String ,
958
+ api_key_factory : Arc < ApiKeyFactory > ,
942
959
tags_provider : & Arc < TagProvider > ,
943
960
event_bus : Sender < Event > ,
944
961
) -> ( Sender < TelemetryEvent > , LogsFlusher ) {
945
962
let mut logs_agent = LogsAgent :: new ( Arc :: clone ( tags_provider) , Arc :: clone ( config) , event_bus) ;
946
963
let logs_agent_channel = logs_agent. get_sender_copy ( ) ;
947
964
let logs_flusher = LogsFlusher :: new (
948
- resolved_api_key ,
965
+ api_key_factory ,
949
966
Arc :: clone ( & logs_agent. aggregator ) ,
950
967
config. clone ( ) ,
951
968
) ;
@@ -956,7 +973,7 @@ fn start_logs_agent(
956
973
}
957
974
958
975
fn start_metrics_flushers (
959
- resolved_api_key : String ,
976
+ api_key_factory : Arc < ApiKeyFactory > ,
960
977
metrics_aggr : & Arc < Mutex < MetricsAggregator > > ,
961
978
config : & Arc < Config > ,
962
979
) -> Vec < MetricsFlusher > {
@@ -979,7 +996,7 @@ fn start_metrics_flushers(
979
996
} ;
980
997
981
998
let flusher_config = MetricsFlusherConfig {
982
- api_key : resolved_api_key ,
999
+ api_key_factory ,
983
1000
aggregator : Arc :: clone ( metrics_aggr) ,
984
1001
metrics_intake_url_prefix : metrics_intake_url. expect ( "can't parse site or override" ) ,
985
1002
https_proxy : config. proxy_https . clone ( ) ,
@@ -1003,8 +1020,9 @@ fn start_metrics_flushers(
1003
1020
1004
1021
// Create a flusher for each endpoint URL and API key pair
1005
1022
for api_key in api_keys {
1023
+ let additional_api_key_factory = Arc :: new ( ApiKeyFactory :: new ( api_key) ) ;
1006
1024
let additional_flusher_config = MetricsFlusherConfig {
1007
- api_key : api_key . clone ( ) ,
1025
+ api_key_factory : additional_api_key_factory ,
1008
1026
aggregator : metrics_aggr. clone ( ) ,
1009
1027
metrics_intake_url_prefix : metrics_intake_url. clone ( ) ,
1010
1028
https_proxy : config. proxy_https . clone ( ) ,
@@ -1020,7 +1038,7 @@ fn start_metrics_flushers(
1020
1038
#[ allow( clippy:: type_complexity) ]
1021
1039
fn start_trace_agent (
1022
1040
config : & Arc < Config > ,
1023
- resolved_api_key : String ,
1041
+ api_key_factory : Arc < ApiKeyFactory > ,
1024
1042
tags_provider : & Arc < TagProvider > ,
1025
1043
invocation_processor : Arc < TokioMutex < InvocationProcessor > > ,
1026
1044
trace_aggregator : Arc < TokioMutex < trace_aggregator:: TraceAggregator > > ,
@@ -1035,7 +1053,7 @@ fn start_trace_agent(
1035
1053
// Stats
1036
1054
let stats_aggregator = Arc :: new ( TokioMutex :: new ( StatsAggregator :: default ( ) ) ) ;
1037
1055
let stats_flusher = Arc :: new ( stats_flusher:: ServerlessStatsFlusher :: new (
1038
- resolved_api_key . clone ( ) ,
1056
+ api_key_factory . clone ( ) ,
1039
1057
stats_aggregator. clone ( ) ,
1040
1058
Arc :: clone ( config) ,
1041
1059
) ) ;
@@ -1059,13 +1077,13 @@ fn start_trace_agent(
1059
1077
1060
1078
let trace_processor = Arc :: new ( trace_processor:: ServerlessTraceProcessor {
1061
1079
obfuscation_config : Arc :: new ( obfuscation_config) ,
1062
- resolved_api_key : resolved_api_key . clone ( ) ,
1080
+ api_key_factory : api_key_factory . clone ( ) ,
1063
1081
} ) ;
1064
1082
1065
1083
// Proxy
1066
1084
let proxy_aggregator = Arc :: new ( TokioMutex :: new ( proxy_aggregator:: Aggregator :: default ( ) ) ) ;
1067
1085
let proxy_flusher = Arc :: new ( ProxyFlusher :: new (
1068
- resolved_api_key ,
1086
+ api_key_factory . clone ( ) ,
1069
1087
Arc :: clone ( & proxy_aggregator) ,
1070
1088
Arc :: clone ( tags_provider) ,
1071
1089
Arc :: clone ( config) ,
@@ -1080,6 +1098,7 @@ fn start_trace_agent(
1080
1098
proxy_aggregator,
1081
1099
invocation_processor,
1082
1100
Arc :: clone ( tags_provider) ,
1101
+ api_key_factory,
1083
1102
) ;
1084
1103
let trace_agent_channel = trace_agent. get_sender_copy ( ) ;
1085
1104
let shutdown_token = trace_agent. shutdown_token ( ) ;
@@ -1166,15 +1185,14 @@ fn start_otlp_agent(
1166
1185
1167
1186
fn start_api_runtime_proxy (
1168
1187
config : & Arc < Config > ,
1169
- aws_config : & AwsConfig ,
1188
+ aws_config : Arc < AwsConfig > ,
1170
1189
invocation_processor : & Arc < TokioMutex < InvocationProcessor > > ,
1171
1190
) -> Option < CancellationToken > {
1172
- if !should_start_proxy ( config, aws_config) {
1191
+ if !should_start_proxy ( config, Arc :: clone ( & aws_config) ) {
1173
1192
debug ! ( "Skipping API runtime proxy, no LWA proxy or datadog wrapper found" ) ;
1174
1193
return None ;
1175
1194
}
1176
1195
1177
- let aws_config = aws_config. clone ( ) ;
1178
1196
let invocation_processor = invocation_processor. clone ( ) ;
1179
1197
interceptor:: start ( aws_config, invocation_processor) . ok ( )
1180
1198
}
0 commit comments