@@ -37,6 +37,7 @@ use bottlecap::{
37
37
} ,
38
38
logger,
39
39
logs:: { agent:: LogsAgent , flusher:: LogsFlusher } ,
40
+ metrics:: enhanced:: lambda:: Lambda as enhanced_metrics,
40
41
otlp:: { agent:: Agent as OtlpAgent , should_enable_otlp_agent} ,
41
42
proxy:: { interceptor, should_start_proxy} ,
42
43
secrets:: decrypt,
@@ -84,9 +85,7 @@ use std::{
84
85
collections:: { HashMap , hash_map} ,
85
86
env,
86
87
io:: { Error , Result } ,
87
- os:: unix:: process:: CommandExt ,
88
88
path:: Path ,
89
- process:: Command ,
90
89
sync:: { Arc , Mutex } ,
91
90
time:: { Duration , Instant } ,
92
91
} ;
@@ -402,14 +401,7 @@ fn load_configs(start_time: Instant) -> (AwsConfig, AwsCredentials, Arc<Config>)
402
401
let aws_credentials = AwsCredentials :: from_env ( ) ;
403
402
let lambda_directory: String =
404
403
env:: var ( "LAMBDA_TASK_ROOT" ) . unwrap_or_else ( |_| "/var/task" . to_string ( ) ) ;
405
- let config = match config:: get_config ( Path :: new ( & lambda_directory) ) {
406
- Ok ( config) => Arc :: new ( config) ,
407
- Err ( _e) => {
408
- let err = Command :: new ( "/opt/datadog-agent-go" ) . exec ( ) ;
409
- panic ! ( "Error starting the extension: {err:?}" ) ;
410
- }
411
- } ;
412
-
404
+ let config = Arc :: new ( config:: get_config ( Path :: new ( & lambda_directory) ) ) ;
413
405
( aws_config, aws_credentials, config)
414
406
}
415
407
@@ -508,12 +500,22 @@ async fn extension_loop_active(
508
500
. as_micros( )
509
501
. to_string( )
510
502
) ;
511
-
503
+ let metrics_intake_url = create_metrics_intake_url_prefix ( config ) ;
512
504
let metrics_flushers = Arc :: new ( TokioMutex :: new ( start_metrics_flushers (
513
505
Arc :: clone ( & api_key_factory) ,
514
506
& metrics_aggr,
507
+ & metrics_intake_url,
515
508
config,
516
509
) ) ) ;
510
+
511
+ // Create lambda enhanced metrics instance once
512
+ let lambda_enhanced_metrics =
513
+ enhanced_metrics:: new ( Arc :: clone ( & metrics_aggr) , Arc :: clone ( config) ) ;
514
+
515
+ // Send config issue metrics
516
+ let config_issues = config:: fallback ( config) ;
517
+ send_config_issue_metric ( & config_issues, & lambda_enhanced_metrics) ;
518
+
517
519
// Lifecycle Invocation Processor
518
520
let invocation_processor = Arc :: new ( TokioMutex :: new ( InvocationProcessor :: new (
519
521
Arc :: clone ( & tags_provider) ,
@@ -1006,33 +1008,33 @@ fn start_logs_agent(
1006
1008
( logs_agent_channel, logs_flusher)
1007
1009
}
1008
1010
1009
- fn start_metrics_flushers (
1010
- api_key_factory : Arc < ApiKeyFactory > ,
1011
- metrics_aggr : & Arc < Mutex < MetricsAggregator > > ,
1012
- config : & Arc < Config > ,
1013
- ) -> Vec < MetricsFlusher > {
1014
- let mut flushers = Vec :: new ( ) ;
1015
-
1016
- let metrics_intake_url = if !config. dd_url . is_empty ( ) {
1011
+ fn create_metrics_intake_url_prefix ( config : & Config ) -> MetricsIntakeUrlPrefix {
1012
+ if !config. dd_url . is_empty ( ) {
1017
1013
let dd_dd_url = DdDdUrl :: new ( config. dd_url . clone ( ) ) . expect ( "can't parse DD_DD_URL" ) ;
1018
-
1019
1014
let prefix_override = MetricsIntakeUrlPrefixOverride :: maybe_new ( None , Some ( dd_dd_url) ) ;
1020
- MetricsIntakeUrlPrefix :: new ( None , prefix_override)
1015
+ MetricsIntakeUrlPrefix :: new ( None , prefix_override) . expect ( "can't parse DD_DD_URL prefix" )
1021
1016
} else if !config. url . is_empty ( ) {
1022
1017
let dd_url = DdUrl :: new ( config. url . clone ( ) ) . expect ( "can't parse DD_URL" ) ;
1023
-
1024
1018
let prefix_override = MetricsIntakeUrlPrefixOverride :: maybe_new ( Some ( dd_url) , None ) ;
1025
- MetricsIntakeUrlPrefix :: new ( None , prefix_override)
1019
+ MetricsIntakeUrlPrefix :: new ( None , prefix_override) . expect ( "can't parse DD_URL prefix" )
1026
1020
} else {
1027
- // use site
1028
1021
let metrics_site = MetricsSite :: new ( config. site . clone ( ) ) . expect ( "can't parse site" ) ;
1029
- MetricsIntakeUrlPrefix :: new ( Some ( metrics_site) , None )
1030
- } ;
1022
+ MetricsIntakeUrlPrefix :: new ( Some ( metrics_site) , None ) . expect ( "can't parse site prefix" )
1023
+ }
1024
+ }
1025
+
1026
+ fn start_metrics_flushers (
1027
+ api_key_factory : Arc < ApiKeyFactory > ,
1028
+ metrics_aggr : & Arc < Mutex < MetricsAggregator > > ,
1029
+ metrics_intake_url : & MetricsIntakeUrlPrefix ,
1030
+ config : & Arc < Config > ,
1031
+ ) -> Vec < MetricsFlusher > {
1032
+ let mut flushers = Vec :: new ( ) ;
1031
1033
1032
1034
let flusher_config = MetricsFlusherConfig {
1033
1035
api_key_factory,
1034
1036
aggregator : Arc :: clone ( metrics_aggr) ,
1035
- metrics_intake_url_prefix : metrics_intake_url. expect ( "can't parse site or override" ) ,
1037
+ metrics_intake_url_prefix : metrics_intake_url. clone ( ) ,
1036
1038
https_proxy : config. proxy_https . clone ( ) ,
1037
1039
timeout : Duration :: from_secs ( config. flush_timeout ) ,
1038
1040
retry_strategy : DsdRetryStrategy :: Immediate ( 3 ) ,
@@ -1157,6 +1159,28 @@ fn start_trace_agent(
1157
1159
)
1158
1160
}
1159
1161
1162
+ /// Sends metrics indicating issue with configuration.
1163
+ ///
1164
+ /// # Arguments
1165
+ /// * `issue_reasons` - Vector of messages describing the issue with the configurations
1166
+ /// * `lambda_enhanced_metrics` - The lambda enhanced metrics instance
1167
+ fn send_config_issue_metric ( issue_reasons : & [ String ] , lambda_enhanced_metrics : & enhanced_metrics ) {
1168
+ if issue_reasons. is_empty ( ) {
1169
+ return ;
1170
+ }
1171
+ let now = std:: time:: UNIX_EPOCH
1172
+ . elapsed ( )
1173
+ . expect ( "can't poll clock" )
1174
+ . as_secs ( )
1175
+ . try_into ( )
1176
+ . unwrap_or_default ( ) ;
1177
+
1178
+ // Setup a separate metric for each config issue reason
1179
+ for issue_reason in issue_reasons {
1180
+ lambda_enhanced_metrics. set_config_load_issue_metric ( now, issue_reason) ;
1181
+ }
1182
+ }
1183
+
1160
1184
async fn start_dogstatsd ( metrics_aggr : & Arc < Mutex < MetricsAggregator > > ) -> CancellationToken {
1161
1185
let dogstatsd_config = DogStatsDConfig {
1162
1186
host : EXTENSION_HOST . to_string ( ) ,
0 commit comments