Skip to content

Commit 0b2d1a7

Browse files
merge from main
1 parent dde2e49 commit 0b2d1a7

File tree

9 files changed

+87
-205
lines changed

9 files changed

+87
-205
lines changed

src/event/mod.rs

-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ impl Event {
7676
&self.rb,
7777
self.parsed_timestamp,
7878
&self.custom_partition_values,
79-
self.stream_type,
8079
)?;
8180

8281
update_stats(
@@ -100,7 +99,6 @@ impl Event {
10099
&self.rb,
101100
self.parsed_timestamp,
102101
&self.custom_partition_values,
103-
self.stream_type,
104102
)?;
105103

106104
Ok(())

src/handlers/http/cluster/mod.rs

+25-7
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::sync::Arc;
2525
use std::time::Duration;
2626

2727
use actix_web::http::header::{self, HeaderMap};
28-
use actix_web::web::{Json, Path};
28+
use actix_web::web::Path;
2929
use actix_web::Responder;
3030
use bytes::Bytes;
3131
use chrono::Utc;
@@ -41,6 +41,7 @@ use url::Url;
4141
use utils::{check_liveness, to_url_string, IngestionStats, QueriedStats, StorageStats};
4242

4343
use crate::handlers::http::ingest::ingest_internal_stream;
44+
use crate::metrics::collect_all_metrics;
4445
use crate::metrics::prom_utils::Metrics;
4546
use crate::parseable::PARSEABLE;
4647
use crate::rbac::role::model::DefaultPrivilege;
@@ -991,6 +992,7 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
991992
Err(err) => return Err(err),
992993
}
993994

995+
all_metrics.push(Metrics::querier_prometheus_metrics().await);
994996
Ok(all_metrics)
995997
}
996998

