Skip to content

Commit 6f96079

Browse files
merge from main
1 parent 926467c commit 6f96079

File tree

10 files changed

+142
-264
lines changed

10 files changed

+142
-264
lines changed

src/cli.rs

+55-59
Original file line numberDiff line numberDiff line change
@@ -441,81 +441,77 @@ impl Options {
441441

442442
/// TODO: refactor and document
443443
pub fn get_url(&self, mode: Mode) -> Url {
444-
let (endpoint, env_var) = match mode {
445-
Mode::Ingest => {
446-
if self.ingestor_endpoint.is_empty() {
447-
return format!(
448-
"{}://{}",
449-
self.get_scheme(),
450-
self.address
451-
)
452-
.parse::<Url>() // if the value was improperly set, this will panic before hand
453-
.unwrap_or_else(|err| {
454-
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
455-
});
456-
}
457-
(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT")
458-
}
459-
Mode::Index => {
460-
if self.indexer_endpoint.is_empty() {
461-
return format!(
462-
"{}://{}",
463-
self.get_scheme(),
464-
self.address
465-
)
466-
.parse::<Url>() // if the value was improperly set, this will panic before hand
467-
.unwrap_or_else(|err| {
468-
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
469-
});
470-
}
471-
(&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
472-
}
473-
_ => panic!("Invalid mode"),
444+
let endpoint = match mode {
445+
Mode::Ingest => self.get_endpoint(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT"),
446+
Mode::Index => self.get_endpoint(&self.indexer_endpoint, "P_INDEXER_ENDPOINT"),
447+
Mode::Query | Mode::All => return self.build_url(&self.address),
474448
};
475449

476-
if endpoint.starts_with("http") {
477-
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
478-
}
450+
self.parse_endpoint(&endpoint)
451+
}
479452

480-
let addr_from_env = endpoint.split(':').collect::<Vec<&str>>();
453+
fn get_endpoint(&self, endpoint: &str, env_var: &str) -> String {
454+
if endpoint.is_empty() {
455+
return self.address.clone();
456+
}
481457

482-
if addr_from_env.len() != 2 {
483-
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
458+
if endpoint.starts_with("http") {
459+
panic!(
460+
"Invalid value `{}`, please set the environment variable `{}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
461+
endpoint, env_var
462+
);
484463
}
485464

486-
let mut hostname = addr_from_env[0].to_string();
487-
let mut port = addr_from_env[1].to_string();
465+
endpoint.to_string()
466+
}
488467

489-
// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
490-
// fetch the value from the specified env vars
491-
if hostname.starts_with('$') {
492-
let var_hostname = hostname[1..].to_string();
493-
hostname = env::var(&var_hostname).unwrap_or_default();
468+
fn parse_endpoint(&self, endpoint: &str) -> Url {
469+
let addr_parts: Vec<&str> = endpoint.split(':').collect();
494470

495-
if hostname.is_empty() {
496-
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
497-
}
498-
if hostname.starts_with("http") {
499-
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
500-
} else {
501-
hostname = format!("{}://{}", self.get_scheme(), hostname);
502-
}
471+
if addr_parts.len() != 2 {
472+
panic!(
473+
"Invalid value `{}`, please set the environment variable to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
474+
endpoint
475+
);
503476
}
504477

505-
if port.starts_with('$') {
506-
let var_port = port[1..].to_string();
507-
port = env::var(&var_port).unwrap_or_default();
478+
let hostname = self.resolve_env_var(addr_parts[0]);
479+
let port = self.resolve_env_var(addr_parts[1]);
508480

509-
if port.is_empty() {
481+
self.build_url(&format!("{}:{}", hostname, port))
482+
}
483+
484+
fn resolve_env_var(&self, value: &str) -> String {
485+
if let Some(stripped) = value.strip_prefix('$') {
486+
let var_name = stripped;
487+
let resolved_value = env::var(var_name).unwrap_or_else(|_| {
488+
panic!(
489+
"The environment variable `{}` is not set. Please set it to a valid value. Refer to the documentation: https://logg.ing/env for more details.",
490+
var_name
491+
);
492+
});
493+
494+
if resolved_value.starts_with("http") {
510495
panic!(
511-
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
512-
var_port
496+
"Invalid value `{}`, please set the environment variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.",
497+
resolved_value, var_name
513498
);
514499
}
500+
501+
resolved_value
502+
} else {
503+
value.to_string()
515504
}
505+
}
516506

517-
format!("{}://{}:{}", self.get_scheme(), hostname, port)
507+
fn build_url(&self, address: &str) -> Url {
508+
format!("{}://{}", self.get_scheme(), address)
518509
.parse::<Url>()
519-
.expect("Valid URL")
510+
.unwrap_or_else(|err| {
511+
panic!(
512+
"{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.",
513+
address
514+
);
515+
})
520516
}
521517
}

src/event/mod.rs

-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ impl Event {
7676
&self.rb,
7777
self.parsed_timestamp,
7878
&self.custom_partition_values,
79-
self.stream_type,
8079
)?;
8180

8281
update_stats(
@@ -100,7 +99,6 @@ impl Event {
10099
&self.rb,
101100
self.parsed_timestamp,
102101
&self.custom_partition_values,
103-
self.stream_type,
104102
)?;
105103

106104
Ok(())

src/handlers/http/cluster/mod.rs

+25-7
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::collections::HashSet;
2323
use std::time::Duration;
2424

2525
use actix_web::http::header::{self, HeaderMap};
26-
use actix_web::web::{Json, Path};
26+
use actix_web::web::Path;
2727
use actix_web::Responder;
2828
use bytes::Bytes;
2929
use chrono::Utc;
@@ -39,6 +39,7 @@ use url::Url;
3939
use utils::{check_liveness, to_url_string, IngestionStats, QueriedStats, StorageStats};
4040

4141
use crate::handlers::http::ingest::ingest_internal_stream;
42+
use crate::metrics::collect_all_metrics;
4243
use crate::metrics::prom_utils::Metrics;
4344
use crate::parseable::PARSEABLE;
4445
use crate::rbac::role::model::DefaultPrivilege;
@@ -870,6 +871,7 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
870871
Err(err) => return Err(err),
871872
}
872873

874+
all_metrics.push(Metrics::querier_prometheus_metrics().await);
873875
Ok(all_metrics)
874876
}
875877

@@ -880,14 +882,30 @@ pub async fn init_cluster_metrics_scheduler() -> Result<(), PostError> {
880882
.every(CLUSTER_METRICS_INTERVAL_SECONDS)
881883
.run(move || async {
882884
let result: Result<(), PostError> = async {
885+
if let Err(err) = collect_all_metrics().await {
886+
error!("Error in capturing system metrics: {:#}", err);
887+
}
883888
let cluster_metrics = fetch_cluster_metrics().await;
884889
if let Ok(metrics) = cluster_metrics {
885-
let json_value = serde_json::to_value(metrics)
886-
.map_err(|e| anyhow::anyhow!("Failed to serialize metrics: {}", e))?;
887-
888-
ingest_internal_stream(INTERNAL_STREAM_NAME.to_string(), Json(json_value))
889-
.await
890-
.map_err(|e| anyhow::anyhow!("Failed to ingest metrics: {}", e))?;
890+
if !metrics.is_empty() {
891+
info!("Cluster metrics fetched successfully from all ingestors");
892+
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
893+
if matches!(
894+
ingest_internal_stream(
895+
INTERNAL_STREAM_NAME.to_string(),
896+
bytes::Bytes::from(metrics_bytes),
897+
)
898+
.await,
899+
Ok(())
900+
) {
901+
info!("Cluster metrics successfully ingested into internal stream");
902+
} else {
903+
error!("Failed to ingest cluster metrics into internal stream");
904+
}
905+
} else {
906+
error!("Failed to serialize cluster metrics");
907+
}
908+
}
891909
}
892910
Ok(())
893911
}

src/handlers/http/ingest.rs

+3-36
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,19 @@
1818

1919
use std::collections::{HashMap, HashSet};
2020

21-
use super::logstream::error::{CreateStreamError, StreamError};
22-
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
23-
use super::users::dashboards::DashboardError;
24-
use super::users::filters::FiltersError;
25-
use crate::event::format::{self, EventFormat, LogSource};
26-
use crate::event::{self, error::EventError};
27-
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
28-
use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
29-
use crate::metadata::error::stream_info::MetadataError;
30-
use crate::metadata::{SchemaVersion, STREAM_INFO};
31-
use crate::option::{Mode, CONFIG};
32-
use crate::otel::logs::flatten_otel_logs;
33-
use crate::otel::metrics::flatten_otel_metrics;
34-
use crate::otel::traces::flatten_otel_traces;
35-
use crate::storage::{ObjectStorageError, StreamType};
36-
use crate::utils::header_parsing::ParseHeaderError;
37-
use crate::utils::json::convert_array_to_object;
38-
use crate::utils::json::flatten::{convert_to_array, JsonFlattenError};
3921
use actix_web::web::{Json, Path};
4022
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
4123
use arrow_array::RecordBatch;
24+
use bytes::Bytes;
4225
use chrono::Utc;
4326
use http::StatusCode;
4427
use serde_json::Value;
4528

4629
use crate::event::error::EventError;
4730
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
48-
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
31+
use crate::event::format::{LogSource, LogSourceEntry};
4932
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
5033
use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
51-
use crate::metadata::SchemaVersion;
5234
use crate::option::Mode;
5335
use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
5436
use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
@@ -138,26 +120,11 @@ pub async fn ingest(
138120
}
139121

140122
pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
141-
let size: usize = body.len();
142123
let json: Value = serde_json::from_slice(&body)?;
143-
let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
144124
let mut p_custom_fields = HashMap::new();
145125
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
146126
p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
147-
// For internal streams, use old schema
148-
format::json::Event::new(json)
149-
.into_event(
150-
stream_name,
151-
size as u64,
152-
&schema,
153-
false,
154-
None,
155-
None,
156-
SchemaVersion::V0,
157-
StreamType::Internal,
158-
&p_custom_fields,
159-
)?
160-
.process()?;
127+
flatten_and_push_logs(json, &stream_name, &LogSource::Pmeta, &p_custom_fields).await?;
161128

162129
Ok(())
163130
}

src/handlers/http/modal/ingest_server.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use relative_path::RelativePathBuf;
2828
use serde_json::Value;
2929
use tokio::sync::oneshot;
3030

31+
use crate::metrics::init_system_metrics_scheduler;
3132
use crate::option::Mode;
3233
use crate::{
3334
analytics,
@@ -110,7 +111,7 @@ impl ParseableServer for IngestServer {
110111

111112
// write the ingestor metadata to storage
112113
PARSEABLE.store_metadata(Mode::Ingest).await?;
113-
114+
init_system_metrics_scheduler().await?;
114115
// Ingestors shouldn't have to deal with OpenId auth flow
115116
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
116117
// Cancel sync jobs

src/handlers/http/modal/query_server.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
use std::thread;
2020

2121
use crate::handlers::airplane;
22-
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
22+
use crate::handlers::http::cluster::{self, init_cluster_metrics_scheduler};
2323
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
2424
use crate::handlers::http::{base_path, prism_base_path};
2525
use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
2626
use crate::handlers::http::{rbac, role};
2727
use crate::hottier::HotTierManager;
28+
use crate::metrics::init_system_metrics_scheduler;
2829
use crate::rbac::role::Action;
2930
use crate::{analytics, migration, storage, sync};
3031
use actix_web::web::{resource, ServiceConfig};
@@ -33,7 +34,6 @@ use actix_web_prometheus::PrometheusMetrics;
3334
use async_trait::async_trait;
3435
use bytes::Bytes;
3536
use tokio::sync::oneshot;
36-
use tracing::info;
3737

3838
use crate::parseable::PARSEABLE;
3939
use crate::Server;
@@ -109,17 +109,15 @@ impl ParseableServer for QueryServer {
109109
// track all parquet files already in the data directory
110110
storage::retention::load_retention_from_global();
111111

112-
metrics::init_system_metrics_scheduler().await?;
113-
cluster::init_cluster_metrics_scheduler().await?;
114112
// all internal data structures populated now.
115113
// start the analytics scheduler if enabled
116114
if PARSEABLE.options.send_analytics {
117115
analytics::init_analytics_scheduler()?;
118116
}
119117

120-
if init_cluster_metrics_schedular().is_ok() {
121-
info!("Cluster metrics scheduler started successfully");
122-
}
118+
init_system_metrics_scheduler().await?;
119+
init_cluster_metrics_scheduler().await?;
120+
123121
if let Some(hot_tier_manager) = HotTierManager::global() {
124122
hot_tier_manager.put_internal_stream_hot_tier().await?;
125123
hot_tier_manager.download_from_s3()?;

src/handlers/http/modal/server.rs

+3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::handlers::http::users::dashboards;
3030
use crate::handlers::http::users::filters;
3131
use crate::hottier::HotTierManager;
3232
use crate::metrics;
33+
use crate::metrics::init_system_metrics_scheduler;
3334
use crate::migration;
3435
use crate::storage;
3536
use crate::sync;
@@ -134,6 +135,8 @@ impl ParseableServer for Server {
134135
analytics::init_analytics_scheduler()?;
135136
}
136137

138+
init_system_metrics_scheduler().await?;
139+
137140
tokio::spawn(handlers::livetail::server());
138141
tokio::spawn(handlers::airplane::server());
139142

src/metrics/mod.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::{path::Path, time::Duration};
2626
use sysinfo::{Disks, System};
2727
use tracing::{error, info};
2828

29-
use crate::{handlers::http::metrics_path, option::CONFIG, stats::FullStats};
29+
use crate::{handlers::http::metrics_path, parseable::PARSEABLE, stats::FullStats};
3030
use actix_web::Responder;
3131
use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
3232
use error::MetricsError;
@@ -419,14 +419,14 @@ pub async fn collect_all_metrics() -> Result<(), MetricsError> {
419419
// Function to collect disk usage metrics
420420
async fn collect_disk_metrics() -> Result<(), MetricsError> {
421421
// collect staging volume metrics
422-
collect_volume_disk_usage("staging", CONFIG.staging_dir())?;
422+
collect_volume_disk_usage("staging", PARSEABLE.options.staging_dir())?;
423423
// Collect data volume metrics for local storage
424-
if CONFIG.get_storage_mode_string() == "Local drive" {
425-
collect_volume_disk_usage("data", Path::new(&CONFIG.storage().get_endpoint()))?;
424+
if PARSEABLE.get_storage_mode_string() == "Local drive" {
425+
collect_volume_disk_usage("data", Path::new(&PARSEABLE.storage().get_endpoint()))?;
426426
}
427427

428428
// Collect hot tier volume metrics if configured
429-
if let Some(hot_tier_dir) = CONFIG.hot_tier_dir() {
429+
if let Some(hot_tier_dir) = PARSEABLE.hot_tier_dir() {
430430
collect_volume_disk_usage("hot_tier", hot_tier_dir)?;
431431
}
432432

0 commit comments

Comments
 (0)