Commit f7c61a5

feat: add sysinfo metrics
- collect CPU usage and memory usage of the server
- collect disk usage of the volumes: data, staging, hot-tier
- add these metrics to Prometheus metrics
- export these metrics to the cluster metrics API
- add the metrics to the pmeta stream
- add the querier node's sysinfo metrics to pmeta and the cluster metrics API
1 parent 06293cc commit f7c61a5

5 files changed (+521, -28 lines)
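
The new collection code (in src/metrics/mod.rs below) is built on the sysinfo crate. The following is a rough standalone sketch, not part of the commit, that exercises the same sysinfo calls the diff relies on, just to show the raw values that feed the new gauges:

```rust
// Minimal sketch (not from this commit) using the same sysinfo calls as the
// new code in src/metrics/mod.rs. Memory and disk values are in bytes,
// CPU usage in percent.
use sysinfo::{Disks, System};

fn main() {
    let mut sys = System::new_all();
    sys.refresh_all();

    println!("total memory: {} B", sys.total_memory());
    println!("used memory:  {} B", sys.used_memory());
    // Note: CPU usage may read near zero on the very first refresh;
    // a second refresh after a short delay gives a meaningful value.
    println!("global CPU:   {:.1}%", sys.global_cpu_usage());

    // Same call the disk-usage helpers below use to enumerate mounted volumes.
    for disk in Disks::new_with_refreshed_list().iter() {
        println!(
            "{} total={} available={}",
            disk.mount_point().display(),
            disk.total_space(),
            disk.available_space()
        );
    }
}
```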

src/handlers/http/cluster/mod.rs (+8, -22)

@@ -25,7 +25,7 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use actix_web::http::header::{self, HeaderMap};
-use actix_web::web::Path;
+use actix_web::web::{Json, Path};
 use actix_web::Responder;
 use bytes::Bytes;
 use chrono::Utc;
@@ -869,7 +869,6 @@ where
     let text = res.text().await.map_err(PostError::NetworkError)?;
     let lines: Vec<Result<String, std::io::Error>> =
         text.lines().map(|line| Ok(line.to_owned())).collect_vec();
-
     let sample = prometheus_parse::Scrape::parse(lines.into_iter())
         .map_err(|err| PostError::CustomError(err.to_string()))?
         .samples;
@@ -995,7 +994,7 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
     Ok(all_metrics)
 }
 
-pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
+pub async fn init_cluster_metrics_scheduler() -> Result<(), PostError> {
     info!("Setting up schedular for cluster metrics ingestion");
     let mut scheduler = AsyncScheduler::new();
     scheduler
@@ -1004,25 +1003,12 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
             let result: Result<(), PostError> = async {
                 let cluster_metrics = fetch_cluster_metrics().await;
                 if let Ok(metrics) = cluster_metrics {
-                    if !metrics.is_empty() {
-                        info!("Cluster metrics fetched successfully from all ingestors");
-                        if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
-                            if matches!(
-                                ingest_internal_stream(
-                                    INTERNAL_STREAM_NAME.to_string(),
-                                    bytes::Bytes::from(metrics_bytes),
-                                )
-                                .await,
-                                Ok(())
-                            ) {
-                                info!("Cluster metrics successfully ingested into internal stream");
-                            } else {
-                                error!("Failed to ingest cluster metrics into internal stream");
-                            }
-                        } else {
-                            error!("Failed to serialize cluster metrics");
-                        }
-                    }
+                    let json_value = serde_json::to_value(metrics)
+                        .map_err(|e| anyhow::anyhow!("Failed to serialize metrics: {}", e))?;
+
+                    ingest_internal_stream(INTERNAL_STREAM_NAME.to_string(), Json(json_value))
+                        .await
+                        .map_err(|e| anyhow::anyhow!("Failed to ingest metrics: {}", e))?;
                 }
                 Ok(())
             }
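
The hunk above replaces the nested `if let` / `matches!` logging with straight `?` propagation: serialize the fetched metrics to a serde_json::Value, then hand it to ingest_internal_stream wrapped in Json. Below is a rough standalone sketch of that shape; DemoError, Metrics, and ingest_internal are hypothetical stand-ins for PostError, the cluster Metrics type, and ingest_internal_stream, and the stream name is a placeholder.

```rust
// Hypothetical stand-ins (DemoError ~ PostError, ingest_internal ~
// ingest_internal_stream) illustrating the serialize-then-ingest pattern with
// `?` instead of nested branching. Assumes the outer error type converts
// From<anyhow::Error>, as the `?` uses in the diff imply.
use serde::Serialize;

#[derive(Debug, thiserror::Error)]
enum DemoError {
    #[error("{0}")]
    Anyhow(#[from] anyhow::Error),
}

#[derive(Serialize)]
struct Metrics {
    cpu_usage: f64,
}

async fn ingest_internal(stream: String, payload: serde_json::Value) -> anyhow::Result<()> {
    // Stand-in for ingest_internal_stream.
    println!("ingesting into {stream}: {payload}");
    Ok(())
}

async fn push_cluster_metrics(metrics: Vec<Metrics>) -> Result<(), DemoError> {
    let json_value = serde_json::to_value(metrics)
        .map_err(|e| anyhow::anyhow!("Failed to serialize metrics: {}", e))?;

    ingest_internal("pmeta".to_string(), json_value) // placeholder stream name
        .await
        .map_err(|e| anyhow::anyhow!("Failed to ingest metrics: {}", e))?;

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), DemoError> {
    push_cluster_metrics(vec![Metrics { cpu_usage: 12.5 }]).await
}
```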

src/handlers/http/ingest.rs (+18, -1)

@@ -18,10 +18,27 @@
 
 use std::collections::{HashMap, HashSet};
 
+use super::logstream::error::{CreateStreamError, StreamError};
+use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
+use super::users::dashboards::DashboardError;
+use super::users::filters::FiltersError;
+use crate::event::format::{self, EventFormat, LogSource};
+use crate::event::{self, error::EventError};
+use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
+use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
+use crate::metadata::error::stream_info::MetadataError;
+use crate::metadata::{SchemaVersion, STREAM_INFO};
+use crate::option::{Mode, CONFIG};
+use crate::otel::logs::flatten_otel_logs;
+use crate::otel::metrics::flatten_otel_metrics;
+use crate::otel::traces::flatten_otel_traces;
+use crate::storage::{ObjectStorageError, StreamType};
+use crate::utils::header_parsing::ParseHeaderError;
+use crate::utils::json::convert_array_to_object;
+use crate::utils::json::flatten::{convert_to_array, JsonFlattenError};
 use actix_web::web::{Json, Path};
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
 use arrow_array::RecordBatch;
-use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
 use serde_json::Value;
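
For context on the import churn above: bytes::Bytes drops out because the ingest path now passes already-parsed JSON (actix_web::web::Json<Value>) instead of raw bytes, as the cluster/mod.rs hunk earlier shows for ingest_internal_stream. The handler signatures themselves are not visible in this hunk; the snippet below is only a hypothetical actix-web handler showing what accepting Json<Value> looks like.

```rust
// Hypothetical actix-web handler (not from this commit) accepting a
// pre-parsed JSON body via web::Json<Value> rather than raw bytes::Bytes.
use actix_web::{post, web::Json, App, HttpResponse, HttpServer, Responder};
use serde_json::Value;

#[post("/demo/ingest")]
async fn ingest_demo(payload: Json<Value>) -> impl Responder {
    // Json<Value> is already validated JSON; into_inner() yields serde_json::Value.
    HttpResponse::Ok().body(format!("received: {}", payload.into_inner()))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().service(ingest_demo))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}
```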

src/handlers/http/modal/query_server.rs (+2)

@@ -117,6 +117,8 @@ impl ParseableServer for QueryServer {
         // track all parquet files already in the data directory
         storage::retention::load_retention_from_global();
 
+        metrics::init_system_metrics_scheduler().await?;
+        cluster::init_cluster_metrics_scheduler().await?;
         // all internal data structures populated now.
         // start the analytics scheduler if enabled
         if PARSEABLE.options.send_analytics {

src/metrics/mod.rs (+250, -5)

@@ -18,15 +18,25 @@
 
 pub mod prom_utils;
 pub mod storage;
-
-use crate::{handlers::http::metrics_path, stats::FullStats};
+use actix_web::HttpResponse;
+use clokwerk::{AsyncScheduler, Interval};
+use http::StatusCode;
+use serde::Serialize;
+use std::{path::Path, time::Duration};
+use sysinfo::{Disks, System};
+use tracing::{error, info};
+
+use crate::{handlers::http::metrics_path, option::CONFIG, stats::FullStats};
 use actix_web::Responder;
 use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
 use error::MetricsError;
 use once_cell::sync::Lazy;
-use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};
+use prometheus::{
+    GaugeVec, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry,
+};
 
 pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");
+const SYSTEM_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
 
 pub static EVENTS_INGESTED: Lazy<IntGaugeVec> = Lazy::new(|| {
     IntGaugeVec::new(
@@ -182,6 +192,42 @@ pub static ALERTS_STATES: Lazy<IntCounterVec> = Lazy::new(|| {
     .expect("metric can be created")
 });
 
+pub static TOTAL_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("total_disk", "Total Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static USED_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("used_disk", "Used Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static AVAILABLE_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("available_disk", "Available Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static MEMORY: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("memory_usage", "Memory Usage").namespace(METRICS_NAMESPACE),
+        &["memory_usage"],
+    )
+    .expect("metric can be created")
+});
+pub static CPU: Lazy<GaugeVec> = Lazy::new(|| {
+    GaugeVec::new(
+        Opts::new("cpu_usage", "CPU Usage").namespace(METRICS_NAMESPACE),
+        &["cpu_usage"],
+    )
+    .expect("metric can be created")
+});
+
 fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(EVENTS_INGESTED.clone()))
@@ -231,6 +277,21 @@ fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(ALERTS_STATES.clone()))
         .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(USED_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(AVAILABLE_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(MEMORY.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(CPU.clone()))
+        .expect("metric can be registered");
 }
 
 pub fn build_metrics_handler() -> PrometheusMetrics {
@@ -290,12 +351,196 @@ pub async fn fetch_stats_from_storage(stream_name: &str, stats: FullStats) {
         .set(stats.lifetime_stats.storage as i64);
 }
 
-use actix_web::HttpResponse;
-
 pub async fn get() -> Result<impl Responder, MetricsError> {
     Ok(HttpResponse::Ok().body(format!("{:?}", build_metrics_handler())))
 }
 
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct DiskMetrics {
+    total: u64,
+    used: u64,
+    available: u64,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct SystemMetrics {
+    memory: MemoryMetrics,
+    cpu: Vec<CpuMetrics>,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct MemoryMetrics {
+    total: u64,
+    used: u64,
+    total_swap: u64,
+    used_swap: u64,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct CpuMetrics {
+    name: String,
+    usage: f64,
+}
+
+// Scheduler for collecting all system metrics
+pub async fn init_system_metrics_scheduler() -> Result<(), MetricsError> {
+    info!("Setting up scheduler for capturing system metrics");
+    let mut scheduler = AsyncScheduler::new();
+
+    scheduler
+        .every(SYSTEM_METRICS_INTERVAL_SECONDS)
+        .run(move || async {
+            if let Err(err) = collect_all_metrics().await {
+                error!("Error in capturing system metrics: {:#}", err);
+            }
+        });
+
+    tokio::spawn(async move {
+        loop {
+            scheduler.run_pending().await;
+            tokio::time::sleep(Duration::from_secs(10)).await;
+        }
+    });
+
+    Ok(())
+}
+
+// Function to collect memory, CPU and disk usage metrics
+pub async fn collect_all_metrics() -> Result<(), MetricsError> {
+    // Collect system metrics (CPU and memory)
+    collect_system_metrics().await?;
+
+    // Collect disk metrics for all volumes
+    collect_disk_metrics().await?;
+
+    Ok(())
+}
+
+// Function to collect disk usage metrics
+async fn collect_disk_metrics() -> Result<(), MetricsError> {
+    // collect staging volume metrics
+    collect_volume_disk_usage("staging", CONFIG.staging_dir())?;
+    // Collect data volume metrics for local storage
+    if CONFIG.get_storage_mode_string() == "Local drive" {
+        collect_volume_disk_usage("data", Path::new(&CONFIG.storage().get_endpoint()))?;
+    }
+
+    // Collect hot tier volume metrics if configured
+    if let Some(hot_tier_dir) = CONFIG.hot_tier_dir() {
+        collect_volume_disk_usage("hot_tier", hot_tier_dir)?;
+    }
+
+    Ok(())
+}
+
+// Function to collect disk usage metrics for a specific volume
+fn collect_volume_disk_usage(label: &str, path: &Path) -> Result<(), MetricsError> {
+    let metrics = get_volume_disk_usage(path)?;
+
+    TOTAL_DISK
+        .with_label_values(&[label])
+        .set(metrics.total as i64);
+    USED_DISK
+        .with_label_values(&[label])
+        .set(metrics.used as i64);
+    AVAILABLE_DISK
+        .with_label_values(&[label])
+        .set(metrics.available as i64);
+
+    Ok(())
+}
+
+// Function to get disk usage for a specific volume
+fn get_volume_disk_usage(path: &Path) -> Result<DiskMetrics, MetricsError> {
+    let mut disks = Disks::new_with_refreshed_list();
+    disks.sort_by(|a, b| {
+        b.mount_point()
+            .to_str()
+            .unwrap_or("")
+            .len()
+            .cmp(&a.mount_point().to_str().unwrap_or("").len())
+    });
+
+    for disk in disks.iter() {
+        let mount_point = disk.mount_point().to_str().unwrap();
+
+        if path.starts_with(mount_point) {
+            return Ok(DiskMetrics {
+                total: disk.total_space(),
+                used: disk.total_space() - disk.available_space(),
+                available: disk.available_space(),
+            });
+        }
+    }
+
+    Err(MetricsError::Custom(
+        format!("No matching disk found for path: {:?}", path),
+        StatusCode::INTERNAL_SERVER_ERROR,
+    ))
+}
+
+// Function to collect CPU and memory usage metrics
+async fn collect_system_metrics() -> Result<(), MetricsError> {
+    let metrics = get_system_metrics()?;
+
+    // Set memory metrics
+    MEMORY
+        .with_label_values(&["total_memory"])
+        .set(metrics.memory.total as i64);
+    MEMORY
+        .with_label_values(&["used_memory"])
+        .set(metrics.memory.used as i64);
+    MEMORY
+        .with_label_values(&["total_swap"])
+        .set(metrics.memory.total_swap as i64);
+    MEMORY
+        .with_label_values(&["used_swap"])
+        .set(metrics.memory.used_swap as i64);
+
+    // Set CPU metrics
+    for cpu in metrics.cpu {
+        CPU.with_label_values(&[&cpu.name]).set(cpu.usage);
+    }
+
+    Ok(())
+}
+
+// Get system metrics
+fn get_system_metrics() -> Result<SystemMetrics, MetricsError> {
+    let mut sys = System::new_all();
+    sys.refresh_all();
+
+    // Collect memory metrics
+    let memory = MemoryMetrics {
+        total: sys.total_memory(),
+        used: sys.used_memory(),
+        total_swap: sys.total_swap(),
+        used_swap: sys.used_swap(),
+    };
+
+    // Collect CPU metrics
+    let mut cpu_metrics = Vec::new();
+
+    // Add global CPU usage
+    cpu_metrics.push(CpuMetrics {
+        name: "global".to_string(),
+        usage: sys.global_cpu_usage() as f64,
+    });
+
+    // Add individual CPU usage
+    for cpu in sys.cpus() {
+        cpu_metrics.push(CpuMetrics {
+            name: cpu.name().to_string(),
+            usage: cpu.cpu_usage() as f64,
+        });
+    }
+
+    Ok(SystemMetrics {
+        memory,
+        cpu: cpu_metrics,
+    })
+}
+
 pub mod error {
 
     use actix_web::http::header::ContentType;
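
To see how the new gauges surface on the Prometheus endpoint, the sketch below registers a stand-in for one of the gauges declared above into a fresh prometheus::Registry and encodes it with TextEncoder. It is not code from this commit, and the "parseable" namespace is an assumption based on METRICS_NAMESPACE = env!("CARGO_PKG_NAME").

```rust
// Sketch (not from the commit): how a gauge like USED_DISK renders in
// Prometheus text exposition format. The "parseable" namespace is an
// assumption based on METRICS_NAMESPACE = env!("CARGO_PKG_NAME").
use prometheus::{Encoder, IntGaugeVec, Opts, Registry, TextEncoder};

fn main() {
    let registry = Registry::new();

    // Stand-in for the USED_DISK static declared in the diff above.
    let used_disk = IntGaugeVec::new(
        Opts::new("used_disk", "Used Disk Size").namespace("parseable"),
        &["volume"],
    )
    .expect("metric can be created");
    registry
        .register(Box::new(used_disk.clone()))
        .expect("metric can be registered");

    // What collect_volume_disk_usage does for each volume label.
    used_disk.with_label_values(&["staging"]).set(42_000_000);

    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buf)
        .expect("encode metrics");
    // Prints something like: parseable_used_disk{volume="staging"} 42000000
    print!("{}", String::from_utf8(buf).expect("utf8"));
}
```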
