Skip to content

Commit a603e60

Browse files
committed
Datadog intake blackhole
This PR introduces support for Datadog intake v2 in a new blackhole, the goal being to accumulate trace-agent metrics that are only transmitted to dogstatsd and then out to the intake. The construction method here is intended to make it easy to add support for new intake versions. Signed-off-by: Brian L. Troutwine <[email protected]>
1 parent 52e7181 commit a603e60

File tree

14 files changed

+777
-9
lines changed

14 files changed

+777
-9
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
## Added
1414
- Stable throttle now has a 'timeout' configuration parameter to model IO done
1515
with timeout.
16+
- Added a 'Datadog Intake' blackhole, supporting only the v2 metrics protobuf
17+
protocol for now. This captures metrics submitted into capture files.
1618

1719
## [0.28.0]
1820
## Added

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ metrics-exporter-prometheus = { version = "0.15", default-features = false, feat
2121
"http-listener",
2222
"uds-listener",
2323
] }
24-
prost = "0.13"
24+
prost = { version = "0.13" }
25+
prost-build = { version = "0.13" }
2526
rand = { version = "0.9", default-features = false }
2627
rustc-hash = { version = "1.1" }
2728
serde = { version = "1.0", features = ["std", "derive"] }
@@ -81,9 +82,12 @@ missing_docs = "deny"
8182
warnings = "deny"
8283

8384
[profile.release]
84-
lto = true # Optimize our binary at link stage.
85-
codegen-units = 1 # Increases compile time but improves optimization alternatives.
86-
opt-level = 3 # Optimize with 'all' optimization flipped on. May produce larger binaries than 's' or 'z'.
85+
# lto = true # Optimize our binary at link stage.
86+
# codegen-units = 1 # Increases compile time but improves optimization alternatives.
87+
# opt-level = 3 # Optimize with 'all' optimization flipped on. May produce larger binaries than 's' or 'z'.
88+
lto = false # Optimize our binary at link stage.
89+
codegen-units = 8 # Increases compile time but improves optimization alternatives.
90+
opt-level = 's' # Optimize with 'all' optimization flipped on. May produce larger binaries than 's' or 'z'.
8791
panic = "abort"
8892
debug = 2 # keep symbols in release too
8993
split-debuginfo = "unpacked"

buf.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ lint:
88
- COMMENTS
99
ignore:
1010
- integration
11+
- lading/proto/agent_payload.proto

lading/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,6 @@ keywords = ["random_test", "generator"]
1414
categories = ["development-tools::profiling"]
1515
description = "A tool for load testing daemons."
1616

17-
[package.metadata.cargo-machete]
18-
ignored = ["prost"]
19-
2017
[dependencies]
2118
lading-capture = { version = "0.2", path = "../lading_capture" }
2219
lading-payload = { version = "0.1", path = "../lading_payload" }
@@ -106,6 +103,9 @@ tempfile = "3.23"
106103
proptest = { workspace = true }
107104
tower-test = "0.4"
108105

106+
[build-dependencies]
107+
prost-build = { workspace = true }
108+
109109
[features]
110110
default = []
111111
logrotate_fs = ["fuser"]

lading/build.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
//! Build script for `lading` crate.
2+
3+
fn main() -> std::io::Result<()> {
4+
println!("cargo:rerun-if-changed=proto/");
5+
6+
let includes = ["proto/"];
7+
prost_build::Config::new()
8+
.out_dir("src/proto/")
9+
.protoc_arg("--experimental_allow_proto3_optional")
10+
.compile_protos(&["proto/agent_payload.proto"], &includes)?;
11+
12+
Ok(())
13+
}

