@@ -37,6 +37,7 @@ use bottlecap::{
37
37
} ,
38
38
logger,
39
39
logs:: { agent:: LogsAgent , flusher:: LogsFlusher } ,
40
+ metrics:: enhanced:: lambda:: Lambda as enhanced_metrics,
40
41
otlp:: { agent:: Agent as OtlpAgent , should_enable_otlp_agent} ,
41
42
proxy:: { interceptor, should_start_proxy} ,
42
43
secrets:: decrypt,
@@ -85,9 +86,7 @@ use std::{
85
86
collections:: { HashMap , hash_map} ,
86
87
env,
87
88
io:: { Error , Result } ,
88
- os:: unix:: process:: CommandExt ,
89
89
path:: Path ,
90
- process:: Command ,
91
90
sync:: Arc ,
92
91
time:: { Duration , Instant } ,
93
92
} ;
@@ -403,14 +402,7 @@ fn load_configs(start_time: Instant) -> (AwsConfig, AwsCredentials, Arc<Config>)
403
402
let aws_credentials = AwsCredentials :: from_env ( ) ;
404
403
let lambda_directory: String =
405
404
env:: var ( "LAMBDA_TASK_ROOT" ) . unwrap_or_else ( |_| "/var/task" . to_string ( ) ) ;
406
- let config = match config:: get_config ( Path :: new ( & lambda_directory) ) {
407
- Ok ( config) => Arc :: new ( config) ,
408
- Err ( _e) => {
409
- let err = Command :: new ( "/opt/datadog-agent-go" ) . exec ( ) ;
410
- panic ! ( "Error starting the extension: {err:?}" ) ;
411
- }
412
- } ;
413
-
405
+ let config = Arc :: new ( config:: get_config ( Path :: new ( & lambda_directory) ) ) ;
414
406
( aws_config, aws_credentials, config)
415
407
}
416
408
@@ -509,11 +501,22 @@ async fn extension_loop_active(
509
501
) ;
510
502
start_dogstatsd_aggregator ( metrics_aggr_service) ;
511
503
504
+ let metrics_intake_url = create_metrics_intake_url_prefix ( config) ;
512
505
let metrics_flushers = Arc :: new ( TokioMutex :: new ( start_metrics_flushers (
513
506
Arc :: clone ( & api_key_factory) ,
514
507
& metrics_aggr_handle,
508
+ & metrics_intake_url,
515
509
config,
516
510
) ) ) ;
511
+
512
+ // Create lambda enhanced metrics instance once
513
+ let lambda_enhanced_metrics =
514
+ enhanced_metrics:: new ( metrics_aggr_handle. clone ( ) , Arc :: clone ( config) ) ;
515
+
516
+ // Send config issue metrics
517
+ let config_issues = config:: fallback ( config) ;
518
+ send_config_issue_metric ( & config_issues, & lambda_enhanced_metrics) ;
519
+
517
520
// Lifecycle Invocation Processor
518
521
let invocation_processor = Arc :: new ( TokioMutex :: new ( InvocationProcessor :: new (
519
522
Arc :: clone ( & tags_provider) ,
@@ -1012,33 +1015,33 @@ fn start_logs_agent(
1012
1015
( logs_agent_channel, logs_flusher)
1013
1016
}
1014
1017
1015
- fn start_metrics_flushers (
1016
- api_key_factory : Arc < ApiKeyFactory > ,
1017
- metrics_aggr_handle : & MetricsAggregatorHandle ,
1018
- config : & Arc < Config > ,
1019
- ) -> Vec < MetricsFlusher > {
1020
- let mut flushers = Vec :: new ( ) ;
1021
-
1022
- let metrics_intake_url = if !config. dd_url . is_empty ( ) {
1018
+ fn create_metrics_intake_url_prefix ( config : & Config ) -> MetricsIntakeUrlPrefix {
1019
+ if !config. dd_url . is_empty ( ) {
1023
1020
let dd_dd_url = DdDdUrl :: new ( config. dd_url . clone ( ) ) . expect ( "can't parse DD_DD_URL" ) ;
1024
-
1025
1021
let prefix_override = MetricsIntakeUrlPrefixOverride :: maybe_new ( None , Some ( dd_dd_url) ) ;
1026
- MetricsIntakeUrlPrefix :: new ( None , prefix_override)
1022
+ MetricsIntakeUrlPrefix :: new ( None , prefix_override) . expect ( "can't parse DD_DD_URL prefix" )
1027
1023
} else if !config. url . is_empty ( ) {
1028
1024
let dd_url = DdUrl :: new ( config. url . clone ( ) ) . expect ( "can't parse DD_URL" ) ;
1029
-
1030
1025
let prefix_override = MetricsIntakeUrlPrefixOverride :: maybe_new ( Some ( dd_url) , None ) ;
1031
- MetricsIntakeUrlPrefix :: new ( None , prefix_override)
1026
+ MetricsIntakeUrlPrefix :: new ( None , prefix_override) . expect ( "can't parse DD_URL prefix" )
1032
1027
} else {
1033
- // use site
1034
1028
let metrics_site = MetricsSite :: new ( config. site . clone ( ) ) . expect ( "can't parse site" ) ;
1035
- MetricsIntakeUrlPrefix :: new ( Some ( metrics_site) , None )
1036
- } ;
1029
+ MetricsIntakeUrlPrefix :: new ( Some ( metrics_site) , None ) . expect ( "can't parse site prefix" )
1030
+ }
1031
+ }
1032
+
1033
+ fn start_metrics_flushers (
1034
+ api_key_factory : Arc < ApiKeyFactory > ,
1035
+ metrics_aggr_handle : & MetricsAggregatorHandle ,
1036
+ metrics_intake_url : & MetricsIntakeUrlPrefix ,
1037
+ config : & Arc < Config > ,
1038
+ ) -> Vec < MetricsFlusher > {
1039
+ let mut flushers = Vec :: new ( ) ;
1037
1040
1038
1041
let flusher_config = MetricsFlusherConfig {
1039
1042
api_key_factory,
1040
1043
aggregator_handle : metrics_aggr_handle. clone ( ) ,
1041
- metrics_intake_url_prefix : metrics_intake_url. expect ( "can't parse site or override" ) ,
1044
+ metrics_intake_url_prefix : metrics_intake_url. clone ( ) ,
1042
1045
https_proxy : config. proxy_https . clone ( ) ,
1043
1046
timeout : Duration :: from_secs ( config. flush_timeout ) ,
1044
1047
retry_strategy : DsdRetryStrategy :: Immediate ( 3 ) ,
@@ -1163,6 +1166,28 @@ fn start_trace_agent(
1163
1166
)
1164
1167
}
1165
1168
1169
+ /// Sends metrics indicating issue with configuration.
1170
+ ///
1171
+ /// # Arguments
1172
+ /// * `issue_reasons` - Vector of messages describing the issue with the configurations
1173
+ /// * `lambda_enhanced_metrics` - The lambda enhanced metrics instance
1174
+ fn send_config_issue_metric ( issue_reasons : & [ String ] , lambda_enhanced_metrics : & enhanced_metrics ) {
1175
+ if issue_reasons. is_empty ( ) {
1176
+ return ;
1177
+ }
1178
+ let now = std:: time:: UNIX_EPOCH
1179
+ . elapsed ( )
1180
+ . expect ( "can't poll clock" )
1181
+ . as_secs ( )
1182
+ . try_into ( )
1183
+ . unwrap_or_default ( ) ;
1184
+
1185
+ // Setup a separate metric for each config issue reason
1186
+ for issue_reason in issue_reasons {
1187
+ lambda_enhanced_metrics. set_config_load_issue_metric ( now, issue_reason) ;
1188
+ }
1189
+ }
1190
+
1166
1191
fn start_dogstatsd_aggregator ( aggr_service : MetricsAggregatorService ) {
1167
1192
tokio:: spawn ( async move {
1168
1193
aggr_service. run ( ) . await ;
0 commit comments