Add ability to register ADP as a remote agent to the Datadog Agent #377

Merged
merged 32 commits into from
Jan 27, 2025
Changes from 15 commits
12 changes: 12 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -143,6 +143,7 @@ chrono-tz = { version = "0.10", default-features = false }
iana-time-zone = { version = "0.1", default-features = false }
backon = { version = "1", default-features = false }
http-serde-ext = { version = "1", default-features = false }
uuid = { version = "1.11.0", default-features = false }

[profile.release]
lto = "thin"
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
@@ -313,6 +313,7 @@ utf16_iter,https://github.com/hsivonen/utf16_iter,Apache-2.0 OR MIT,Henri Sivone
utf8-width,https://github.com/magiclen/utf8-width,MIT,Magic Len <[email protected]>
utf8_iter,https://github.com/hsivonen/utf8_iter,Apache-2.0 OR MIT,Henri Sivonen <[email protected]>
utf8parse,https://github.com/alacritty/vte,Apache-2.0 OR MIT,"Joe Wilm <[email protected]>, Christian Duerr <[email protected]>"
uuid,https://github.com/uuid-rs/uuid,Apache-2.0 OR MIT,"Ashley Mannix<[email protected]>, Dylan DPC<[email protected]>, Hunar Roop Kahlon<[email protected]>"
valuable,https://github.com/tokio-rs/valuable,MIT,The valuable Authors
walkdir,https://github.com/BurntSushi/walkdir,Unlicense OR MIT,Andrew Gallant <[email protected]>
want,https://github.com/seanmonstar/want,MIT,Sean McArthur <[email protected]>
38 changes: 30 additions & 8 deletions bin/agent-data-plane/src/main.rs
@@ -13,7 +13,10 @@ use std::{
use memory_accounting::ComponentRegistry;
use saluki_app::{api::APIBuilder, prelude::*};
use saluki_components::{
destinations::{DatadogEventsServiceChecksConfiguration, DatadogMetricsConfiguration, PrometheusConfiguration},
destinations::{
DatadogEventsServiceChecksConfiguration, DatadogMetricsConfiguration, DatadogStatusFlareConfiguration,
PrometheusConfiguration,
},
sources::{DogStatsDConfiguration, InternalMetricsConfiguration},
transforms::{
AggregateConfiguration, ChainedConfiguration, HostEnrichmentConfiguration, OriginEnrichmentConfiguration,
@@ -93,9 +96,11 @@ async fn run(started: Instant) -> Result<(), GenericError> {
let env_provider =
ADPEnvironmentProvider::from_configuration(&configuration, &component_registry, &health_registry).await?;

let status_configuration = DatadogStatusFlareConfiguration::from_configuration(&configuration).await?;
tobz marked this conversation as resolved.

// Create a simple pipeline that runs a DogStatsD source, an aggregation transform to bucket into 10 second windows,
// and a Datadog Metrics destination that forwards aggregated buckets to the Datadog Platform.
let blueprint = create_topology(&configuration, env_provider, &component_registry)?;
let blueprint = create_topology(&configuration, env_provider, &component_registry, status_configuration)?;

// Build our administrative API server.
let primary_api_listen_address = configuration
@@ -158,6 +163,7 @@ async fn run(started: Instant) -> Result<(), GenericError> {

fn create_topology(
configuration: &GenericConfiguration, env_provider: ADPEnvironmentProvider, component_registry: &ComponentRegistry,
status_configuration: DatadogStatusFlareConfiguration,
) -> Result<TopologyBlueprint, GenericError> {
// Create a simple pipeline that runs a DogStatsD source, an aggregation transform to bucket into 10 second windows,
// and a Datadog Metrics destination that forwards aggregated buckets to the Datadog Platform.
@@ -192,8 +198,12 @@ fn create_topology(
let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Events/Service Checks destination.")?;

let int_metrics_config = InternalMetricsConfiguration;
let int_metrics_remap_config = AgentTelemetryRemapperConfiguration::new();

let topology_registry = component_registry.get_or_create("topology");
let mut blueprint = TopologyBlueprint::from_component_registry(topology_registry);

blueprint
.add_source("dsd_in", dsd_config)?
.add_transform("dsd_agg", dsd_agg_config)?
@@ -205,17 +215,29 @@ fn create_topology(
.connect_component("dd_metrics_out", ["enrich"])?
.connect_component("dd_events_sc_out", ["dsd_in.events", "dsd_in.service_checks"])?;

// Insert a Prometheus scrape destination if we've been instructed to enable internal telemetry.
if configuration.get_typed_or_default::<bool>("telemetry_enabled") {
let int_metrics_config = InternalMetricsConfiguration;
let int_metrics_remap_config = AgentTelemetryRemapperConfiguration::new();
let prometheus_config = PrometheusConfiguration::from_configuration(configuration)?;
let use_prometheus = configuration.get_typed_or_default::<bool>("telemetry_enabled");
let use_status_flare_component = !configuration.get_typed_or_default::<bool>("adp_agent_no_op");
tobz marked this conversation as resolved.

// Insert internal metrics source only if internal telemetry or status and flare component is enabled.
if use_prometheus || use_status_flare_component {
blueprint
.add_source("internal_metrics_in", int_metrics_config)?
.add_transform("internal_metrics_remap", int_metrics_remap_config)?
.connect_component("internal_metrics_remap", ["internal_metrics_in"])?;
}

// Insert a Datadog Status Flare destination if we've been instructed to not do a no-op.
if use_status_flare_component {
blueprint
.add_destination("dd_status_flare_out", status_configuration)?
.connect_component("dd_status_flare_out", ["internal_metrics_remap"])?;
}

// Insert a Prometheus scrape destination if we've been instructed to enable internal telemetry.
if use_prometheus {
let prometheus_config = PrometheusConfiguration::from_configuration(configuration)?;
blueprint
.add_destination("internal_metrics_out", prometheus_config)?
.connect_component("internal_metrics_remap", ["internal_metrics_in"])?
.connect_component("internal_metrics_out", ["internal_metrics_remap"])?;
}
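The branching in this hunk can be summarised with a small standalone sketch. It is not the PR's code: `internal_components` is a hypothetical helper that only mirrors the two flags (`telemetry_enabled`, `adp_agent_no_op`) and the component names from the diff, and it assumes both flags default to `false` when unset, as `get_typed_or_default::<bool>` suggests.

```rust
/// Hypothetical helper mirroring the conditional wiring in `create_topology`.
/// Returns the names of the internal-telemetry components that would be added.
fn internal_components(telemetry_enabled: bool, adp_agent_no_op: bool) -> Vec<&'static str> {
    let use_prometheus = telemetry_enabled;
    let use_status_flare_component = !adp_agent_no_op;

    let mut components = Vec::new();

    // The internal metrics source and remapper are shared by both destinations,
    // so they are added once if either destination is enabled.
    if use_prometheus || use_status_flare_component {
        components.push("internal_metrics_in");
        components.push("internal_metrics_remap");
    }

    // Status/flare destination: present unless ADP runs in no-op mode.
    if use_status_flare_component {
        components.push("dd_status_flare_out");
    }

    // Prometheus scrape destination: present only when telemetry is enabled.
    if use_prometheus {
        components.push("internal_metrics_out");
    }

    components
}
```

With both flags left at their assumed defaults, the sketch yields the source, the remapper, and `dd_status_flare_out`, which matches the diff: the status/flare destination is on unless `adp_agent_no_op` is set.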

10 changes: 2 additions & 8 deletions lib/datadog-protos/build.rs
@@ -31,21 +31,15 @@ fn main() {

// Handle code generation for gRPC service definitions.
tonic_build::configure()
.build_server(false)
.build_server(true)
.include_file("api.mod.rs")
.compile_protos(
&[
"proto/datadog/api/v1/api.proto",
"proto/datadog/workloadmeta/workloadmeta.proto",
"proto/datadog/remoteagent/remoteagent.proto",
],
&["proto"],
)
.expect("failed to build gRPC service definitions for DCA");

// Handle code generation for gRPC service definitions.
tonic_build::configure()
.build_server(true)
.include_file("remoteagent.mod.rs")
.compile_protos(&["proto/datadog/remoteagent/remoteagent.proto"], &["proto"])
.expect("failed to build gRPC service definitions for RemoteAgent")
}
12 changes: 3 additions & 9 deletions lib/datadog-protos/src/lib.rs
@@ -12,10 +12,6 @@ mod agent_include {
include!(concat!(env!("OUT_DIR"), "/api.mod.rs"));
}

mod remoteagent_include {
include!(concat!(env!("OUT_DIR"), "/remoteagent.mod.rs"));
}

/// Metrics-related definitions.
pub mod metrics {
pub use super::include::dd_metric::metric_payload::*;
@@ -33,11 +29,9 @@ pub mod traces {
pub mod agent {
pub use super::agent_include::datadog::api::v1::agent_client::AgentClient;
pub use super::agent_include::datadog::api::v1::agent_secure_client::AgentSecureClient;
pub use super::agent_include::datadog::api::v1::remote_agent_server::RemoteAgent;
pub use super::agent_include::datadog::api::v1::remote_agent_server::RemoteAgentServer;
pub use super::agent_include::datadog::model::v1::*;
pub use super::agent_include::datadog::remoteagent::*;
pub use super::agent_include::datadog::workloadmeta::*;
}

// RemoteAgent definitions.
pub mod remoteagent {
pub use super::remoteagent_include::datadog::remoteagent::*;
}
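As a rough illustration of what this hunk does, the sketch below shows the re-export pattern once everything is generated into a single include module: one private module holds the generated tree, and one public `agent` module re-exports from it. The nested modules and the `RegistrationRequest` type are hand-written stand-ins for the generated code, not the real generated items.

```rust
// Stand-in for the single generated include file (`api.mod.rs` in the diff).
mod agent_include {
    pub mod datadog {
        pub mod api {
            pub mod v1 {
                pub mod remote_agent_server {
                    /// Placeholder for the generated server type.
                    pub struct RemoteAgentServer;
                }
            }
        }
        pub mod remoteagent {
            /// Hypothetical message type, named for illustration only.
            pub struct RegistrationRequest {
                pub id: String,
            }
        }
    }
}

/// Single public facade: callers no longer need a separate `remoteagent` module.
pub mod agent {
    pub use super::agent_include::datadog::api::v1::remote_agent_server::RemoteAgentServer;
    pub use super::agent_include::datadog::remoteagent::*;
}
```

Callers then reach both the server type and the message types through `agent::`, which is why the separate `remoteagent_include` and `remoteagent` modules can be dropped.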
3 changes: 3 additions & 0 deletions lib/saluki-components/Cargo.toml
@@ -33,6 +33,7 @@ paste = { workspace = true }
pin-project = { workspace = true }
protobuf = { workspace = true }
quanta = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
rustls = { workspace = true }
saluki-config = { workspace = true }
@@ -54,6 +55,8 @@ snafu = { workspace = true }
stringtheory = { workspace = true }
tokio = { workspace = true, features = ["io-util", "macros", "net", "rt", "rt-multi-thread", "signal", "sync"] }
tokio-util = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true, features = ["retry", "timeout", "util"] }
tracing = { workspace = true }
url = { workspace = true }
uuid = { workspace = true, features = ["std", "v7"] }
3 changes: 3 additions & 0 deletions lib/saluki-components/src/destinations/datadog/mod.rs
@@ -5,3 +5,6 @@ pub use self::events_service_checks::DatadogEventsServiceChecksConfiguration;

mod metrics;
pub use self::metrics::DatadogMetricsConfiguration;

mod status_flare;
pub use self::status_flare::DatadogStatusFlareConfiguration;
152 changes: 152 additions & 0 deletions lib/saluki-components/src/destinations/datadog/status_flare/mod.rs
@@ -0,0 +1,152 @@
use std::num::NonZeroUsize;
use std::time::Duration;

use async_trait::async_trait;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use saluki_config::GenericConfiguration;
use saluki_core::components::{
destinations::{Destination, DestinationBuilder, DestinationContext},
ComponentContext,
};
use saluki_env::helpers::remote_agent::RemoteAgentClient;
use saluki_error::GenericError;
use saluki_event::DataType;
use tokio::select;
use tokio::time::{interval, MissedTickBehavior};
use tracing::debug;
use uuid::Uuid;

const DEFAULT_API_LISTEN_PORT: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(5102) };
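As an aside, the same constant can be built without `unsafe`. This is a sketch of an alternative, not what the PR does: it relies on `NonZeroUsize::new` being a `const fn` and on `panic!` being allowed in constant evaluation (Rust 1.57 or later), so a zero value would fail the build. `resolve_listen_port` is a hypothetical helper standing in for the `unwrap_or` fallback used later in this file.

```rust
use std::num::NonZeroUsize;

/// Safe construction: a zero literal here would be rejected at compile time.
const DEFAULT_API_LISTEN_PORT: NonZeroUsize = match NonZeroUsize::new(5102) {
    Some(port) => port,
    None => panic!("default API listen port must be non-zero"),
};

/// Hypothetical helper: fall back to the default when no port is configured.
fn resolve_listen_port(configured: Option<NonZeroUsize>) -> NonZeroUsize {
    configured.unwrap_or(DEFAULT_API_LISTEN_PORT)
}
```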

/// Datadog Status and Flare Destination
///
/// Registers ADP as a remote agent to the Core Agent.
///
/// ## Missing
///
/// - grpc server to respond to Core Agent
Member commented:

I would say this isn't missing anymore. :)

pub struct DatadogStatusFlareConfiguration {
id: String,

display_name: String,

api_listen_port: NonZeroUsize,

client: RemoteAgentClient,
}

impl DatadogStatusFlareConfiguration {
/// Creates a new `DatadogStatusFlareConfiguration` from the given configuration.
pub async fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
let app_details = saluki_metadata::get_app_details();
let formatted_full_name = app_details
.full_name()
.replace(" ", "-")
.replace("_", "-")
.to_lowercase();
let api_listen_port = config
.try_get_typed::<NonZeroUsize>("remote_agent_api_listen_port")?
.unwrap_or(DEFAULT_API_LISTEN_PORT);
let client = RemoteAgentClient::from_configuration(config).await?;

Ok(Self {
id: format!("{}-{}", formatted_full_name, Uuid::now_v7()),
display_name: formatted_full_name,
api_listen_port,
client,
})
}
}
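The ID derivation in `from_configuration` comes down to two string steps, sketched here in isolation. `normalize_agent_name` and `build_agent_id` are hypothetical names, and the unique suffix is passed in as a plain string instead of calling `Uuid::now_v7()`, so that the example needs only the standard library.

```rust
/// Lowercases the name and turns spaces and underscores into hyphens,
/// matching the `replace`/`to_lowercase` chain in the diff.
fn normalize_agent_name(full_name: &str) -> String {
    full_name.replace(' ', "-").replace('_', "-").to_lowercase()
}

/// Joins the normalized name with a caller-supplied unique suffix
/// (the PR uses a UUIDv7 here; any unique string works for the sketch).
fn build_agent_id(full_name: &str, unique_suffix: &str) -> String {
    format!("{}-{}", normalize_agent_name(full_name), unique_suffix)
}
```

The normalized name doubles as the display name in the diff, while the suffixed form gives each process instance a distinct registration ID.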

#[async_trait]
impl DestinationBuilder for DatadogStatusFlareConfiguration {
fn input_data_type(&self) -> DataType {
DataType::Metric | DataType::EventD | DataType::ServiceCheck
}

async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
Ok(Box::new(DatadogStatusFlare {
id: self.id.clone(),
display_name: self.display_name.clone(),
api_listen_port: self.api_listen_port,
client: self.client.clone(),
}))
}
}

impl MemoryBounds for DatadogStatusFlareConfiguration {
fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
builder
.minimum()
// Capture the size of the heap allocation when the component is built.
.with_single_value::<DatadogStatusFlare>();
}
}

pub struct DatadogStatusFlare {
id: String,

display_name: String,

api_listen_port: NonZeroUsize,

client: RemoteAgentClient,
}

#[async_trait]
impl Destination for DatadogStatusFlare {
async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
let Self {
id,
display_name,
api_listen_port,
mut client,
} = *self;

let api_endpoint = format!("127.0.0.1:{}", api_listen_port);
let auth_token: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(64)
.map(char::from)
.collect();

let mut register_agent = interval(Duration::from_secs(10));
register_agent.set_missed_tick_behavior(MissedTickBehavior::Delay);

let mut health = context.take_health_handle();
health.mark_ready();
debug!("Datadog Status and Flare destination started.");

loop {
select! {
_ = health.live() => continue,

result = context.events().next() => match result {
Some(_events) => {
},
None => break,
},

// Time to (re)register with the Core Agent.
//
// TODO: Consider spawning the registration as a task so that the component can keep polling and not slow down the accepting of events and responding of health checks.
_ = register_agent.tick() => {
Contributor Author commented:

This component is only built if adp_standalone_mode == false, but this call will keep failing if

 remote_agent_registry:
   enabled: true

is not set on the datadog-agent side.

Member commented:

🤷🏻

I think that's just something we'll have to deal with until we make it the default to enable the remote agent registry. We don't get any visible errors, right, since it's just debug logging if the call fails? Or does it have any other user-visible impact in ADP when the remote agent registry is disabled?

Contributor Author commented:

Correct, it will just spam the debug logs if it fails.

match client.register_remote_agent_request(&id, &display_name, &api_endpoint, &auth_token).await {
Member commented:

Purely as a sidenote that we should add a TODO for, but isn't blocking for this PR: this call might take a while if the Agent is temporarily down or slow to respond... which would in turn slow down the accepting of events/responding to health check requests.

I don't know the exact code I would want to use to do it, but we would likely want to consider figuring out how we could spawn this call as a background task so that the component can keep polling, but limit ourselves to one in-flight request at a time.

Typing this all up also reminds me that we don't have any request timeout configuration in RemoteAgentClient. 🤔

Ok(resp) => {
let new_refresh_interval = resp.into_inner().recommended_refresh_interval_secs;
register_agent.reset_after(Duration::from_secs(new_refresh_interval as u64));
debug!("Refreshed registration with Core Agent");
}
Err(_) => {
debug!("Failed to refresh registration with Core Agent.");
}
}
}
}
}

debug!("Datadog Status Flare destination stopped.");
Ok(())
}
}
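The review thread on the registration call suggests running it in the background while allowing only one request in flight. One possible shape for that is sketched below. It is not the PR's implementation: it uses standard-library threads and channels as a stand-in for a spawned Tokio task, and `RegistrationSlot`, `RegistrationResult`, `try_start`, and `poll` are all hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;

/// Outcome of one attempt: the recommended refresh interval in seconds on
/// success, or an error message on failure (both are illustrative).
type RegistrationResult = Result<u64, String>;

/// Holds at most one in-flight registration attempt.
struct RegistrationSlot {
    in_flight: Option<Receiver<RegistrationResult>>,
}

impl RegistrationSlot {
    fn new() -> Self {
        Self { in_flight: None }
    }

    /// Starts `register` on a background thread unless an attempt is already
    /// running. Returns `true` if a new attempt was started.
    fn try_start<F>(&mut self, register: F) -> bool
    where
        F: FnOnce() -> RegistrationResult + Send + 'static,
    {
        if self.in_flight.is_some() {
            return false;
        }
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            // The receiver may already be dropped; ignoring that error is fine.
            let _ = tx.send(register());
        });
        self.in_flight = Some(rx);
        true
    }

    /// Non-blocking check. Returns the result once and frees the slot.
    fn poll(&mut self) -> Option<RegistrationResult> {
        let outcome = match self.in_flight.as_ref()?.try_recv() {
            Ok(result) => result,
            Err(TryRecvError::Empty) => return None,
            Err(TryRecvError::Disconnected) => {
                Err("registration worker exited without a result".to_string())
            }
        };
        self.in_flight = None;
        Some(outcome)
    }
}
```

In the component's loop, the tick branch would call `try_start` and skip the tick when it returns `false`, so a slow Core Agent delays only the next registration and not event handling or health checks. A request timeout, which the thread also notes is missing, would still be needed inside the closure itself.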
4 changes: 3 additions & 1 deletion lib/saluki-components/src/destinations/mod.rs
@@ -4,6 +4,8 @@ mod blackhole;
pub use self::blackhole::BlackholeConfiguration;

mod datadog;
pub use self::datadog::{DatadogEventsServiceChecksConfiguration, DatadogMetricsConfiguration};
pub use self::datadog::{
DatadogEventsServiceChecksConfiguration, DatadogMetricsConfiguration, DatadogStatusFlareConfiguration,
};
mod prometheus;
pub use self::prometheus::PrometheusConfiguration;