@@ -1001,14 +1003,30 @@ pub async fn init_cluster_metrics_scheduler() -> Result<(), PostError> {
10011003
.every(CLUSTER_METRICS_INTERVAL_SECONDS)
10021004
.run(move || async {
10031005
let result: Result<(), PostError> = async {
1006+
if let Err(err) = collect_all_metrics().await {
1007+
error!("Error in capturing system metrics: {:#}", err);
1008+
}
10041009
let cluster_metrics = fetch_cluster_metrics().await;
10051010
if let Ok(metrics) = cluster_metrics {
1006-
let json_value = serde_json::to_value(metrics)
1007-
.map_err(|e| anyhow::anyhow!("Failed to serialize metrics: {}", e))?;
1008-
1009-
ingest_internal_stream(INTERNAL_STREAM_NAME.to_string(), Json(json_value))
1010-
.await
1011-
.map_err(|e| anyhow::anyhow!("Failed to ingest metrics: {}", e))?;
1011+
if !metrics.is_empty() {
1012+
info!("Cluster metrics fetched successfully from all ingestors");
1013+
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
1014+
if matches!(
1015+
ingest_internal_stream(
1016+
INTERNAL_STREAM_NAME.to_string(),
1017+
bytes::Bytes::from(metrics_bytes),
1018+
)
1019+
.await,
1020+
Ok(())
1021+
) {
1022+
info!("Cluster metrics successfully ingested into internal stream");
1023+
} else {
1024+
error!("Failed to ingest cluster metrics into internal stream");
1025+
}
1026+
} else {
1027+
error!("Failed to serialize cluster metrics");
1028+
}
1029+
}
10121030
}
10131031
Ok(())
10141032
}

src/handlers/http/ingest.rs

+3-36
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,19 @@
1818

1919
use std::collections::{HashMap, HashSet};
2020

21-
use super::logstream::error::{CreateStreamError, StreamError};
22-
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
23-
use super::users::dashboards::DashboardError;
24-
use super::users::filters::FiltersError;
25-
use crate::event::format::{self, EventFormat, LogSource};
26-
use crate::event::{self, error::EventError};
27-
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
28-
use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
29-
use crate::metadata::error::stream_info::MetadataError;
30-
use crate::metadata::{SchemaVersion, STREAM_INFO};
31-
use crate::option::{Mode, CONFIG};
32-
use crate::otel::logs::flatten_otel_logs;
33-
use crate::otel::metrics::flatten_otel_metrics;
34-
use crate::otel::traces::flatten_otel_traces;
35-
use crate::storage::{ObjectStorageError, StreamType};
36-
use crate::utils::header_parsing::ParseHeaderError;
37-
use crate::utils::json::convert_array_to_object;
38-
use crate::utils::json::flatten::{convert_to_array, JsonFlattenError};
3921
use actix_web::web::{Json, Path};
4022
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
4123
use arrow_array::RecordBatch;
24+
use bytes::Bytes;
4225
use chrono::Utc;
4326
use http::StatusCode;
4427
use serde_json::Value;
4528

4629
use crate::event::error::EventError;
4730
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
48-
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
31+
use crate::event::format::{LogSource, LogSourceEntry};
4932
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
5033
use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
51-
use crate::metadata::SchemaVersion;
5234
use crate::option::Mode;
5335
use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
5436
use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
@@ -138,26 +120,11 @@ pub async fn ingest(
138120
}
139121

140122
pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
141-
let size: usize = body.len();
142123
let json: Value = serde_json::from_slice(&body)?;
143-
let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
144124
let mut p_custom_fields = HashMap::new();
145125
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
146126
p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
147-
// For internal streams, use old schema
148-
format::json::Event::new(json)
149-
.into_event(
150-
stream_name,
151-
size as u64,
152-
&schema,
153-
false,
154-
None,
155-
None,
156-
SchemaVersion::V0,
157-
StreamType::Internal,
158-
&p_custom_fields,
159-
)?
160-
.process()?;
127+
flatten_and_push_logs(json, &stream_name, &LogSource::Pmeta, &p_custom_fields).await?;
161128

162129
Ok(())
163130
}

src/handlers/http/modal/ingest_server.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use serde_json::Value;
3030
use tokio::sync::oneshot;
3131
use tokio::sync::OnceCell;
3232

33+
use crate::metrics::init_system_metrics_scheduler;
3334
use crate::handlers::http::modal::NodeType;
3435
use crate::{
3536
analytics,
@@ -119,7 +120,7 @@ impl ParseableServer for IngestServer {
119120
thread::spawn(|| sync::handler(cancel_rx));
120121

121122
tokio::spawn(airplane::server());
122-
123+
init_system_metrics_scheduler().await?;
123124
// Ingestors shouldn't have to deal with OpenId auth flow
124125
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
125126
// Cancel sync jobs

src/handlers/http/modal/query_server.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ use std::sync::Arc;
2020
use std::thread;
2121

2222
use crate::handlers::airplane;
23-
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
23+
use crate::handlers::http::cluster::{self, init_cluster_metrics_scheduler};
2424
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
2525
use crate::handlers::http::{base_path, prism_base_path};
2626
use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
2727
use crate::handlers::http::{rbac, role};
2828
use crate::hottier::HotTierManager;
29+
use crate::metrics::init_system_metrics_scheduler;
2930
use crate::rbac::role::Action;
3031
use crate::{analytics, migration, storage, sync};
3132
use actix_web::web::{resource, ServiceConfig};
@@ -34,7 +35,6 @@ use actix_web_prometheus::PrometheusMetrics;
3435
use async_trait::async_trait;
3536
use bytes::Bytes;
3637
use tokio::sync::{oneshot, OnceCell};
37-
use tracing::info;
3838

3939
use crate::parseable::PARSEABLE;
4040
use crate::Server;
@@ -117,17 +117,15 @@ impl ParseableServer for QueryServer {
117117
// track all parquet files already in the data directory
118118
storage::retention::load_retention_from_global();
119119

120-
metrics::init_system_metrics_scheduler().await?;
121-
cluster::init_cluster_metrics_scheduler().await?;
122120
// all internal data structures populated now.
123121
// start the analytics scheduler if enabled
124122
if PARSEABLE.options.send_analytics {
125123
analytics::init_analytics_scheduler()?;
126124
}
127125

128-
if init_cluster_metrics_schedular().is_ok() {
129-
info!("Cluster metrics scheduler started successfully");
130-
}
126+
init_system_metrics_scheduler().await?;
127+
init_cluster_metrics_scheduler().await?;
128+
131129
if let Some(hot_tier_manager) = HotTierManager::global() {
132130
hot_tier_manager.put_internal_stream_hot_tier().await?;
133131
hot_tier_manager.download_from_s3()?;

src/handlers/http/modal/server.rs

+3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::handlers::http::users::dashboards;
3030
use crate::handlers::http::users::filters;
3131
use crate::hottier::HotTierManager;
3232
use crate::metrics;
33+
use crate::metrics::init_system_metrics_scheduler;
3334
use crate::migration;
3435
use crate::storage;
3536
use crate::sync;
@@ -134,6 +135,8 @@ impl ParseableServer for Server {
134135
analytics::init_analytics_scheduler()?;
135136
}
136137

138+
init_system_metrics_scheduler().await?;
139+
137140
tokio::spawn(handlers::livetail::server());
138141
tokio::spawn(handlers::airplane::server());
139142

src/metrics/mod.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::{path::Path, time::Duration};
2626
use sysinfo::{Disks, System};
2727
use tracing::{error, info};
2828

29-
use crate::{handlers::http::metrics_path, option::CONFIG, stats::FullStats};
29+
use crate::{handlers::http::metrics_path, parseable::PARSEABLE, stats::FullStats};
3030
use actix_web::Responder;
3131
use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
3232
use error::MetricsError;
@@ -419,14 +419,14 @@ pub async fn collect_all_metrics() -> Result<(), MetricsError> {
419419
// Function to collect disk usage metrics
420420
async fn collect_disk_metrics() -> Result<(), MetricsError> {
421421
// collect staging volume metrics
422-
collect_volume_disk_usage("staging", CONFIG.staging_dir())?;
422+
collect_volume_disk_usage("staging", PARSEABLE.options.staging_dir())?;
423423
// Collect data volume metrics for local storage
424-
if CONFIG.get_storage_mode_string() == "Local drive" {
425-
collect_volume_disk_usage("data", Path::new(&CONFIG.storage().get_endpoint()))?;
424+
if PARSEABLE.get_storage_mode_string() == "Local drive" {
425+
collect_volume_disk_usage("data", Path::new(&PARSEABLE.storage().get_endpoint()))?;
426426
}
427427

428428
// Collect hot tier volume metrics if configured
429-
if let Some(hot_tier_dir) = CONFIG.hot_tier_dir() {
429+
if let Some(hot_tier_dir) = PARSEABLE.hot_tier_dir() {
430430
collect_volume_disk_usage("hot_tier", hot_tier_dir)?;
431431
}
432432

0 commit comments

Comments (0)