lading/proto/agent_payload.proto

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Vendored from
2+
// https://github.com/DataDog/agent-payload/blob/master/proto/metrics/agent_payload.proto
3+
// SHA 9e27b06757db73581484382d8c0772b3e7f1e009.
4+
5+
syntax = "proto3";
6+
7+
package datadog.agentpayload;
8+
9+
message CommonMetadata {
10+
string agent_version = 1;
11+
string timezone = 2;
12+
double current_epoch = 3;
13+
string internal_ip = 4;
14+
string public_ip = 5;
15+
string api_key = 6;
16+
}
17+
18+
message Origin {
19+
reserved 1,2,3;
20+
uint32 origin_product = 4;
21+
uint32 origin_category = 5;
22+
uint32 origin_service = 6;
23+
}
24+
25+
// Metadata is used in both the MetricSeries and Sketch messages defined below.
26+
message Metadata {
27+
Origin origin = 1;
28+
}
29+
30+
message MetricPayload {
31+
enum MetricType {
32+
UNSPECIFIED = 0;
33+
COUNT = 1;
34+
RATE = 2;
35+
GAUGE = 3;
36+
}
37+
38+
message MetricPoint {
39+
// metric value
40+
double value = 1;
41+
// timestamp for this value in seconds since the UNIX epoch
42+
int64 timestamp = 2;
43+
}
44+
45+
message Resource {
46+
string type = 1;
47+
string name = 2;
48+
}
49+
50+
message MetricSeries {
51+
// Resources this series applies to; include at least
52+
// { type="host", name=<hostname> }
53+
repeated Resource resources = 1;
54+
// metric name
55+
string metric = 2;
56+
// tags for this metric
57+
repeated string tags = 3;
58+
// data points for this metric
59+
repeated MetricPoint points = 4;
60+
// type of metric
61+
MetricType type = 5;
62+
// metric unit name
63+
string unit = 6;
64+
// source of this metric (check name, etc.)
65+
string source_type_name = 7;
66+
// interval, in seconds, between samples of this metric
67+
int64 interval = 8;
68+
// Metrics origin metadata
69+
Metadata metadata = 9;
70+
}
71+
repeated MetricSeries series = 1;
72+
}
73+
74+
message EventsPayload {
75+
message Event {
76+
string title = 1;
77+
string text = 2;
78+
int64 ts = 3;
79+
string priority = 4;
80+
string host = 5;
81+
repeated string tags = 6;
82+
string alert_type = 7;
83+
string aggregation_key = 8;
84+
string source_type_name = 9;
85+
}
86+
repeated Event events = 1;
87+
CommonMetadata metadata = 2;
88+
}
89+
90+
message SketchPayload {
91+
message Sketch {
92+
message Distribution {
93+
int64 ts = 1;
94+
int64 cnt = 2;
95+
double min = 3;
96+
double max = 4;
97+
double avg = 5;
98+
double sum = 6;
99+
repeated double v = 7;
100+
repeated uint32 g = 8;
101+
repeated uint32 delta = 9;
102+
repeated double buf = 10;
103+
}
104+
message Dogsketch {
105+
int64 ts = 1;
106+
int64 cnt = 2;
107+
double min = 3;
108+
double max = 4;
109+
double avg = 5;
110+
double sum = 6;
111+
repeated sint32 k = 7;
112+
repeated uint32 n = 8;
113+
}
114+
string metric = 1;
115+
string host = 2;
116+
repeated Distribution distributions = 3;
117+
repeated string tags = 4;
118+
reserved 5, 6;
119+
reserved "distributionsK", "distributionsC";
120+
repeated Dogsketch dogsketches = 7;
121+
Metadata metadata = 8;
122+
}
123+
repeated Sketch sketches = 1;
124+
CommonMetadata metadata = 2;
125+
}

lading/src/bin/lading.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ async fn inner_main(
427427
capture_manager
428428
.start()
429429
.await
430-
.expect("failed to start capture manager");
430+
.expect("capture manager suffered unexpected failure");
431431
});
432432
capture_manager_handle = Some(handle);
433433
}

lading/src/blackhole.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use serde::{Deserialize, Serialize};
99

1010
mod common;
11+
pub mod datadog;
1112
pub mod http;
1213
pub mod otlp;
1314
pub mod splunk_hec;
@@ -23,6 +24,9 @@ pub enum Error {
2324
/// See [`crate::blackhole::tcp::Error`] for details.
2425
#[error(transparent)]
2526
Tcp(tcp::Error),
27+
/// See [`crate::blackhole::datadog::Error`] for details.
28+
#[error(transparent)]
29+
Datadog(datadog::Error),
2630
/// See [`crate::blackhole::http::Error`] for details.
2731
#[error(transparent)]
2832
Http(http::Error),
@@ -75,6 +79,8 @@ pub struct General {
7579
pub enum Inner {
7680
/// See [`crate::blackhole::tcp::Config`] for details.
7781
Tcp(tcp::Config),
82+
/// See [`crate::blackhole::datadog::Config`] for details.
83+
Datadog(datadog::Config),
7884
/// See [`crate::blackhole::http::Config`] for details.
7985
Http(http::Config),
8086
/// See [`crate::blackhole::splunk_hec::Config`] for details.
@@ -99,6 +105,8 @@ pub enum Inner {
99105
pub enum Server {
100106
/// See [`crate::blackhole::tcp::Tcp`] for details.
101107
Tcp(tcp::Tcp),
108+
/// See [`crate::blackhole::datadog::Datadog`] for details.
109+
Datadog(datadog::Datadog),
102110
/// See [`crate::blackhole::http::Http`] for details.
103111
Http(http::Http),
104112
/// See [`crate::blackhole::splunk_hec::SplunkHec`] for details.
@@ -128,6 +136,9 @@ impl Server {
128136
pub fn new(config: Config, shutdown: lading_signal::Watcher) -> Result<Self, Error> {
129137
let server = match config.inner {
130138
Inner::Tcp(conf) => Self::Tcp(tcp::Tcp::new(config.general, &conf, shutdown)),
139+
Inner::Datadog(conf) => {
140+
Self::Datadog(datadog::Datadog::new(config.general, conf, shutdown))
141+
}
131142
Inner::Http(conf) => {
132143
Self::Http(http::Http::new(config.general, &conf, shutdown).map_err(Error::Http)?)
133144
}
@@ -163,6 +174,7 @@ impl Server {
163174
pub async fn run(self) -> Result<(), Error> {
164175
match self {
165176
Server::Tcp(inner) => inner.run().await.map_err(Error::Tcp),
177+
Server::Datadog(inner) => inner.run().await.map_err(Error::Datadog),
166178
Server::Http(inner) => inner.run().await.map_err(Error::Http),
167179
Server::Udp(inner) => Box::pin(inner.run()).await.map_err(Error::Udp),
168180
Server::UnixStream(inner) => inner.run().await.map_err(Error::UnixStream),

0 commit comments

Comments
 (0)