diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs new file mode 100644 index 000000000..c29d61573 --- /dev/null +++ b/src/alerts/alerts_utils.rs @@ -0,0 +1,406 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use arrow_array::{Float64Array, Int64Array, RecordBatch}; +use datafusion::{ + common::tree_node::TreeNode, + functions_aggregate::{ + count::count, + expr_fn::avg, + min_max::{max, min}, + sum::sum, + }, + prelude::{col, lit, DataFrame, Expr}, +}; +use tracing::trace; + +use crate::{ + alerts::AggregateCondition, + query::{TableScanVisitor, QUERY_SESSION}, + rbac::{ + map::SessionKey, + role::{Action, Permission}, + Users, + }, + utils::time::TimeRange, +}; + +use super::{ + AggregateConfig, AggregateOperation, AggregateResult, Aggregations, AlertConfig, AlertError, + AlertOperator, AlertState, ConditionConfig, Conditions, ALERTS, +}; + +async fn get_tables_from_query(query: &str) -> Result { + let session_state = QUERY_SESSION.state(); + let raw_logical_plan = session_state.create_logical_plan(query).await?; + + let mut visitor = TableScanVisitor::default(); + let _ = raw_logical_plan.visit(&mut visitor); + Ok(visitor) +} + +pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Result<(), AlertError> { + let tables = get_tables_from_query(query).await?; + 
let permissions = Users.get_permissions(session_key); + + for table_name in tables.into_inner().iter() { + let mut authorized = false; + + // in permission check if user can run query on the stream. + // also while iterating add any filter tags for this stream + for permission in permissions.iter() { + match permission { + Permission::Stream(Action::All, _) => { + authorized = true; + break; + } + Permission::StreamWithTag(Action::Query, ref stream, _) + if stream == table_name || stream == "*" => + { + authorized = true; + } + _ => (), + } + } + + if !authorized { + return Err(AlertError::Unauthorized); + } + } + + Ok(()) +} + +/// accept the alert +/// +/// alert contains aggregate_config +/// +/// aggregate_config contains the filters which need to be applied +/// +/// iterate over each agg config, apply filters, the evaluate for that config +/// +/// collect the results in the end +/// +/// check whether notification needs to be triggered or not +pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> { + trace!("RUNNING EVAL TASK FOR- {alert:?}"); + + let query = prepare_query(alert).await?; + let base_df = execute_base_query(&query, &alert.query).await?; + let agg_results = evaluate_aggregates(&alert.aggregate_config, &base_df).await?; + let final_res = calculate_final_result(&alert.aggregate_config, &agg_results); + + update_alert_state(alert, final_res, &agg_results).await?; + Ok(()) +} + +async fn prepare_query(alert: &AlertConfig) -> Result { + let (start_time, end_time) = match &alert.eval_type { + super::EvalConfig::RollingWindow(rolling_window) => { + (&rolling_window.eval_start, &rolling_window.eval_end) + } + }; + + let session_state = QUERY_SESSION.state(); + let raw_logical_plan = session_state.create_logical_plan(&alert.query).await?; + + let time_range = TimeRange::parse_human_time(start_time, end_time) + .map_err(|err| AlertError::CustomError(err.to_string()))?; + + Ok(crate::query::Query { + raw_logical_plan, + time_range, + 
filter_tag: None, + }) +} + +async fn execute_base_query( + query: &crate::query::Query, + original_query: &str, +) -> Result { + let stream_name = query.first_table_name().ok_or_else(|| { + AlertError::CustomError(format!("Table name not found in query- {}", original_query)) + })?; + + query + .get_dataframe(stream_name) + .await + .map_err(|err| AlertError::CustomError(err.to_string())) +} + +async fn evaluate_aggregates( + agg_config: &Aggregations, + base_df: &DataFrame, +) -> Result, AlertError> { + let agg_filter_exprs = get_exprs(agg_config); + let mut results = Vec::new(); + + let conditions = match &agg_config.operator { + Some(_) => &agg_config.aggregate_conditions[0..2], + None => &agg_config.aggregate_conditions[0..1], + }; + + for ((agg_expr, filter), agg) in agg_filter_exprs.into_iter().zip(conditions) { + let result = evaluate_single_aggregate(base_df, filter, agg_expr, agg).await?; + results.push(result); + } + + Ok(results) +} + +async fn evaluate_single_aggregate( + base_df: &DataFrame, + filter: Option, + agg_expr: Expr, + agg: &AggregateConfig, +) -> Result { + let filtered_df = if let Some(filter) = filter { + base_df.clone().filter(filter)? + } else { + base_df.clone() + }; + + let aggregated_rows = filtered_df + .aggregate(vec![], vec![agg_expr])? 
+        .collect()
+        .await?;
+
+    let final_value = get_final_value(aggregated_rows);
+    let result = evaluate_condition(&agg.operator, final_value, agg.value);
+
+    let message = if result {
+        agg.condition_config
+            .as_ref()
+            .map(|config| config.generate_filter_message())
+            .or(Some(String::default()))
+    } else {
+        None
+    };
+
+    Ok(AggregateResult {
+        result,
+        message,
+        config: agg.clone(),
+        value: final_value,
+    })
+}
+
+/// Compares the aggregated value (`actual`) against the configured threshold (`expected`).
+///
+/// `Like` / `NotLike` are pattern operators and have no meaning for two `f64` values.
+/// `AlertOperator` can deserialize them and the validation visible in this change does
+/// not reject them on an aggregate, so they previously reached `unreachable!()` and
+/// panicked the evaluation. They now log a warning and count as "condition not met".
+fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> bool {
+    match operator {
+        AlertOperator::GreaterThan => actual > expected,
+        AlertOperator::LessThan => actual < expected,
+        AlertOperator::EqualTo => actual == expected,
+        AlertOperator::NotEqualTo => actual != expected,
+        AlertOperator::GreaterThanEqualTo => actual >= expected,
+        AlertOperator::LessThanEqualTo => actual <= expected,
+        AlertOperator::Like | AlertOperator::NotLike => {
+            tracing::warn!(
+                "operator `{operator}` cannot be applied to a numeric aggregate; condition treated as not met"
+            );
+            false
+        }
+    }
+}
+
+/// Combines the per-aggregate outcomes: `And` needs all of them to hold, `Or` needs any,
+/// and with no operator only the first result counts (`false` when there is none).
+fn calculate_final_result(agg_config: &Aggregations, results: &[AggregateResult]) -> bool {
+    match &agg_config.operator {
+        Some(AggregateCondition::And) => results.iter().all(|r| r.result),
+        Some(AggregateCondition::Or) => results.iter().any(|r| r.result),
+        None => results.first().is_some_and(|r| r.result),
+    }
+}
+
+/// Pushes the evaluation outcome into `ALERTS`.
+///
+/// * `final_res == true`: state becomes `Triggered` with the formatted message.
+/// * otherwise, if the alert is currently `Triggered`: state becomes `Resolved` with an
+///   empty message.
+/// * otherwise: state becomes `Resolved` with no message.
+async fn update_alert_state(
+    alert: &AlertConfig,
+    final_res: bool,
+    agg_results: &[AggregateResult],
+) -> Result<(), AlertError> {
+    if final_res {
+        trace!("ALERT!!!!!!");
+        let message = format_alert_message(agg_results);
+        ALERTS
+            .update_state(alert.id, AlertState::Triggered, Some(message))
+            .await
+    } else if ALERTS.get_state(alert.id).await?.eq(&AlertState::Triggered) {
+        ALERTS
+            .update_state(alert.id, AlertState::Resolved, Some("".into()))
+            .await
+    } else {
+        ALERTS
+            .update_state(alert.id, AlertState::Resolved, None)
+            .await
+    }
+}
+
+fn format_alert_message(agg_results: &[AggregateResult]) -> String {
+    let mut message = String::default();
+    for result in agg_results {
+        if let Some(msg) = &result.message {
+            message.extend([format!(
+                "|{}({}) WHERE ({}) {} {} 
(ActualValue: {})|", + result.config.agg, + result.config.column, + msg, + result.config.operator, + result.config.value, + result.value + )]); + } else { + message.extend([format!( + "|{}({}) {} {} (ActualValue: {})", + result.config.agg, + result.config.column, + result.config.operator, + result.config.value, + result.value + )]); + } + } + message +} + +fn get_final_value(aggregated_rows: Vec) -> f64 { + trace!("aggregated_rows-\n{aggregated_rows:?}"); + + if let Some(f) = aggregated_rows + .first() + .and_then(|batch| { + trace!("batch.column(0)-\n{:?}", batch.column(0)); + batch.column(0).as_any().downcast_ref::() + }) + .map(|array| { + trace!("array-\n{array:?}"); + array.value(0) + }) + { + f + } else { + aggregated_rows + .first() + .and_then(|batch| { + trace!("batch.column(0)-\n{:?}", batch.column(0)); + batch.column(0).as_any().downcast_ref::() + }) + .map(|array| { + trace!("array-\n{array:?}"); + array.value(0) + }) + .unwrap_or_default() as f64 + } +} + +/// This function accepts aggregate_config and +/// returns a tuple of (aggregate expressions, filter expressions) +/// +/// It calls get_filter_expr() to get filter expressions +fn get_exprs(aggregate_config: &Aggregations) -> Vec<(Expr, Option)> { + let mut agg_expr = Vec::new(); + + match &aggregate_config.operator { + Some(op) => match op { + AggregateCondition::And | AggregateCondition::Or => { + let agg1 = &aggregate_config.aggregate_conditions[0]; + let agg2 = &aggregate_config.aggregate_conditions[1]; + + for agg in [agg1, agg2] { + let filter_expr = if let Some(where_clause) = &agg.condition_config { + let fe = get_filter_expr(where_clause); + + trace!("filter_expr-\n{fe:?}"); + + Some(fe) + } else { + None + }; + + let e = match_aggregate_operation(agg); + agg_expr.push((e, filter_expr)); + } + } + }, + None => { + let agg = &aggregate_config.aggregate_conditions[0]; + + let filter_expr = if let Some(where_clause) = &agg.condition_config { + let fe = get_filter_expr(where_clause); + + 
trace!("filter_expr-\n{fe:?}"); + + Some(fe) + } else { + None + }; + + let e = match_aggregate_operation(agg); + agg_expr.push((e, filter_expr)); + } + } + agg_expr +} + +fn get_filter_expr(where_clause: &Conditions) -> Expr { + match &where_clause.operator { + Some(op) => match op { + AggregateCondition::And => { + let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true))); + + let expr1 = &where_clause.conditions[0]; + let expr2 = &where_clause.conditions[1]; + + for e in [expr1, expr2] { + let ex = match_alert_operator(e); + expr = expr.and(ex); + } + expr + } + AggregateCondition::Or => { + let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(false))); + + let expr1 = &where_clause.conditions[0]; + let expr2 = &where_clause.conditions[1]; + + for e in [expr1, expr2] { + let ex = match_alert_operator(e); + expr = expr.or(ex); + } + expr + } + }, + None => { + let expr = &where_clause.conditions[0]; + match_alert_operator(expr) + } + } +} + +fn match_alert_operator(expr: &ConditionConfig) -> Expr { + match expr.operator { + AlertOperator::GreaterThan => col(&expr.column).gt(lit(&expr.value)), + AlertOperator::LessThan => col(&expr.column).lt(lit(&expr.value)), + AlertOperator::EqualTo => col(&expr.column).eq(lit(&expr.value)), + AlertOperator::NotEqualTo => col(&expr.column).not_eq(lit(&expr.value)), + AlertOperator::GreaterThanEqualTo => col(&expr.column).gt_eq(lit(&expr.value)), + AlertOperator::LessThanEqualTo => col(&expr.column).lt_eq(lit(&expr.value)), + AlertOperator::Like => col(&expr.column).like(lit(&expr.value)), + AlertOperator::NotLike => col(&expr.column).not_like(lit(&expr.value)), + } +} +fn match_aggregate_operation(agg: &AggregateConfig) -> Expr { + match agg.agg { + AggregateOperation::Avg => avg(col(&agg.column)), + AggregateOperation::Count => count(col(&agg.column)), + AggregateOperation::Min => min(col(&agg.column)), + AggregateOperation::Max => max(col(&agg.column)), + AggregateOperation::Sum 
=> sum(col(&agg.column)), + } +} diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index ffa7314d0..bd6603088 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -16,151 +16,70 @@ * */ -use arrow_array::cast::as_string_array; -use arrow_array::RecordBatch; -use arrow_schema::DataType; +use actix_web::http::header::ContentType; +use alerts_utils::user_auth_for_query; use async_trait::async_trait; -use datafusion::arrow::compute::kernels::cast; -use datafusion::arrow::datatypes::Schema; -use regex::Regex; -use serde::{Deserialize, Serialize}; -use std::fmt; - -pub mod parser; -pub mod rule; +use chrono::Utc; +use datafusion::common::tree_node::TreeNode; +use http::StatusCode; +use itertools::Itertools; +use once_cell::sync::Lazy; +use serde_json::Error as SerdeError; +use std::collections::{HashMap, HashSet}; +use std::fmt::{self, Display}; +use tokio::sync::oneshot::{Receiver, Sender}; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tracing::{trace, warn}; +use ulid::Ulid; + +pub mod alerts_utils; pub mod target; -use crate::metrics::ALERTS_STATES; use crate::option::CONFIG; -use crate::utils::arrow::get_field; -use crate::utils::uid; -use crate::{storage, utils}; +use crate::query::{TableScanVisitor, QUERY_SESSION}; +use crate::rbac::map::SessionKey; +use crate::storage; +use crate::storage::ObjectStorageError; +use crate::sync::schedule_alert_task; +use crate::utils::time::TimeRange; -pub use self::rule::Rule; use self::target::Target; -#[derive(Default, Debug, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "camelCase")] +// these types describe the scheduled task for an alert +pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>); + +pub const CURRENT_ALERTS_VERSION: &str = "v1"; + +pub static ALERTS: Lazy = Lazy::new(Alerts::default); + +#[derive(Debug, Default)] pub struct Alerts { - pub version: AlertVerison, - pub alerts: Vec, + pub alerts: RwLock>, + pub scheduled_tasks: RwLock>, } -#[derive(Default, 
Debug, serde::Serialize, serde::Deserialize)] +#[derive(Default, Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(rename_all = "lowercase")] pub enum AlertVerison { #[default] V1, } -#[derive(Debug, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Alert { - #[serde(default = "crate::utils::uid::gen")] - pub id: uid::Uid, - pub name: String, - #[serde(flatten)] - pub message: Message, - pub rule: Rule, - pub targets: Vec, -} - -impl Alert { - pub fn check_alert(&self, stream_name: &str, events: RecordBatch) { - let resolves = self.rule.resolves(events.clone()); - - for (index, state) in resolves.into_iter().enumerate() { - match state { - AlertState::Listening | AlertState::Firing => (), - alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => { - let context = self.get_context( - stream_name.to_owned(), - alert_state, - &self.rule, - events.slice(index, 1), - ); - ALERTS_STATES - .with_label_values(&[ - context.stream.as_str(), - context.alert_info.alert_name.as_str(), - context.alert_info.alert_state.to_string().as_str(), - ]) - .inc(); - for target in &self.targets { - target.call(context.clone()); - } - } - } +impl From<&str> for AlertVerison { + fn from(value: &str) -> Self { + match value { + "v1" => Self::V1, + _ => unreachable!(), } } - - fn get_context( - &self, - stream_name: String, - alert_state: AlertState, - rule: &Rule, - event_row: RecordBatch, - ) -> Context { - let deployment_instance = format!( - "{}://{}", - CONFIG.options.get_scheme(), - CONFIG.options.address - ); - let deployment_id = storage::StorageMetadata::global().deployment_id; - let deployment_mode = storage::StorageMetadata::global().mode.to_string(); - let mut additional_labels = - serde_json::to_value(rule).expect("rule is perfectly deserializable"); - utils::json::flatten::flatten_with_parent_prefix(&mut additional_labels, "rule", "_") - .expect("can be flattened"); - Context::new( - stream_name, - AlertInfo::new( - 
self.name.clone(), - self.message.get(event_row), - rule.trigger_reason(), - alert_state, - ), - DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode), - additional_labels, - ) - } } -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Message { - pub message: String, -} - -impl Message { - // checks if message (with a column name) is valid (i.e. the column name is present in the schema) - pub fn valid(&self, schema: &Schema, column: &str) -> bool { - get_field(&schema.fields, column).is_some() - } - - pub fn extract_column_names(&self) -> Vec<&str> { - // the message can have either no column name ({column_name} not present) or any number of {column_name} present - Regex::new(r"\{(.*?)\}") - .unwrap() - .captures_iter(self.message.as_str()) - .map(|cap| cap.get(1).unwrap().as_str()) - .collect() - } - - /// Returns the message with the column names replaced with the values in the column. - fn get(&self, event: RecordBatch) -> String { - let mut replace_message = self.message.clone(); - for column in self.extract_column_names() { - if let Some(value) = event.column_by_name(column) { - let arr = cast(value, &DataType::Utf8).unwrap(); - let value = as_string_array(&arr).value(0); - - replace_message = - replace_message.replace(&format!("{{{column}}}"), value.to_string().as_str()); - } - } - replace_message - } +pub struct AggregateResult { + result: bool, + message: Option, + config: AggregateConfig, + value: f64, } #[async_trait] @@ -170,65 +89,64 @@ pub trait CallableTarget { #[derive(Debug, Clone)] pub struct Context { - stream: String, alert_info: AlertInfo, deployment_info: DeploymentInfo, - additional_labels: serde_json::Value, + message: String, } impl Context { - pub fn new( - stream: String, - alert_info: AlertInfo, - deployment_info: DeploymentInfo, - additional_labels: serde_json::Value, - ) -> Self { + pub fn new(alert_info: AlertInfo, deployment_info: DeploymentInfo, message: String) -> Self 
{ Self { - stream, alert_info, deployment_info, - additional_labels, + message, } } fn default_alert_string(&self) -> String { format!( - "{} triggered on {}\nMessage: {}\nFailing Condition: {}", + "AlertName: {}, Triggered TimeStamp: {}, Severity: {}, Message: {}", self.alert_info.alert_name, - self.stream, - self.alert_info.message, - self.alert_info.reason + Utc::now().to_rfc3339(), + self.alert_info.severity, + self.message ) } fn default_resolved_string(&self) -> String { + format!("{} is now resolved ", self.alert_info.alert_name) + } + + fn default_silenced_string(&self) -> String { format!( - "{} on {} is now resolved ", - self.alert_info.alert_name, self.stream + "Notifications for {} have been silenced ", + self.alert_info.alert_name ) } } #[derive(Debug, Clone)] pub struct AlertInfo { + alert_id: Ulid, alert_name: String, - message: String, - reason: String, + // message: String, + // reason: String, alert_state: AlertState, + severity: String, } impl AlertInfo { pub fn new( + alert_id: Ulid, alert_name: String, - message: String, - reason: String, alert_state: AlertState, + severity: String, ) -> Self { Self { + alert_id, alert_name, - message, - reason, alert_state, + severity, } } } @@ -236,16 +154,12 @@ impl AlertInfo { #[derive(Debug, Clone)] pub struct DeploymentInfo { deployment_instance: String, - deployment_id: uid::Uid, + deployment_id: Ulid, deployment_mode: String, } impl DeploymentInfo { - pub fn new( - deployment_instance: String, - deployment_id: uid::Uid, - deployment_mode: String, - ) -> Self { + pub fn new(deployment_instance: String, deployment_id: Ulid, deployment_mode: String) -> Self { Self { deployment_instance, deployment_id, @@ -254,27 +168,701 @@ impl DeploymentInfo { } } -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub enum AlertType { + Threshold, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] 
+#[serde(rename_all = "camelCase")] +pub enum AlertOperator { + #[serde(rename = ">")] + GreaterThan, + #[serde(rename = "<")] + LessThan, + #[serde(rename = "=")] + EqualTo, + #[serde(rename = "<>")] + NotEqualTo, + #[serde(rename = ">=")] + GreaterThanEqualTo, + #[serde(rename = "<=")] + LessThanEqualTo, + #[serde(rename = "like")] + Like, + #[serde(rename = "not like")] + NotLike, +} + +impl Display for AlertOperator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertOperator::GreaterThan => write!(f, ">"), + AlertOperator::LessThan => write!(f, "<"), + AlertOperator::EqualTo => write!(f, "="), + AlertOperator::NotEqualTo => write!(f, "<>"), + AlertOperator::GreaterThanEqualTo => write!(f, ">="), + AlertOperator::LessThanEqualTo => write!(f, "<="), + AlertOperator::Like => write!(f, "like"), + AlertOperator::NotLike => write!(f, "not like"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub enum AggregateOperation { + Avg, + Count, + Min, + Max, + Sum, +} + +impl Display for AggregateOperation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AggregateOperation::Avg => write!(f, "Avg"), + AggregateOperation::Count => write!(f, "Count"), + AggregateOperation::Min => write!(f, "Min"), + AggregateOperation::Max => write!(f, "Max"), + AggregateOperation::Sum => write!(f, "Sum"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +pub struct OperationConfig { + pub column: String, + pub operator: Option, + pub value: Option, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct FilterConfig { + pub conditions: Vec, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +pub struct ConditionConfig { + pub column: String, + pub operator: AlertOperator, + pub value: String, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] 
+#[serde(rename_all = "camelCase")]
+pub struct Conditions {
+    pub operator: Option<AggregateCondition>,
+    pub conditions: Vec<ConditionConfig>,
+}
+
+// #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+// pub enum Conditions {
+//     AND((ConditionConfig, ConditionConfig)),
+//     OR((ConditionConfig, ConditionConfig)),
+//     Condition(ConditionConfig),
+// }
+
+impl Conditions {
+    /// Renders the filter as text for the alert message, e.g. `[a > 1 OR b = 2]`.
+    ///
+    /// The joining word follows `self.operator`; it was previously hard-coded to
+    /// "AND", which mis-reported `Or` filters.
+    ///
+    /// # Panics
+    /// Indexes `conditions` directly: two entries are required when an operator is
+    /// set and one otherwise.
+    pub fn generate_filter_message(&self) -> String {
+        match &self.operator {
+            Some(op) => {
+                let joiner = match op {
+                    AggregateCondition::And => "AND",
+                    AggregateCondition::Or => "OR",
+                };
+                let expr1 = &self.conditions[0];
+                let expr2 = &self.conditions[1];
+                format!(
+                    "[{} {} {} {} {} {} {}]",
+                    expr1.column,
+                    expr1.operator,
+                    expr1.value,
+                    joiner,
+                    expr2.column,
+                    expr2.operator,
+                    expr2.value
+                )
+            }
+            None => {
+                let expr = &self.conditions[0];
+                format!("[{} {} {}]", expr.column, expr.operator, expr.value)
+            }
+        }
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct AggregateConfig {
+    pub agg: AggregateOperation,
+    pub condition_config: Option<Conditions>,
+    pub column: String,
+    pub operator: AlertOperator,
+    pub value: f64,
+}
+
+// #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+// pub enum Aggregations {
+//     AND((AggregateConfig, AggregateConfig)),
+//     OR((AggregateConfig, AggregateConfig)),
+//     Single(AggregateConfig),
+// }
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Aggregations {
+    pub operator: Option<AggregateCondition>,
+    pub aggregate_conditions: Vec<AggregateConfig>,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+pub enum AggregateCondition {
+    And,
+    Or,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct RollingWindow {
+    // x minutes (25m)
+    pub eval_start: String,
+    // should always be "now"
+    pub eval_end: String,
+    // x minutes (5m)
+    pub eval_frequency: u32,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")] +pub enum EvalConfig { + RollingWindow(RollingWindow), +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct AlertEval {} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Default)] +#[serde(rename_all = "camelCase")] pub enum AlertState { - Listening, - SetToFiring, - Firing, + Triggered, + Silenced, + #[default] Resolved, } -impl Default for AlertState { - fn default() -> Self { - Self::Listening +impl Display for AlertState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertState::Triggered => write!(f, "Triggered"), + AlertState::Silenced => write!(f, "Silenced"), + AlertState::Resolved => write!(f, "Resolved"), + } } } -impl fmt::Display for AlertState { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - AlertState::Listening => write!(f, "Listening"), - AlertState::SetToFiring => write!(f, "SetToFiring"), - AlertState::Firing => write!(f, "Firing"), - AlertState::Resolved => write!(f, "Resolved"), +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Default)] +#[serde(rename_all = "camelCase")] +pub enum Severity { + Critical, + High, + #[default] + Medium, + Low, +} + +impl Display for Severity { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Severity::Critical => write!(f, "Critical (P0)"), + Severity::High => write!(f, "High (P1)"), + Severity::Medium => write!(f, "Medium (P2)"), + Severity::Low => write!(f, "Low (P3)"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct AlertRequest { + #[serde(default = "Severity::default")] + pub severity: Severity, + pub title: String, + pub query: String, + pub alert_type: AlertType, + pub aggregate_config: Aggregations, + pub eval_type: EvalConfig, + pub targets: Vec, +} + +impl From for AlertConfig { + fn from(val: 
AlertRequest) -> AlertConfig {
+        AlertConfig {
+            version: AlertVerison::from(CURRENT_ALERTS_VERSION),
+            id: Ulid::new(),
+            severity: val.severity,
+            title: val.title,
+            query: val.query,
+            alert_type: val.alert_type,
+            aggregate_config: val.aggregate_config,
+            eval_type: val.eval_type,
+            targets: val.targets,
+            state: AlertState::default(),
         }
     }
 }
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct AlertConfig {
+    pub version: AlertVerison,
+    #[serde(default)]
+    pub id: Ulid,
+    pub severity: Severity,
+    pub title: String,
+    pub query: String,
+    pub alert_type: AlertType,
+    pub aggregate_config: Aggregations,
+    pub eval_type: EvalConfig,
+    pub targets: Vec<Target>,
+    // for new alerts, state should be resolved
+    #[serde(default)]
+    pub state: AlertState,
+}
+
+impl AlertConfig {
+    /// Overwrites the user-editable fields with the values from `alert` and resets
+    /// `state` to its default. `id` and `version` are kept.
+    ///
+    /// `severity` is now copied as well; it was the only request field that was
+    /// silently dropped on modification.
+    pub fn modify(&mut self, alert: AlertRequest) {
+        self.severity = alert.severity;
+        self.title = alert.title;
+        self.query = alert.query;
+        self.alert_type = alert.alert_type;
+        self.aggregate_config = alert.aggregate_config;
+        self.eval_type = alert.eval_type;
+        self.targets = alert.targets;
+        self.state = AlertState::default();
+    }
+
+    /// Validations
+    pub async fn validate(&self) -> Result<(), AlertError> {
+        // validate evalType
+        let eval_frequency = match &self.eval_type {
+            EvalConfig::RollingWindow(rolling_window) => {
+                if rolling_window.eval_end != "now" {
+                    return Err(AlertError::Metadata("evalEnd should be now"));
+                }
+
+                if humantime::parse_duration(&rolling_window.eval_start).is_err() {
+                    return Err(AlertError::Metadata(
+                        "evalStart should be of type humantime",
+                    ));
+                }
+                rolling_window.eval_frequency
+            }
+        };
+
+        // validate that target repeat notifs !> eval_frequency
+        for target in &self.targets {
+            match &target.timeout.times {
+                target::Retry::Infinite => {}
+                target::Retry::Finite(repeat) => {
+                    let notif_duration = target.timeout.interval * *repeat as u32;
+                    if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) {
+                        return 
Err(AlertError::Metadata(
+                            "evalFrequency should be greater than target repetition interval",
+                        ));
+                    }
+                }
+            }
+        }
+
+        // validate aggregateConfig and conditionConfig
+        self.validate_configs()?;
+
+        let session_state = QUERY_SESSION.state();
+        let raw_logical_plan = session_state.create_logical_plan(&self.query).await?;
+
+        // create a visitor to extract the table names present in query
+        let mut visitor = TableScanVisitor::default();
+        let _ = raw_logical_plan.visit(&mut visitor);
+
+        // A query that scans no table used to panic on `unwrap()` here; report it as a
+        // request error instead.
+        let Some(table) = visitor.into_inner().first().cloned() else {
+            return Err(AlertError::CustomError(format!(
+                "Table name not found in query- {}",
+                self.query
+            )));
+        };
+
+        // Everything in front of the first occurrence of the table name.
+        let lowercase = self.query.split(&table).collect_vec()[0].to_lowercase();
+
+        // `trim()` replaces the strip_prefix/strip_suffix chain, whose second
+        // `unwrap_or(&lowercase)` fell back to the *unstripped* text whenever only the
+        // leading space matched. Surrounding whitespace of any kind is now ignored.
+        if lowercase.trim() != "select * from" {
+            return Err(AlertError::Metadata(
+                "Query needs to be select * from ",
+            ));
+        }
+
+        // TODO: Filter tags should be taken care of!!!
+        let time_range = TimeRange::parse_human_time("1m", "now")
+            .map_err(|err| AlertError::CustomError(err.to_string()))?;
+
+        let query = crate::query::Query {
+            raw_logical_plan,
+            time_range,
+            filter_tag: None,
+        };
+
+        // for now proceed in a similar fashion as we do in query
+        // TODO: in case of multiple table query does the selection of time partition make a difference? 
(especially when the tables don't have overlapping data)
+        let stream_name = if let Some(stream_name) = query.first_table_name() {
+            stream_name
+        } else {
+            return Err(AlertError::CustomError(format!(
+                "Table name not found in query- {}",
+                self.query
+            )));
+        };
+
+        let base_df = query
+            .get_dataframe(stream_name)
+            .await
+            .map_err(|err| AlertError::CustomError(err.to_string()))?;
+
+        // now that we have base_df, verify that it has
+        // columns from aggregate config
+        let columns = self.get_agg_config_cols();
+
+        base_df.select_columns(columns.iter().map(|c| c.as_str()).collect_vec().as_slice())?;
+        Ok(())
+    }
+
+    /// Checks that the number of aggregate conditions, and of filter conditions inside
+    /// each aggregate, matches the presence of an AND/OR operator: two entries with an
+    /// operator, one without.
+    ///
+    /// # Errors
+    /// Returns `AlertError::CustomError` describing the first mismatch found.
+    fn validate_configs(&self) -> Result<(), AlertError> {
+        /// Validates the optional filter of one aggregate; a missing filter is valid.
+        fn validate_condition_config(config: &Option<Conditions>) -> Result<(), AlertError> {
+            let Some(config) = config else {
+                return Ok(());
+            };
+            match &config.operator {
+                Some(_) => {
+                    // only two aggregate conditions should be present
+                    if config.conditions.len() != 2 {
+                        return Err(AlertError::CustomError(
+                            "While using AND/OR, two conditions must be used".to_string(),
+                        ));
+                    }
+                }
+                None => {
+                    // only one aggregate condition should be present
+                    if config.conditions.len() != 1 {
+                        return Err(AlertError::CustomError(
+                            "While not using AND/OR, one conditions must be used".to_string(),
+                        ));
+                    }
+                }
+            }
+            Ok(())
+        }
+
+        // validate aggregate config(s)
+        match &self.aggregate_config.operator {
+            Some(_) => {
+                // only two aggregate conditions should be present
+                if self.aggregate_config.aggregate_conditions.len() != 2 {
+                    return Err(AlertError::CustomError(
+                        "While using AND/OR, two aggregateConditions must be used".to_string(),
+                    ));
+                }
+
+                // validate condition config of BOTH aggregates; the second one used to
+                // be read from index 0 as well and was therefore never checked
+                let agg1 = &self.aggregate_config.aggregate_conditions[0];
+                let agg2 = &self.aggregate_config.aggregate_conditions[1];
+
+                validate_condition_config(&agg1.condition_config)?;
+                validate_condition_config(&agg2.condition_config)?;
+            }
+            None => {
+                // only one aggregate condition should be present
+                if 
self.aggregate_config.aggregate_conditions.len() != 1 {
+                    return Err(AlertError::CustomError(
+                        "While not using AND/OR, one aggregateConditions must be used".to_string(),
+                    ));
+                }
+
+                let agg = &self.aggregate_config.aggregate_conditions[0];
+                validate_condition_config(&agg.condition_config)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Collects every column name referenced by the aggregate config: the aggregated
+    /// column of each aggregate plus the columns used in its optional filter.
+    ///
+    /// The filter columns of the second aggregate were previously skipped, so the
+    /// `select_columns` check in `validate` never covered them.
+    ///
+    /// # Panics
+    /// Indexes `aggregate_conditions` directly: two entries are required when an
+    /// operator is set and one otherwise.
+    fn get_agg_config_cols(&self) -> HashSet<&String> {
+        let mut columns: HashSet<&String> = HashSet::new();
+        match &self.aggregate_config.operator {
+            Some(op) => match op {
+                AggregateCondition::And | AggregateCondition::Or => {
+                    let agg1 = &self.aggregate_config.aggregate_conditions[0];
+                    let agg2 = &self.aggregate_config.aggregate_conditions[1];
+
+                    for agg in [agg1, agg2] {
+                        columns.insert(&agg.column);
+
+                        if let Some(condition) = &agg.condition_config {
+                            columns.extend(self.get_condition_cols(condition));
+                        }
+                    }
+                }
+            },
+            None => {
+                let agg = &self.aggregate_config.aggregate_conditions[0];
+                columns.insert(&agg.column);
+
+                if let Some(condition) = &agg.condition_config {
+                    columns.extend(self.get_condition_cols(condition));
+                }
+            }
+        }
+        columns
+    }
+
+    fn get_condition_cols<'a>(&'a self, condition: &'a Conditions) -> HashSet<&'a String> {
+        let mut columns: HashSet<&String> = HashSet::new();
+        match &condition.operator {
+            Some(op) => match op {
+                AggregateCondition::And | AggregateCondition::Or => {
+                    let c1 = &condition.conditions[0];
+                    let c2 = &condition.conditions[1];
+                    columns.insert(&c1.column);
+                    columns.insert(&c2.column);
+                }
+            },
+            None => {
+                let c = &condition.conditions[0];
+                columns.insert(&c.column);
+            }
+        }
+        columns
+    }
+
+    pub fn get_eval_frequency(&self) -> u32 {
+        match &self.eval_type {
+            EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency,
+        }
+    }
+
+    fn get_context(&self) -> Context {
+        let deployment_instance = format!(
+            "{}://{}",
+            CONFIG.options.get_scheme(),
+            CONFIG.options.address
+        );
+        let deployment_id = storage::StorageMetadata::global().deployment_id;
+        let deployment_mode = 
storage::StorageMetadata::global().mode.to_string(); + + // let additional_labels = + // serde_json::to_value(rule).expect("rule is perfectly deserializable"); + // let flatten_additional_labels = + // utils::json::flatten::flatten_with_parent_prefix(additional_labels, "rule", "_") + // .expect("can be flattened"); + + Context::new( + AlertInfo::new( + self.id, + self.title.clone(), + self.state, + self.severity.clone().to_string(), + ), + DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode), + String::default(), + ) + } + + pub async fn trigger_notifications(&self, message: String) -> Result<(), AlertError> { + let mut context = self.get_context(); + context.message = message; + for target in &self.targets { + trace!("Target (trigger_notifications)-\n{target:?}"); + target.call(context.clone()); + } + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum AlertError { + #[error("Storage Error: {0}")] + ObjectStorage(#[from] ObjectStorageError), + #[error("Serde Error: {0}")] + Serde(#[from] SerdeError), + #[error("Cannot perform this operation: {0}")] + Metadata(&'static str), + #[error("User is not authorized to run this query")] + Unauthorized, + #[error("ActixError: {0}")] + Error(#[from] actix_web::Error), + #[error("DataFusion Error: {0}")] + DatafusionError(#[from] datafusion::error::DataFusionError), + #[error("Error: {0}")] + CustomError(String), + #[error("Invalid State Change: {0}")] + InvalidStateChange(String), +} + +impl actix_web::ResponseError for AlertError { + fn status_code(&self) -> StatusCode { + match self { + Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Serde(_) => StatusCode::BAD_REQUEST, + Self::Metadata(_) => StatusCode::BAD_REQUEST, + Self::Unauthorized => StatusCode::BAD_REQUEST, + Self::Error(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::DatafusionError(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::CustomError(_) => StatusCode::BAD_REQUEST, + Self::InvalidStateChange(_) => 
StatusCode::BAD_REQUEST, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} + +impl Alerts { + /// Loads alerts from disk, blocks + pub async fn load(&self) -> Result<(), AlertError> { + let mut map = self.alerts.write().await; + let store = CONFIG.storage().get_object_store(); + + for alert in store.get_alerts().await.unwrap_or_default() { + let (handle, rx, tx) = + schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; + + self.update_task(alert.id, handle, rx, tx).await; + + map.insert(alert.id, alert); + } + + Ok(()) + } + + /// Returns a list of alerts that the user has access to (based on query auth) + pub async fn list_alerts_for_user( + &self, + session: SessionKey, + ) -> Result, AlertError> { + let mut alerts: Vec = Vec::new(); + for (_, alert) in self.alerts.read().await.iter() { + // filter based on whether the user can execute this query or not + let query = &alert.query; + if user_auth_for_query(&session, query).await.is_ok() { + alerts.push(alert.to_owned()); + } + } + + Ok(alerts) + } + + /// Returns a sigle alert that the user has access to (based on query auth) + pub async fn get_alert_by_id(&self, id: Ulid) -> Result { + let read_access = self.alerts.read().await; + if let Some(alert) = read_access.get(&id) { + Ok(alert.clone()) + } else { + Err(AlertError::CustomError(format!( + "No alert found for the given ID- {id}" + ))) + } + } + + /// Update the in-mem vector of alerts + pub async fn update(&self, alert: &AlertConfig) { + self.alerts.write().await.insert(alert.id, alert.clone()); + } + + /// Update the state of alert + pub async fn update_state( + &self, + alert_id: Ulid, + new_state: AlertState, + trigger_notif: Option, + ) -> Result<(), AlertError> { + let store = CONFIG.storage().get_object_store(); + + // read and modify alert + let mut alert = 
self.get_alert_by_id(alert_id).await?; + trace!("get alert state by id-\n{}", alert.state); + + alert.state = new_state; + + trace!("new state-\n{}", alert.state); + + // save to disk + store.put_alert(alert_id, &alert).await?; + + // modify in memory + let mut writer = self.alerts.write().await; + if let Some(alert) = writer.get_mut(&alert_id) { + trace!("in memory alert-\n{}", alert.state); + alert.state = new_state; + trace!("in memory updated alert-\n{}", alert.state); + }; + drop(writer); + + if trigger_notif.is_some() { + trace!("trigger notif on-\n{}", alert.state); + alert.trigger_notifications(trigger_notif.unwrap()).await?; + } + + Ok(()) + } + + /// Remove alert and scheduled task from disk and memory + pub async fn delete(&self, alert_id: Ulid) -> Result<(), AlertError> { + if self.alerts.write().await.remove(&alert_id).is_some() { + trace!("removed alert from memory"); + } else { + warn!("Alert ID- {alert_id} not found in memory!"); + } + Ok(()) + } + + /// Get state of alert using alert_id + pub async fn get_state(&self, alert_id: Ulid) -> Result { + let read_access = self.alerts.read().await; + + if let Some(alert) = read_access.get(&alert_id) { + Ok(alert.state) + } else { + let msg = format!("No alert present for ID- {alert_id}"); + Err(AlertError::CustomError(msg)) + } + } + + /// Update the scheduled alert tasks in-memory map + pub async fn update_task( + &self, + id: Ulid, + handle: JoinHandle<()>, + rx: Receiver<()>, + tx: Sender<()>, + ) { + self.scheduled_tasks + .write() + .await + .insert(id, (handle, rx, tx)); + } + + /// Remove a scheduled alert task + pub async fn delete_task(&self, alert_id: Ulid) -> Result<(), AlertError> { + if self + .scheduled_tasks + .write() + .await + .remove(&alert_id) + .is_none() + { + trace!("Alert task {alert_id} not found in hashmap"); + } + + Ok(()) + } +} diff --git a/src/alerts/parser.rs b/src/alerts/parser.rs deleted file mode 100644 index 562c14b07..000000000 --- a/src/alerts/parser.rs +++ /dev/null @@ 
-1,323 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use std::{borrow::Cow, str::FromStr}; - -use nom::{ - branch::alt, - bytes::complete::{is_not, tag, take_while1}, - character::complete::{char, multispace0, multispace1}, - combinator::{cut, map, value}, - error::{convert_error, VerboseError}, - sequence::{delimited, preceded, separated_pair}, - IResult as NomIResult, Parser, -}; - -use super::rule::{ - base::{ - ops::{NumericOperator, StringOperator}, - NumericRule, StringRule, - }, - CompositeRule, -}; - -type IResult<'a, O> = NomIResult<&'a str, O, VerboseError<&'a str>>; - -enum StrFragment<'a> { - Escaped(char), - Unescaped(&'a str), -} - -fn parse_escaped_char(input: &str) -> IResult { - preceded( - char('\\'), - alt(( - value('"', char('"')), - value('\\', char('\\')), - value('/', char('/')), - value('\n', char('n')), - value('\r', char('r')), - value('\t', char('t')), - value('\u{08}', char('b')), - value('\u{0C}', char('f')), - )), - ) - .parse(input) -} - -fn parse_str_char(input: &str) -> IResult { - alt(( - map(parse_escaped_char, StrFragment::Escaped), - map(is_not(r#""\"#), StrFragment::Unescaped), - )) - .parse(input) -} - -fn parse_string(input: &str) -> IResult> { - let mut res = Cow::Borrowed(""); - let (mut input, _) = char('"').parse(input)?; - - loop { - match 
char('"').parse(input) { - // If it is terminating double quotes then we can return the ok value - Ok((tail, _)) => return Ok((tail, res)), - // Fail to parsing in recoverable variant can mean it is a valid char that is not double quote - Err(nom::Err::Error(_)) => {} - Err(err) => return Err(err), - }; - - input = match cut(parse_str_char)(input)? { - (tail, StrFragment::Escaped(ch)) => { - res.to_mut().push(ch); - tail - } - (tail, StrFragment::Unescaped(s)) => { - if res.is_empty() { - res = Cow::Borrowed(s) - } else { - res.to_mut().push_str(s) - } - tail - } - }; - } -} - -fn parse_numeric_op(input: &str) -> IResult { - alt(( - map(tag("<="), |_| NumericOperator::LessThanEquals), - map(tag(">="), |_| NumericOperator::GreaterThanEquals), - map(tag("!="), |_| NumericOperator::NotEqualTo), - map(tag("<"), |_| NumericOperator::LessThan), - map(tag(">"), |_| NumericOperator::GreaterThan), - map(tag("="), |_| NumericOperator::EqualTo), - ))(input) -} - -fn parse_string_op(input: &str) -> IResult { - alt(( - map(tag("!="), |_| StringOperator::NotExact), - map(tag("=%"), |_| StringOperator::Contains), - map(tag("!%"), |_| StringOperator::NotContains), - map(tag("="), |_| StringOperator::Exact), - map(tag("~"), |_| StringOperator::Regex), - ))(input) -} - -fn parse_numeric_rule(input: &str) -> IResult { - let (remaining, key) = map(parse_identifier, |s: &str| s.to_string())(input)?; - let (remaining, op) = delimited(multispace0, parse_numeric_op, multispace0)(remaining)?; - let (remaining, value) = map(take_while1(|c: char| c.is_ascii_digit()), |x| { - str::parse(x).unwrap() - })(remaining)?; - - Ok(( - remaining, - CompositeRule::Numeric(NumericRule { - column: key, - operator: op, - value, - }), - )) -} - -fn parse_string_rule(input: &str) -> IResult { - let (remaining, key) = map(parse_identifier, |s: &str| s.to_string())(input)?; - let (remaining, op) = delimited(multispace0, parse_string_op, multispace0)(remaining)?; - let (remaining, value) = 
parse_string(remaining)?; - - Ok(( - remaining, - CompositeRule::String(StringRule { - column: key, - operator: op, - value: value.into_owned(), - ignore_case: None, - }), - )) -} - -fn parse_identifier(input: &str) -> IResult<&str> { - take_while1(|c: char| c.is_alphanumeric() || c == '-' || c == '_')(input) -} - -fn parse_unary_expr(input: &str) -> IResult { - map( - delimited(tag("!("), cut(parse_expression), char(')')), - |x| CompositeRule::Not(Box::new(x)), - )(input) -} - -fn parse_bracket_expr(input: &str) -> IResult { - delimited( - char('('), - delimited(multispace0, cut(parse_expression), multispace0), - cut(char(')')), - )(input) -} - -fn parse_and(input: &str) -> IResult { - let (remaining, (lhs, rhs)) = separated_pair( - parse_atom, - delimited(multispace1, tag("and"), multispace1), - cut(parse_term), - )(input)?; - - Ok((remaining, CompositeRule::And(vec![lhs, rhs]))) -} - -fn parse_or(input: &str) -> IResult { - let (remaining, (lhs, rhs)) = separated_pair( - parse_term, - delimited(multispace1, tag("or"), multispace1), - cut(parse_expression), - )(input)?; - - Ok((remaining, CompositeRule::Or(vec![lhs, rhs]))) -} - -fn parse_expression(input: &str) -> IResult { - alt((parse_or, parse_term))(input) -} - -fn parse_term(input: &str) -> IResult { - alt((parse_and, parse_atom))(input) -} - -fn parse_atom(input: &str) -> IResult { - alt(( - alt((parse_string_rule, parse_numeric_rule)), - parse_unary_expr, - parse_bracket_expr, - ))(input) -} - -impl FromStr for CompositeRule { - type Err = String; - - fn from_str(s: &str) -> Result { - let s = s.trim(); - let (remaining, parsed) = parse_expression(s).map_err(|err| match err { - nom::Err::Incomplete(_) => "Needed more data".to_string(), - nom::Err::Error(err) | nom::Err::Failure(err) => convert_error(s, err), - })?; - - if remaining.is_empty() { - Ok(parsed) - } else { - Err(format!("Could not parse input \n{}", remaining)) - } - } -} - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use 
crate::alerts::rule::{ - base::{ - ops::{NumericOperator, StringOperator}, - NumericRule, StringRule, - }, - CompositeRule, - }; - - #[test] - fn test_and_or_not() { - let input = r#"key=500 and key="value" or !(key=300)"#; - let rule = CompositeRule::from_str(input).unwrap(); - - let numeric1 = NumericRule { - column: "key".to_string(), - operator: NumericOperator::EqualTo, - value: serde_json::Number::from(500), - }; - - let string1 = StringRule { - column: "key".to_string(), - operator: StringOperator::Exact, - value: "value".to_string(), - ignore_case: None, - }; - - let numeric3 = NumericRule { - column: "key".to_string(), - operator: NumericOperator::EqualTo, - value: serde_json::Number::from(300), - }; - - assert_eq!( - rule, - CompositeRule::Or(vec![ - CompositeRule::And(vec![ - CompositeRule::Numeric(numeric1), - CompositeRule::String(string1) - ]), - CompositeRule::Not(Box::new(CompositeRule::Numeric(numeric3))) - ]) - ) - } - - #[test] - fn test_complex() { - let input = r#"(verb =% "list" or verb =% "get") and (resource = "secret" and username !% "admin")"#; - let rule = CompositeRule::from_str(input).unwrap(); - - let verb_like_list = StringRule { - column: "verb".to_string(), - operator: StringOperator::Contains, - value: "list".to_string(), - ignore_case: None, - }; - - let verb_like_get = StringRule { - column: "verb".to_string(), - operator: StringOperator::Contains, - value: "get".to_string(), - ignore_case: None, - }; - - let resource_exact_secret = StringRule { - column: "resource".to_string(), - operator: StringOperator::Exact, - value: "secret".to_string(), - ignore_case: None, - }; - - let username_notcontains_admin = StringRule { - column: "username".to_string(), - operator: StringOperator::NotContains, - value: "admin".to_string(), - ignore_case: None, - }; - - assert_eq!( - rule, - CompositeRule::And(vec![ - CompositeRule::Or(vec![ - CompositeRule::String(verb_like_list), - CompositeRule::String(verb_like_get) - ]), - 
CompositeRule::And(vec![ - CompositeRule::String(resource_exact_secret), - CompositeRule::String(username_notcontains_admin) - ]), - ]) - ) - } -} diff --git a/src/alerts/rule.rs b/src/alerts/rule.rs deleted file mode 100644 index dc9c19964..000000000 --- a/src/alerts/rule.rs +++ /dev/null @@ -1,706 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * - */ - -use arrow_array::{cast::as_string_array, RecordBatch}; -use datafusion::arrow::datatypes::Schema; -use itertools::Itertools; -use serde::{ - de::{MapAccess, Visitor}, - Deserialize, Deserializer, -}; -use std::{ - fmt, - marker::PhantomData, - str::FromStr, - sync::atomic::{AtomicU32, Ordering}, -}; - -use self::base::{ - ops::{NumericOperator, StringOperator}, - NumericRule, StringRule, -}; - -use super::AlertState; - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -#[serde(tag = "type", content = "config")] -#[serde(rename_all = "camelCase")] -pub enum Rule { - Column(ColumnRule), - #[serde(deserialize_with = "string_or_struct", serialize_with = "to_string")] - Composite(CompositeRule), -} - -impl Rule { - pub fn resolves(&self, event: RecordBatch) -> Vec { - match self { - Rule::Column(rule) => rule.resolves(event), - Rule::Composite(rule) => rule - .resolves(event) - .iter() - .map(|x| { - if *x { - AlertState::SetToFiring - } else { - AlertState::Listening - } - }) - .collect(), - } - } - - pub fn valid_for_schema(&self, schema: &Schema) -> bool { - match self { - Rule::Column(rule) => rule.valid_for_schema(schema), - Rule::Composite(rule) => rule.valid_for_schema(schema), - } - } - - pub fn trigger_reason(&self) -> String { - match self { - Rule::Column(rule) => rule.trigger_reason(), - Rule::Composite(rule) => format!("matched rule {}", rule), - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -#[serde(untagged)] -pub enum ColumnRule { - ConsecutiveNumeric(ConsecutiveNumericRule), - ConsecutiveString(ConsecutiveStringRule), -} - -impl ColumnRule { - fn resolves(&self, event: RecordBatch) -> Vec { - match self { - Self::ConsecutiveNumeric(rule) => rule.resolves(event), - Self::ConsecutiveString(rule) => rule.resolves(event), - } - } - - fn valid_for_schema(&self, schema: &Schema) -> bool { - match self { - Self::ConsecutiveNumeric(ConsecutiveNumericRule { - base_rule: rule, .. 
- }) => rule.valid_for_schema(schema), - Self::ConsecutiveString(ConsecutiveStringRule { - base_rule: rule, .. - }) => rule.valid_for_schema(schema), - } - } - - fn trigger_reason(&self) -> String { - match self { - Self::ConsecutiveNumeric(ConsecutiveNumericRule { - base_rule: - NumericRule { - column, - operator, - value, - }, - state: ConsecutiveRepeatState { repeats, .. }, - .. - }) => format!( - "{} column was {} {}, {} times", - column, - match operator { - NumericOperator::EqualTo => "equal to", - NumericOperator::NotEqualTo => " not equal to", - NumericOperator::GreaterThan => "greater than", - NumericOperator::GreaterThanEquals => "greater than or equal to", - NumericOperator::LessThan => "less than", - NumericOperator::LessThanEquals => "less than or equal to", - }, - value, - repeats - ), - Self::ConsecutiveString(ConsecutiveStringRule { - base_rule: - StringRule { - column, - operator, - value, - .. - }, - state: ConsecutiveRepeatState { repeats, .. }, - .. - }) => format!( - "{} column {} {}, {} times", - column, - match operator { - StringOperator::Exact => "equal to", - StringOperator::NotExact => "not equal to", - StringOperator::Contains => "contains", - StringOperator::NotContains => "does not contain", - StringOperator::Regex => "matches regex", - }, - value, - repeats - ), - } - } -} - -// Rules for alerts - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ConsecutiveNumericRule { - #[serde(flatten)] - pub base_rule: base::NumericRule, - #[serde(flatten)] - pub state: ConsecutiveRepeatState, -} - -impl ConsecutiveNumericRule { - fn resolves(&self, event: RecordBatch) -> Vec { - let Some(column) = event.column_by_name(&self.base_rule.column) else { - return Vec::new(); - }; - - let base_matches = self.base_rule.resolves(column); - - base_matches - .into_iter() - .map(|matches| { - if matches { - self.state.update_and_fetch_state() - } else { - self.state.fetch_state() - } - }) - .collect() - 
} -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ConsecutiveStringRule { - #[serde(flatten)] - pub base_rule: base::StringRule, - #[serde(flatten)] - pub state: ConsecutiveRepeatState, -} - -impl ConsecutiveStringRule { - fn resolves(&self, event: RecordBatch) -> Vec { - let Some(column) = event.column_by_name(&self.base_rule.column) else { - return Vec::new(); - }; - - let base_matches = self.base_rule.resolves(as_string_array(column)); - - base_matches - .into_iter() - .map(|matches| { - if matches { - self.state.update_and_fetch_state() - } else { - self.state.fetch_state() - } - }) - .collect() - } -} - -fn one() -> u32 { - 1 -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)] -#[serde(rename_all = "camelCase")] -pub enum CompositeRule { - And(Vec), - Or(Vec), - Not(Box), - Numeric(NumericRule), - String(StringRule), -} - -impl CompositeRule { - fn resolves(&self, event: RecordBatch) -> Vec { - let res = match self { - CompositeRule::And(rules) => { - // get individual evaluation for each subrule - let mut evaluations = rules - .iter() - .map(|x| x.resolves(event.clone())) - .collect_vec(); - // They all must be of same length otherwise some columns was missing in evaluation - let is_same_len = evaluations.iter().map(|x| x.len()).all_equal(); - // if there are more than one rule then we go through all evaluations and compare them side by side - if is_same_len && evaluations.len() > 1 { - (0..evaluations[0].len()) - .map(|idx| evaluations.iter().all(|x| x[idx])) - .collect() - } else if is_same_len && evaluations.len() == 1 { - evaluations.pop().expect("length one") - } else { - vec![] - } - } - CompositeRule::Or(rules) => { - // get individual evaluation for each subrule - let evaluations: Vec> = rules - .iter() - .map(|x| x.resolves(event.clone())) - .collect_vec(); - let mut evaluation_iterators = evaluations.iter().map(|x| x.iter()).collect_vec(); - let mut res = vec![]; - 
- loop { - let mut continue_iteration = false; - let mut accumulator = false; - for iter in &mut evaluation_iterators { - if let Some(val) = iter.next() { - accumulator = accumulator || *val; - continue_iteration = true - } - } - if !continue_iteration { - break; - } else { - res.push(accumulator) - } - } - - res - } - CompositeRule::Numeric(rule) => { - let Some(column) = event.column_by_name(&rule.column) else { - return Vec::new(); - }; - rule.resolves(column) - } - CompositeRule::String(rule) => { - let Some(column) = event.column_by_name(&rule.column) else { - return Vec::new(); - }; - rule.resolves(as_string_array(column)) - } - CompositeRule::Not(rule) => { - let mut res = rule.resolves(event); - res.iter_mut().for_each(|x| *x = !*x); - res - } - }; - - res - } - - fn valid_for_schema(&self, schema: &Schema) -> bool { - match self { - CompositeRule::And(rules) => rules.iter().all(|rule| rule.valid_for_schema(schema)), - CompositeRule::Or(rules) => rules.iter().all(|rule| rule.valid_for_schema(schema)), - CompositeRule::Not(rule) => rule.valid_for_schema(schema), - CompositeRule::Numeric(rule) => rule.valid_for_schema(schema), - CompositeRule::String(rule) => rule.valid_for_schema(schema), - } - } -} - -impl fmt::Display for CompositeRule { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let v = match self { - CompositeRule::And(rules) => { - let rules_str: Vec = rules.iter().map(|rule| rule.to_string()).collect(); - format!("({})", rules_str.join(" and ")) - } - CompositeRule::Or(rules) => { - let rules_str: Vec = rules.iter().map(|rule| rule.to_string()).collect(); - format!("({})", rules_str.join(" or ")) - } - CompositeRule::Not(rule) => format!("!({})", rule), - CompositeRule::Numeric(numeric_rule) => numeric_rule.to_string(), - CompositeRule::String(string_rule) => string_rule.to_string(), - }; - write!(f, "{}", v) - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct ConsecutiveRepeatState { - #[serde(default = 
"one")] - pub repeats: u32, - #[serde(skip)] - repeated: AtomicU32, -} - -impl ConsecutiveRepeatState { - fn update_and_fetch_state(&self) -> AlertState { - self._fetch_state(true) - } - - fn fetch_state(&self) -> AlertState { - self._fetch_state(false) - } - - fn _fetch_state(&self, update: bool) -> AlertState { - let mut repeated = self.repeated.load(Ordering::Acquire); - let mut state = AlertState::Listening; - - let firing = repeated >= self.repeats; - - if firing { - if update { - state = AlertState::Firing; - } else { - // did not match, i.e resolved - repeated = 0; - state = AlertState::Resolved; - } - } else if update { - repeated += 1; - if repeated == self.repeats { - state = AlertState::SetToFiring; - } - } - - self.repeated.store(repeated, Ordering::Release); - state - } -} - -fn string_or_struct<'de, T, D>(deserializer: D) -> Result -where - T: Deserialize<'de> + FromStr, - D: Deserializer<'de>, -{ - // This is a Visitor that forwards string types to T's `FromStr` impl and - // forwards map types to T's `Deserialize` impl. The `PhantomData` is to - // keep the compiler from complaining about T being an unused generic type - // parameter. We need T in order to know the Value type for the Visitor - // impl. - struct StringOrStruct(PhantomData T>); - - impl<'de, T> Visitor<'de> for StringOrStruct - where - T: Deserialize<'de> + FromStr, - { - type Value = T; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("string or map") - } - - fn visit_str(self, value: &str) -> Result - where - E: serde::de::Error, - { - FromStr::from_str(value).map_err(|x| serde::de::Error::custom(x)) - } - - fn visit_map(self, map: M) -> Result - where - M: MapAccess<'de>, - { - // `MapAccessDeserializer` is a wrapper that turns a `MapAccess` - // into a `Deserializer`, allowing it to be used as the input to T's - // `Deserialize` implementation. T then deserializes itself using - // the entries from the map visitor. 
- Deserialize::deserialize(serde::de::value::MapAccessDeserializer::new(map)) - } - } - - deserializer.deserialize_any(StringOrStruct(PhantomData)) -} - -fn to_string(ty: &CompositeRule, serializer: S) -> Result -where - S: serde::Serializer, -{ - serializer.serialize_str(&ty.to_string()) -} - -#[cfg(test)] -mod tests { - use std::sync::atomic::AtomicU32; - - use rstest::*; - - use super::{AlertState, ConsecutiveRepeatState}; - - #[fixture] - pub fn rule(#[default(5)] repeats: u32, #[default(0)] repeated: u32) -> ConsecutiveRepeatState { - ConsecutiveRepeatState { - repeats, - repeated: AtomicU32::new(repeated), - } - } - - #[rstest] - fn numeric_consecutive_rule_repeats_1(#[with(1, 0)] rule: ConsecutiveRepeatState) { - assert_eq!(rule.update_and_fetch_state(), AlertState::SetToFiring); - assert_eq!(rule.update_and_fetch_state(), AlertState::Firing); - assert_eq!(rule.update_and_fetch_state(), AlertState::Firing); - assert_eq!(rule.fetch_state(), AlertState::Resolved); - assert_eq!(rule.fetch_state(), AlertState::Listening); - assert_eq!(rule.update_and_fetch_state(), AlertState::SetToFiring); - } - - #[rstest] - fn numeric_consecutive_rule_repeats_2(#[with(2, 1)] rule: ConsecutiveRepeatState) { - assert_eq!(rule.update_and_fetch_state(), AlertState::SetToFiring); - assert_eq!(rule.update_and_fetch_state(), AlertState::Firing); - assert_eq!(rule.fetch_state(), AlertState::Resolved); - assert_eq!(rule.fetch_state(), AlertState::Listening); - assert_eq!(rule.update_and_fetch_state(), AlertState::Listening); - assert_eq!(rule.update_and_fetch_state(), AlertState::SetToFiring); - } -} - -pub mod base { - use std::fmt::Display; - - use arrow_array::{ - cast::as_primitive_array, - types::{Float64Type, Int64Type, UInt64Type}, - Array, ArrowPrimitiveType, PrimitiveArray, StringArray, - }; - use arrow_schema::{DataType, Schema}; - use itertools::Itertools; - - use self::ops::{NumericOperator, StringOperator}; - use regex::Regex; - - #[derive(Debug, serde::Serialize, 
serde::Deserialize, PartialEq, Eq)] - #[serde(rename_all = "camelCase")] - pub struct NumericRule { - pub column: String, - /// Field that determines what comparison operator is to be used - #[serde(default)] - pub operator: NumericOperator, - pub value: serde_json::Number, - } - - impl NumericRule { - pub fn resolves(&self, event: &dyn Array) -> Vec { - let datatype = event.data_type(); - match datatype { - arrow_schema::DataType::Int64 => Self::eval_op( - self.operator, - self.value.as_i64().unwrap(), - as_primitive_array::(event), - ), - arrow_schema::DataType::UInt64 => Self::eval_op( - self.operator, - self.value.as_u64().unwrap(), - as_primitive_array::(event), - ), - arrow_schema::DataType::Float64 => Self::eval_op( - self.operator, - self.value.as_f64().unwrap(), - as_primitive_array::(event), - ), - _ => unreachable!(), - } - } - - fn eval_op( - op: NumericOperator, - value: T::Native, - arr: &PrimitiveArray, - ) -> Vec { - arr.iter() - .map(|number| { - let Some(number) = number else { return false }; - match op { - NumericOperator::EqualTo => number == value, - NumericOperator::NotEqualTo => number != value, - NumericOperator::GreaterThan => number > value, - NumericOperator::GreaterThanEquals => number >= value, - NumericOperator::LessThan => number < value, - NumericOperator::LessThanEquals => number <= value, - } - }) - .collect() - } - - pub fn valid_for_schema(&self, schema: &Schema) -> bool { - match schema.column_with_name(&self.column) { - Some((_, column)) => matches!( - column.data_type(), - DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Float16 - | DataType::Float32 - | DataType::Float64 - ), - None => false, - } - } - } - - impl Display for NumericRule { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} {} {}", self.column, self.operator, self.value) - } - } - - #[derive(Debug, 
serde::Serialize, serde::Deserialize, PartialEq, Eq)] - #[serde(rename_all = "camelCase")] - pub struct StringRule { - pub column: String, - #[serde(default)] - pub operator: StringOperator, - pub ignore_case: Option, - pub value: String, - } - - impl StringRule { - pub fn resolves(&self, event: &StringArray) -> Vec { - event - .iter() - .map(|string| { - let Some(string) = string else { return false }; - Self::matches( - self.operator, - string, - &self.value, - self.ignore_case.unwrap_or_default(), - ) - }) - .collect_vec() - } - - fn matches(op: StringOperator, string: &str, value: &str, ignore_case: bool) -> bool { - if ignore_case { - match op { - StringOperator::Exact => string.eq_ignore_ascii_case(value), - StringOperator::NotExact => !string.eq_ignore_ascii_case(value), - StringOperator::Contains => string - .to_ascii_lowercase() - .contains(&value.to_ascii_lowercase()), - StringOperator::NotContains => !string - .to_ascii_lowercase() - .contains(&value.to_ascii_lowercase()), - StringOperator::Regex => { - let re: Regex = regex::Regex::new(value).unwrap(); - re.is_match(string) - } - } - } else { - match op { - StringOperator::Exact => string.eq(value), - StringOperator::NotExact => !string.eq(value), - StringOperator::Contains => string.contains(value), - StringOperator::NotContains => !string.contains(value), - StringOperator::Regex => { - let re: Regex = regex::Regex::new(value).unwrap(); - re.is_match(string) - } - } - } - } - - pub fn valid_for_schema(&self, schema: &Schema) -> bool { - match schema.column_with_name(&self.column) { - Some((_, column)) => matches!(column.data_type(), DataType::Utf8), - None => false, - } - } - } - - impl Display for StringRule { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} {} \"{}\"", self.column, self.operator, self.value) - } - } - - pub mod ops { - use std::fmt::Display; - - #[derive( - Debug, Default, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, - )] - 
#[serde(rename_all = "camelCase")] - pub enum NumericOperator { - #[default] - #[serde(alias = "=")] - EqualTo, - #[serde(alias = "!=")] - NotEqualTo, - #[serde(alias = ">")] - GreaterThan, - #[serde(alias = ">=")] - GreaterThanEquals, - #[serde(alias = "<")] - LessThan, - #[serde(alias = "<=")] - LessThanEquals, - } - - impl Display for NumericOperator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}", - match self { - NumericOperator::EqualTo => "=", - NumericOperator::NotEqualTo => "!=", - NumericOperator::GreaterThan => ">", - NumericOperator::GreaterThanEquals => ">=", - NumericOperator::LessThan => "<", - NumericOperator::LessThanEquals => "<=", - } - ) - } - } - - #[derive( - Debug, Default, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, - )] - #[serde(rename_all = "camelCase")] - pub enum StringOperator { - #[serde(alias = "=")] - Exact, - #[serde(alias = "!=")] - NotExact, - #[default] - #[serde(alias = "=%")] - Contains, - #[serde(alias = "!%")] - NotContains, - #[serde(alias = "~")] - Regex, - } - - impl Display for StringOperator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}", - match self { - StringOperator::Exact => "=", - StringOperator::NotExact => "!=", - StringOperator::Contains => "=%", - StringOperator::NotContains => "!%", - StringOperator::Regex => "~", - } - ) - } - } - } -} diff --git a/src/alerts/target.rs b/src/alerts/target.rs index c7e2c7586..b92784cc4 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -28,9 +28,9 @@ use chrono::Utc; use http::{header::AUTHORIZATION, HeaderMap, HeaderValue}; use humantime_serde::re::humantime; use reqwest::ClientBuilder; -use tracing::error; +use tracing::{error, trace, warn}; -use crate::utils::json; +use super::ALERTS; use super::{AlertState, CallableTarget, Context}; @@ -42,7 +42,13 @@ pub enum Retry { Finite(usize), } -#[derive(Debug, serde::Serialize, serde::Deserialize)] 
+impl Default for Retry { + fn default() -> Self { + Retry::Finite(1) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] #[serde(rename_all = "lowercase")] #[serde(try_from = "TargetVerifier")] pub struct Target { @@ -54,24 +60,29 @@ pub struct Target { impl Target { pub fn call(&self, context: Context) { + trace!("target.call context- {context:?}"); let timeout = &self.timeout; let resolves = context.alert_info.alert_state; let mut state = timeout.state.lock().unwrap(); + trace!("target.call state- {state:?}"); + state.alert_state = resolves; match resolves { - AlertState::SetToFiring => { - state.alert_state = AlertState::Firing; + AlertState::Triggered => { if !state.timed_out { + // call once and then start sleeping + // reduce repeats by 1 + call_target(self.target.clone(), context.clone()); + trace!("state not timed out- {state:?}"); // set state state.timed_out = true; state.awaiting_resolve = true; drop(state); self.spawn_timeout_task(timeout, context.clone()); - call_target(self.target.clone(), context) } } - AlertState::Resolved => { - state.alert_state = AlertState::Listening; + alert_state @ (AlertState::Resolved | AlertState::Silenced) => { + state.alert_state = alert_state; if state.timed_out { // if in timeout and resolve came in, only process if it's the first one ( awaiting resolve ) if state.awaiting_resolve { @@ -84,63 +95,88 @@ impl Target { call_target(self.target.clone(), context); } - _ => unreachable!(), } } - fn spawn_timeout_task(&self, repeat: &Timeout, alert_context: Context) { - let state = Arc::clone(&repeat.state); - let retry = repeat.times; - let timeout = repeat.interval; + fn spawn_timeout_task(&self, target_timeout: &Timeout, alert_context: Context) { + trace!("repeat-\n{target_timeout:?}"); + let state = Arc::clone(&target_timeout.state); + let retry = target_timeout.times; + let timeout = target_timeout.interval; let target = self.target.clone(); + let alert_id = alert_context.alert_info.alert_id; - let 
sleep_and_check_if_call = move |timeout_state: Arc>| { - async move { - tokio::time::sleep(timeout).await; - let mut state = timeout_state.lock().unwrap(); - if state.alert_state == AlertState::Firing { - // it is still firing .. sleep more and come back - state.awaiting_resolve = true; - true - } else { - state.timed_out = false; - false + let sleep_and_check_if_call = + move |timeout_state: Arc>, current_state: AlertState| { + async move { + tokio::time::sleep(timeout).await; + + let mut state = timeout_state.lock().unwrap(); + + if current_state == AlertState::Triggered { + // it is still firing .. sleep more and come back + state.awaiting_resolve = true; + true + } else { + state.timed_out = false; + false + } } - } - }; + }; - actix_web::rt::spawn(async move { + trace!("Spawning retry task"); + tokio::spawn(async move { match retry { Retry::Infinite => loop { - let should_call = sleep_and_check_if_call(Arc::clone(&state)).await; + let current_state = if let Ok(state) = ALERTS.get_state(alert_id).await { + state + } else { + *state.lock().unwrap() = TimeoutState::default(); + warn!("Unable to fetch state for given alert_id- {alert_id}, stopping target notifs"); + return; + }; + + let should_call = + sleep_and_check_if_call(Arc::clone(&state), current_state).await; if should_call { call_target(target.clone(), alert_context.clone()) } }, Retry::Finite(times) => { - for _ in 0..times { - let should_call = sleep_and_check_if_call(Arc::clone(&state)).await; + for _ in 0..(times - 1) { + let current_state = if let Ok(state) = ALERTS.get_state(alert_id).await { + state + } else { + *state.lock().unwrap() = TimeoutState::default(); + warn!("Unable to fetch state for given alert_id- {alert_id}, stopping target notifs"); + return; + }; + + let should_call = + sleep_and_check_if_call(Arc::clone(&state), current_state).await; if should_call { call_target(target.clone(), alert_context.clone()) } } - // fallback for if this task only observed FIRING on all RETRIES - // 
Stream might be dead and sending too many alerts is not great - // Send and alert stating that this alert will only work once it has seen a RESOLVE - state.lock().unwrap().timed_out = false; - let mut context = alert_context; - context.alert_info.message = format!( - "Triggering alert did not resolve itself after {times} retries, This alert is paused until it resolves"); - // Send and exit this task. - call_target(target, context); + // // fallback for if this task only observed FIRING on all RETRIES + // // Stream might be dead and sending too many alerts is not great + // // Send and alert stating that this alert will only work once it has seen a RESOLVE + // state.lock().unwrap().timed_out = false; + // let context = alert_context; + // // context.alert_info.message = format!( + // // "Triggering alert did not resolve itself after {times} retries, This alert is paused until it resolves"); + // // Send and exit this task. + // call_target(target, context); } } + *state.lock().unwrap() = TimeoutState::default(); }); } } fn call_target(target: TargetType, context: Context) { - actix_web::rt::spawn(async move { target.call(&context).await }); + trace!("Calling target with context- {context:?}"); + tokio::spawn(async move { target.call(&context).await }); } #[derive(Debug, serde::Deserialize)] @@ -230,13 +266,15 @@ impl CallableTarget for SlackWebHook { .expect("Client can be constructed on this system"); let alert = match payload.alert_info.alert_state { - AlertState::SetToFiring => { + AlertState::Triggered => { serde_json::json!({ "text": payload.default_alert_string() }) } AlertState::Resolved => { serde_json::json!({ "text": payload.default_resolved_string() }) } - _ => unreachable!(), + AlertState::Silenced => { + serde_json::json!({ "text": payload.default_silenced_string() }) + } }; if let Err(e) = client.post(&self.endpoint).json(&alert).send().await { @@ -268,9 +306,9 @@ impl CallableTarget for OtherWebHook { .expect("Client can be constructed on this 
system"); let alert = match payload.alert_info.alert_state { - AlertState::SetToFiring => payload.default_alert_string(), + AlertState::Triggered => payload.default_alert_string(), AlertState::Resolved => payload.default_resolved_string(), - _ => unreachable!(), + AlertState::Silenced => payload.default_silenced_string(), }; let request = client @@ -318,33 +356,33 @@ impl CallableTarget for AlertManager { let mut alerts = serde_json::json!([{ "labels": { "alertname": payload.alert_info.alert_name, - "stream": payload.stream, + // "stream": payload.stream, "deployment_instance": payload.deployment_info.deployment_instance, "deployment_id": payload.deployment_info.deployment_id, "deployment_mode": payload.deployment_info.deployment_mode }, "annotations": { - "message": payload.alert_info.message, - "reason": payload.alert_info.reason + "message": "MESSAGE", + "reason": "REASON" } }]); let alert = &mut alerts[0]; - alert["labels"].as_object_mut().expect("is object").extend( - payload - .additional_labels - .as_object() - .expect("is object") - .iter() - // filter non null values for alertmanager and only pass strings - .filter(|(_, value)| !value.is_null()) - .map(|(k, value)| (k.to_owned(), json::convert_to_string(value))), - ); + // alert["labels"].as_object_mut().expect("is object").extend( + // payload + // .additional_labels + // .as_object() + // .expect("is object") + // .iter() + // // filter non null values for alertmanager and only pass strings + // .filter(|(_, value)| !value.is_null()) + // .map(|(k, value)| (k.to_owned(), json::convert_to_string(value))), + // ); // fill in status label accordingly match payload.alert_info.alert_state { - AlertState::SetToFiring => alert["labels"]["status"] = "firing".into(), + AlertState::Triggered => alert["labels"]["status"] = "triggered".into(), AlertState::Resolved => { alert["labels"]["status"] = "resolved".into(); alert["annotations"]["reason"] = @@ -353,7 +391,14 @@ impl CallableTarget for AlertManager { 
.to_rfc3339_opts(chrono::SecondsFormat::Millis, true) .into(); } - _ => unreachable!(), + AlertState::Silenced => { + alert["labels"]["status"] = "silenced".into(); + alert["annotations"]["reason"] = + serde_json::Value::String(payload.default_silenced_string()); + // alert["endsAt"] = Utc::now() + // .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) + // .into(); + } }; if let Err(e) = client.post(&self.endpoint).json(&alerts).send().await { @@ -362,10 +407,11 @@ impl CallableTarget for AlertManager { } } -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] pub struct Timeout { #[serde(with = "humantime_serde")] pub interval: Duration, + #[serde(default = "Retry::default")] pub times: Retry, #[serde(skip)] pub state: Arc>, @@ -374,8 +420,8 @@ pub struct Timeout { impl Default for Timeout { fn default() -> Self { Self { - interval: Duration::from_secs(200), - times: Retry::Finite(5), + interval: Duration::from_secs(60), + times: Retry::default(), state: Arc::>::default(), } } diff --git a/src/event/mod.rs b/src/event/mod.rs index 0c2c1f6b9..ce4c5d23e 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -24,7 +24,6 @@ use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema}; use itertools::Itertools; use std::sync::Arc; -use tracing::error; use self::error::EventError; pub use self::writer::STREAM_WRITERS; @@ -88,13 +87,6 @@ impl Event { crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb); - if let Err(e) = metadata::STREAM_INFO - .check_alerts(&self.stream_name, &self.rb) - .await - { - error!("Error checking for alerts. {:?}", e); - } - Ok(()) } diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs new file mode 100644 index 000000000..9bdeefc2c --- /dev/null +++ b/src/handlers/http/alerts.rs @@ -0,0 +1,208 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use crate::{ + option::CONFIG, storage::object_storage::alert_json_path, sync::schedule_alert_task, + utils::actix::extract_session_key_from_req, +}; +use actix_web::{ + web::{self, Json, Path}, + HttpRequest, Responder, +}; +use bytes::Bytes; +use ulid::Ulid; + +use crate::alerts::{ + alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertRequest, AlertState, ALERTS, +}; + +// GET /alerts +/// User needs at least a read access to the stream(s) that is being referenced in an alert +/// Read all alerts then return alerts which satisfy the condition +pub async fn list(req: HttpRequest) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alerts = ALERTS.list_alerts_for_user(session_key).await?; + + Ok(web::Json(alerts)) +} + +// POST /alerts +pub async fn post( + req: HttpRequest, + Json(alert): Json, +) -> Result { + let alert: AlertConfig = alert.into(); + alert.validate().await?; + + // validate the incoming alert query + // does the user have access to these tables or not? 
+ let session_key = extract_session_key_from_req(&req)?; + user_auth_for_query(&session_key, &alert.query).await?; + + // create scheduled tasks + let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; + + // now that we've validated that the user can run this query + // move on to saving the alert in ObjectStore + ALERTS.update(&alert).await; + + let path = alert_json_path(alert.id); + + let store = CONFIG.storage().get_object_store(); + let alert_bytes = serde_json::to_vec(&alert)?; + store.put_object(&path, Bytes::from(alert_bytes)).await?; + + ALERTS.update_task(alert.id, handle, rx, tx).await; + + Ok(web::Json(alert)) +} + +// GET /alerts/{alert_id} +pub async fn get(req: HttpRequest, alert_id: Path) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = alert_id.into_inner(); + + let alert = ALERTS.get_alert_by_id(alert_id).await?; + // validate that the user has access to the tables mentioned + user_auth_for_query(&session_key, &alert.query).await?; + + Ok(web::Json(alert)) +} + +// DELETE /alerts/{alert_id} +/// Deletion should happen from disk, sheduled tasks, then memory +pub async fn delete(req: HttpRequest, alert_id: Path) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = alert_id.into_inner(); + + let alert = ALERTS.get_alert_by_id(alert_id).await?; + + // validate that the user has access to the tables mentioned + user_auth_for_query(&session_key, &alert.query).await?; + + let store = CONFIG.storage().get_object_store(); + let alert_path = alert_json_path(alert_id); + + // delete from disk + store + .delete_object(&alert_path) + .await + .map_err(AlertError::ObjectStorage)?; + + // delete from disk and memory + ALERTS.delete(alert_id).await?; + + // delete the scheduled task + ALERTS.delete_task(alert_id).await?; + + Ok(format!("Deleted alert with ID- {alert_id}")) +} + +// PUT /alerts/{alert_id} +/// first save on disk, then in memory +/// then 
modify scheduled task +pub async fn modify( + req: HttpRequest, + alert_id: Path, + Json(alert_request): Json, +) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = alert_id.into_inner(); + + // check if alert id exists in map + let mut alert = ALERTS.get_alert_by_id(alert_id).await?; + + // validate that the user has access to the tables mentioned + // in the old as well as the modified alert + user_auth_for_query(&session_key, &alert.query).await?; + user_auth_for_query(&session_key, &alert_request.query).await?; + + alert.modify(alert_request); + alert.validate().await?; + + // modify task + let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?; + + // modify on disk + CONFIG + .storage() + .get_object_store() + .put_alert(alert.id, &alert) + .await?; + + // modify in memory + ALERTS.update(&alert).await; + + ALERTS.update_task(alert.id, handle, rx, tx).await; + + Ok(web::Json(alert)) +} + +// PUT /alerts/{alert_id}/update_state +pub async fn update_state( + req: HttpRequest, + alert_id: Path, + state: String, +) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = alert_id.into_inner(); + + // check if alert id exists in map + let alert = ALERTS.get_alert_by_id(alert_id).await?; + + // validate that the user has access to the tables mentioned + user_auth_for_query(&session_key, &alert.query).await?; + + // get current state + let current_state = ALERTS.get_state(alert_id).await?; + + let new_state: AlertState = serde_json::from_str(&state)?; + + match current_state { + AlertState::Triggered => { + if new_state == AlertState::Triggered { + let msg = format!("Not allowed to manually go from Triggered to {new_state}"); + return Err(AlertError::InvalidStateChange(msg)); + } else { + // update state on disk and in memory + ALERTS + .update_state(alert_id, new_state, Some("".into())) + .await?; + } + } + AlertState::Silenced => { + // from here, the user can 
only go to Resolved + if new_state == AlertState::Resolved { + // update state on disk and in memory + ALERTS + .update_state(alert_id, new_state, Some("".into())) + .await?; + } else { + let msg = format!("Not allowed to manually go from Silenced to {new_state}"); + return Err(AlertError::InvalidStateChange(msg)); + } + } + AlertState::Resolved => { + // user shouldn't logically be changing states if current state is Resolved + let msg = format!("Not allowed to go manually from Resolved to {new_state}"); + return Err(AlertError::InvalidStateChange(msg)); + } + } + + Ok("") +} diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 0239c5fd0..e38f42db7 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -24,7 +24,6 @@ use super::modal::utils::logstream_utils::{ create_stream_and_schema_from_storage, create_update_stream, update_first_event_at, }; use super::query::update_schema_when_distributed; -use crate::alerts::Alerts; use crate::event::format::{override_data_type, LogSource}; use crate::handlers::STREAM_TYPE_KEY; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; @@ -156,39 +155,6 @@ pub async fn schema(stream_name: Path) -> Result) -> Result { - let stream_name = stream_name.into_inner(); - - let alerts = metadata::STREAM_INFO - .read() - .expect(metadata::LOCK_EXPECT) - .get(&stream_name) - .map(|metadata| { - serde_json::to_value(&metadata.alerts).expect("alerts can serialize to valid json") - }); - - let mut alerts = match alerts { - Some(alerts) => alerts, - None => { - let alerts = CONFIG - .storage() - .get_object_store() - .get_alerts(&stream_name) - .await?; - - if alerts.alerts.is_empty() { - return Err(StreamError::NoAlertsSet); - } - - serde_json::to_value(alerts).expect("alerts can serialize to valid json") - } - }; - - remove_id_from_alerts(&mut alerts); - - Ok((web::Json(alerts), StatusCode::OK)) -} - pub async fn put_stream( req: HttpRequest, stream_name: 
Path, @@ -201,71 +167,6 @@ pub async fn put_stream( Ok(("Log stream created", StatusCode::OK)) } -pub async fn put_alert( - stream_name: Path, - Json(mut json): Json, -) -> Result { - let stream_name = stream_name.into_inner(); - - remove_id_from_alerts(&mut json); - let alerts: Alerts = match serde_json::from_value(json) { - Ok(alerts) => alerts, - Err(err) => { - return Err(StreamError::BadAlertJson { - stream: stream_name, - err, - }) - } - }; - - validator::alert(&alerts)?; - - if !STREAM_INFO.stream_initialized(&stream_name)? { - // For query mode, if the stream not found in memory map, - //check if it exists in the storage - //create stream and schema from storage - if CONFIG.options.mode == Mode::Query { - match create_stream_and_schema_from_storage(&stream_name).await { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamError::StreamNotFound(stream_name.clone())), - } - } else { - return Err(StreamError::UninitializedLogstream); - } - } - - let schema = STREAM_INFO.schema(&stream_name)?; - for alert in &alerts.alerts { - for column in alert.message.extract_column_names() { - let is_valid = alert.message.valid(&schema, column); - if !is_valid { - return Err(StreamError::InvalidAlertMessage( - alert.name.to_owned(), - column.to_string(), - )); - } - if !alert.rule.valid_for_schema(&schema) { - return Err(StreamError::InvalidAlert(alert.name.to_owned())); - } - } - } - - CONFIG - .storage() - .get_object_store() - .put_alerts(&stream_name, &alerts) - .await?; - - metadata::STREAM_INFO - .set_alert(&stream_name, alerts) - .expect("alerts set on existing stream"); - - Ok(( - format!("set alert configuration for log stream {stream_name}"), - StatusCode::OK, - )) -} - pub async fn get_retention(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { @@ -464,17 +365,6 @@ pub async fn get_stats( Ok((web::Json(stats), StatusCode::OK)) } -fn remove_id_from_alerts(value: &mut Value) { - if let 
Some(Value::Array(alerts)) = value.get_mut("alerts") { - alerts - .iter_mut() - .map_while(|alert| alert.as_object_mut()) - .for_each(|map| { - map.remove("id"); - }); - } -} - #[allow(clippy::too_many_arguments)] pub async fn create_stream( stream_name: String, diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 471383600..30b0e14c5 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -28,6 +28,7 @@ use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT}; use self::{cluster::get_ingestor_info, query::Query}; pub mod about; +pub mod alerts; mod audit; pub mod cluster; pub mod correlation; diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 7f58622a3..4c77e226c 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -16,6 +16,7 @@ * */ +use crate::alerts::ALERTS; use crate::correlation::CORRELATIONS; use crate::handlers::airplane; use crate::handlers::http::base_path; @@ -66,6 +67,7 @@ impl ParseableServer for QueryServer { .service(Self::get_user_role_webscope()) .service(Server::get_counts_webscope()) .service(Server::get_metrics_webscope()) + .service(Server::get_alerts_webscope()) .service(Self::get_cluster_web_scope()), ) .service(Server::get_generated()); @@ -99,8 +101,18 @@ impl ParseableServer for QueryServer { if let Err(e) = CORRELATIONS.load().await { error!("{e}"); } - FILTERS.load().await?; - DASHBOARDS.load().await?; + if let Err(err) = FILTERS.load().await { + error!("{err}") + }; + + if let Err(err) = DASHBOARDS.load().await { + error!("{err}") + }; + + if let Err(err) = ALERTS.load().await { + error!("{err}") + }; + // track all parquet files already in the data directory storage::retention::load_retention_from_global(); @@ -285,21 +297,6 @@ impl QueryServer { .authorize_for_stream(Action::GetStreamInfo), ), ) - .service( - web::resource("/alert") - // PUT "/logstream/{logstream}/alert" ==> Set alert 
for given log stream - .route( - web::put() - .to(logstream::put_alert) - .authorize_for_stream(Action::PutAlert), - ) - // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream - .route( - web::get() - .to(logstream::get_alert) - .authorize_for_stream(Action::GetAlert), - ), - ) .service( // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream web::resource("/schema").route( diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index f9654d757..c7ee3963f 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -16,10 +16,12 @@ * */ +use crate::alerts::ALERTS; use crate::analytics; use crate::correlation::CORRELATIONS; use crate::handlers; use crate::handlers::http::about; +use crate::handlers::http::alerts; use crate::handlers::http::base_path; use crate::handlers::http::health_check; use crate::handlers::http::query; @@ -81,6 +83,7 @@ impl ParseableServer for Server { .service(Self::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) .service(Self::get_counts_webscope()) + .service(Self::get_alerts_webscope()) .service(Self::get_metrics_webscope()), ) .service(Self::get_ingest_otel_factory()) @@ -105,8 +108,17 @@ impl ParseableServer for Server { if let Err(e) = CORRELATIONS.load().await { error!("{e}"); } - FILTERS.load().await?; - DASHBOARDS.load().await?; + if let Err(err) = FILTERS.load().await { + error!("{err}") + }; + + if let Err(err) = DASHBOARDS.load().await { + error!("{err}") + }; + + if let Err(err) = ALERTS.load().await { + error!("{err}") + }; storage::retention::load_retention_from_global(); @@ -204,6 +216,32 @@ impl Server { ) } + pub fn get_alerts_webscope() -> Scope { + web::scope("/alerts") + .service( + web::resource("") + .route(web::get().to(alerts::list).authorize(Action::GetAlert)) + .route(web::post().to(alerts::post).authorize(Action::PutAlert)), + ) + .service( + web::resource("/{alert_id}") + 
.route(web::get().to(alerts::get).authorize(Action::GetAlert)) + .route(web::put().to(alerts::modify).authorize(Action::PutAlert)) + .route( + web::delete() + .to(alerts::delete) + .authorize(Action::DeleteAlert), + ), + ) + .service( + web::resource("/{alert_id}/update_state").route( + web::put() + .to(alerts::update_state) + .authorize(Action::PutAlert), + ), + ) + } + // get the dashboards web scope pub fn get_dashboards_webscope() -> Scope { web::scope("/dashboards") @@ -328,21 +366,6 @@ impl Server { .authorize_for_stream(Action::GetStreamInfo), ), ) - .service( - web::resource("/alert") - // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream - .route( - web::put() - .to(logstream::put_alert) - .authorize_for_stream(Action::PutAlert), - ) - // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream - .route( - web::get() - .to(logstream::get_alert) - .authorize_for_stream(Action::GetAlert), - ), - ) .service( // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream web::resource("/schema").route( diff --git a/src/metadata.rs b/src/metadata.rs index 4aa15e66d..eb627d022 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -16,7 +16,6 @@ * */ -use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; use chrono::{Local, NaiveDateTime}; use itertools::Itertools; @@ -27,8 +26,7 @@ use std::collections::HashMap; use std::num::NonZeroU32; use std::sync::{Arc, RwLock}; -use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; -use crate::alerts::Alerts; +use self::error::stream_info::{LoadError, MetadataError}; use crate::catalog::snapshot::ManifestItem; use crate::event::format::LogSource; use crate::metrics::{ @@ -66,7 +64,6 @@ pub enum SchemaVersion { pub struct LogStreamMetadata { pub schema_version: SchemaVersion, pub schema: HashMap>, - pub alerts: Alerts, pub retention: Option, pub created_at: String, pub first_event_at: Option, @@ -89,32 +86,11 @@ pub 
const LOCK_EXPECT: &str = "no method in metadata should panic while holding // 4. When first event is sent to stream (update the schema) // 5. When set alert API is called (update the alert) impl StreamInfo { - pub async fn check_alerts( - &self, - stream_name: &str, - rb: &RecordBatch, - ) -> Result<(), CheckAlertError> { - let map = self.read().expect(LOCK_EXPECT); - let meta = map - .get(stream_name) - .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?; - - for alert in &meta.alerts.alerts { - alert.check_alert(stream_name, rb.clone()) - } - - Ok(()) - } - pub fn stream_exists(&self, stream_name: &str) -> bool { let map = self.read().expect(LOCK_EXPECT); map.contains_key(stream_name) } - pub fn stream_initialized(&self, stream_name: &str) -> Result { - Ok(!self.schema(stream_name)?.fields.is_empty()) - } - pub fn get_first_event(&self, stream_name: &str) -> Result, MetadataError> { let map = self.read().expect(LOCK_EXPECT); map.get(stream_name) @@ -187,15 +163,6 @@ impl StreamInfo { Ok(Arc::new(schema)) } - pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> { - let mut map = self.write().expect(LOCK_EXPECT); - map.get_mut(stream_name) - .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| { - metadata.alerts = alerts; - }) - } - pub fn set_retention( &self, stream_name: &str, @@ -483,7 +450,6 @@ pub async fn load_stream_metadata_on_server_start( fetch_stats_from_storage(stream_name, stats).await; load_daily_metrics(&snapshot.manifest_list, stream_name); - let alerts = storage.get_alerts(stream_name).await?; let schema = update_schema_from_staging(stream_name, schema); let schema = HashMap::from_iter( schema @@ -495,7 +461,6 @@ pub async fn load_stream_metadata_on_server_start( let metadata = LogStreamMetadata { schema_version, schema, - alerts, retention, created_at, first_event_at, @@ -537,14 +502,6 @@ pub mod error { pub mod stream_info { use 
crate::storage::ObjectStorageError; - #[derive(Debug, thiserror::Error)] - pub enum CheckAlertError { - #[error("Serde Json Error: {0}")] - Serde(#[from] serde_json::Error), - #[error("Metadata Error: {0}")] - Metadata(#[from] MetadataError), - } - #[derive(Debug, thiserror::Error)] pub enum MetadataError { #[error("Metadata for stream {0} not found. Please create the stream and try again")] diff --git a/src/query/mod.rs b/src/query/mod.rs index 857cf18df..5006077a0 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -160,6 +160,16 @@ impl Query { Ok((results, fields)) } + pub async fn get_dataframe(&self, stream_name: String) -> Result { + let time_partition = STREAM_INFO.get_time_partition(&stream_name)?; + + let df = QUERY_SESSION + .execute_logical_plan(self.final_logical_plan(&time_partition)) + .await?; + + Ok(df) + } + /// return logical plan with all time filters applied through fn final_logical_plan(&self, time_partition: &Option) -> LogicalPlan { // see https://github.com/apache/arrow-datafusion/pull/8400 @@ -386,7 +396,7 @@ pub struct CountsResponse { } #[derive(Debug, Default)] -pub(crate) struct TableScanVisitor { +pub struct TableScanVisitor { tables: Vec, } diff --git a/src/rbac/role.rs b/src/rbac/role.rs index bc1deb58e..00208631c 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -36,6 +36,7 @@ pub enum Action { DeleteHotTierEnabled, PutAlert, GetAlert, + DeleteAlert, PutUser, ListUser, DeleteUser, @@ -139,6 +140,9 @@ impl RoleBuilder { | Action::ListFilter | Action::CreateFilter | Action::DeleteFilter + | Action::PutAlert + | Action::GetAlert + | Action::DeleteAlert | Action::GetAnalytics => Permission::Unit(action), Action::Ingest | Action::ListStream @@ -147,8 +151,6 @@ impl RoleBuilder { | Action::GetStats | Action::GetRetention | Action::PutRetention - | Action::PutAlert - | Action::GetAlert | Action::All => Permission::Stream(action, self.stream.clone().unwrap()), }; perms.push(perm); @@ -230,6 +232,7 @@ pub mod model { 
Action::DeleteHotTierEnabled, Action::PutAlert, Action::GetAlert, + Action::DeleteAlert, Action::QueryLLM, Action::CreateFilter, Action::ListFilter, @@ -258,6 +261,7 @@ pub mod model { Action::PutRetention, Action::PutAlert, Action::GetAlert, + Action::DeleteAlert, Action::GetRetention, Action::PutHotTierEnabled, Action::GetHotTierEnabled, @@ -308,6 +312,7 @@ pub mod model { Action::DeleteDashboard, Action::GetStreamInfo, Action::GetUserRoles, + Action::GetAlert, ], stream: None, tag: None, diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 5d8041fc2..512a13053 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -39,8 +39,8 @@ use crate::{ }; use super::{ - LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, - SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, ALERTS_ROOT_DIRECTORY, + PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; #[derive(Debug, Clone, clap::Args)] @@ -296,7 +296,12 @@ impl ObjectStorage for LocalFS { } async fn list_streams(&self) -> Result, ObjectStorageError> { - let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR]; + let ignore_dir = &[ + "lost+found", + PARSEABLE_ROOT_DIRECTORY, + USERS_ROOT_DIR, + ALERTS_ROOT_DIRECTORY, + ]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; let entries = entries @@ -316,7 +321,11 @@ impl ObjectStorage for LocalFS { } async fn list_old_streams(&self) -> Result, ObjectStorageError> { - let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY]; + let ignore_dir = &[ + "lost+found", + PARSEABLE_ROOT_DIRECTORY, + ALERTS_ROOT_DIRECTORY, + ]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; let entries = entries diff --git 
a/src/storage/mod.rs b/src/storage/mod.rs index 3a490617a..7ca63d867 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -54,7 +54,7 @@ pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json"; pub const STREAM_ROOT_DIRECTORY: &str = ".stream"; pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable"; pub const SCHEMA_FILE_NAME: &str = ".schema"; -pub const ALERT_FILE_NAME: &str = ".alert.json"; +pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts"; pub const MANIFEST_FILE: &str = "manifest.json"; /// local sync interval to move data.records to /tmp dir of that stream. diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index b9dc59dcd..0a01cad2b 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -21,10 +21,11 @@ use super::{ ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use super::{ - Owner, StreamType, ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, + Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; +use crate::alerts::AlertConfig; use crate::event::format::LogSource; use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; @@ -32,7 +33,6 @@ use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; use crate::option::Mode; use crate::{ - alerts::Alerts, catalog::{self, manifest::Manifest, snapshot::Snapshot}, metadata::STREAM_INFO, metrics::{storage::StorageMetrics, STORAGE_SIZE}, @@ -49,7 +49,8 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; -use tracing::error; +use tracing::{error, warn}; +use ulid::Ulid; use std::collections::BTreeMap; use std::fmt::Debug; 
@@ -253,12 +254,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(()) } - async fn put_alerts( + async fn put_alert( &self, - stream_name: &str, - alerts: &Alerts, + alert_id: Ulid, + alert: &AlertConfig, ) -> Result<(), ObjectStorageError> { - self.put_object(&alert_json_path(stream_name), to_bytes(alerts)) + self.put_object(&alert_json_path(alert_id), to_bytes(alert)) .await } @@ -335,21 +336,23 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(serde_json::from_slice(&schema_map)?) } - async fn get_alerts(&self, stream_name: &str) -> Result { - match self.get_object(&alert_json_path(stream_name)).await { - Ok(alerts) => { - if let Ok(alerts) = serde_json::from_slice(&alerts) { - Ok(alerts) - } else { - error!("Incompatible alerts found for stream - {stream_name}. Refer https://www.parseable.io/docs/alerts for correct alert config."); - Ok(Alerts::default()) - } - } - Err(e) => match e { - ObjectStorageError::NoSuchKey(_) => Ok(Alerts::default()), - e => Err(e), - }, - } + async fn get_alerts(&self) -> Result, ObjectStorageError> { + let alerts_path = RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY]); + let alerts = self + .get_objects( + Some(&alerts_path), + Box::new(|file_name| file_name.ends_with(".json")), + ) + .await? 
+ .iter() + .filter_map(|bytes| { + serde_json::from_slice(bytes) + .inspect_err(|err| warn!("Expected compatible json, error = {err}")) + .ok() + }) + .collect(); + + Ok(alerts) } async fn upsert_stream_metadata( @@ -814,8 +817,8 @@ pub fn parseable_json_path() -> RelativePathBuf { /// TODO: Needs to be updated for distributed mode #[inline(always)] -fn alert_json_path(stream_name: &str) -> RelativePathBuf { - RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, ALERT_FILE_NAME]) +pub fn alert_json_path(alert_id: Ulid) -> RelativePathBuf { + RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY, &format!("{alert_id}.json")]) } #[inline(always)] diff --git a/src/sync.rs b/src/sync.rs index 2a06d88aa..d843e3a3a 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -23,6 +23,7 @@ use tokio::task; use tokio::time::{interval, sleep, Duration}; use tracing::{error, info, warn}; +use crate::alerts::{alerts_utils, AlertConfig, AlertError}; use crate::option::CONFIG; use crate::{storage, STORAGE_UPLOAD_INTERVAL}; @@ -134,3 +135,64 @@ pub async fn run_local_sync() -> ( (handle, outbox_rx, inbox_tx) } + +pub async fn schedule_alert_task( + eval_frequency: u32, + alert: AlertConfig, +) -> Result< + ( + task::JoinHandle<()>, + oneshot::Receiver<()>, + oneshot::Sender<()>, + ), + AlertError, +> { + let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); + let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); + + let handle = tokio::task::spawn(async move { + info!("new alert task started for {alert:?}"); + + let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { + let mut scheduler = AsyncScheduler::new(); + scheduler.every((eval_frequency).minutes()).run(move || { + let alert_val = alert.clone(); + async move { + match alerts_utils::evaluate_alert(&alert_val).await { + Ok(_) => {} + Err(err) => error!("Error while evaluation- {err}"), + } + } + }); + let mut inbox_rx = AssertUnwindSafe(inbox_rx); + let mut check_interval = interval(Duration::from_secs(1)); + + 
loop { + // Run any pending scheduled tasks + check_interval.tick().await; + scheduler.run_pending().await; + + // Check inbox + match inbox_rx.try_recv() { + Ok(_) => break, + Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, + Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { + warn!("Inbox channel closed unexpectedly"); + break; + } + } + } + })); + + match result { + Ok(future) => { + future.await; + } + Err(panic_error) => { + error!("Panic in scheduled alert task: {:?}", panic_error); + let _ = outbox_tx.send(()); + } + } + }); + Ok((handle, outbox_rx, inbox_tx)) +} diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index 58809618c..f5a897764 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -178,24 +178,6 @@ pub fn validate_time_partition( } } -// Flattens starting from only object types at TOP, e.g. with the parent_key `root` and separator `_` -// `{ "a": { "b": 1, c: { "d": 2 } } }` becomes `{"root_a_b":1,"root_a_c_d":2}` -pub fn flatten_with_parent_prefix( - nested_value: &mut Value, - prefix: &str, - separator: &str, -) -> Result<(), JsonFlattenError> { - let Value::Object(nested_obj) = nested_value else { - return Err(JsonFlattenError::NonObjectInArray); - }; - - let mut map = Map::new(); - flatten_object(&mut map, Some(prefix), nested_obj, separator)?; - *nested_obj = map; - - Ok(()) -} - // Flattens a nested JSON Object/Map into another target Map fn flatten_object( output_map: &mut Map, diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 0f5c05812..efa9cb2e2 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -89,26 +89,6 @@ pub fn convert_array_to_object( Ok(value_arr) } -pub fn convert_to_string(value: &Value) -> Value { - match value { - Value::Null => Value::String("null".to_owned()), - Value::Bool(b) => Value::String(b.to_string()), - Value::Number(n) => Value::String(n.to_string()), - Value::String(s) => Value::String(s.to_owned()), - Value::Array(v) 
=> { - let new_vec = v.iter().map(convert_to_string).collect(); - Value::Array(new_vec) - } - Value::Object(map) => { - let new_map = map - .iter() - .map(|(k, v)| (k.clone(), convert_to_string(v))) - .collect(); - Value::Object(new_map) - } - } -} - struct TrueFromStr; impl Visitor<'_> for TrueFromStr { diff --git a/src/validator.rs b/src/validator.rs index 3f1d07a08..934b620bb 100644 --- a/src/validator.rs +++ b/src/validator.rs @@ -18,10 +18,7 @@ use error::HotTierValidationError; -use self::error::{AlertValidationError, StreamNameValidationError, UsernameValidationError}; -use crate::alerts::rule::base::{NumericRule, StringRule}; -use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule}; -use crate::alerts::{Alerts, Rule}; +use self::error::{StreamNameValidationError, UsernameValidationError}; use crate::hottier::MIN_STREAM_HOT_TIER_SIZE_BYTES; use crate::storage::StreamType; use crate::utils::human_size::bytes_to_human_size; @@ -33,50 +30,6 @@ const DENIED_NAMES: &[&str] = &[ const ALLOWED_SPECIAL_CHARS: &[char] = &['-', '_']; -pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> { - let alert_name: Vec<&str> = alerts.alerts.iter().map(|a| a.name.as_str()).collect(); - let mut alert_name_dedup = alert_name.clone(); - alert_name_dedup.sort(); - alert_name_dedup.dedup(); - - if alert_name.len() != alert_name_dedup.len() { - return Err(AlertValidationError::ExistingName); - } - for alert in &alerts.alerts { - if alert.name.is_empty() { - return Err(AlertValidationError::EmptyName); - } - - if alert.message.message.is_empty() { - return Err(AlertValidationError::EmptyMessage); - } - if alert.targets.is_empty() { - return Err(AlertValidationError::NoTarget); - } - - if let Rule::Column(ref column_rule) = alert.rule { - match column_rule { - ColumnRule::ConsecutiveNumeric(ConsecutiveNumericRule { - base_rule: NumericRule { ref column, .. 
}, - ref state, - }) - | ColumnRule::ConsecutiveString(ConsecutiveStringRule { - base_rule: StringRule { ref column, .. }, - ref state, - }) => { - if column.is_empty() { - return Err(AlertValidationError::EmptyRuleField); - } - if state.repeats == 0 { - return Err(AlertValidationError::InvalidRuleRepeat); - } - } - } - } - } - Ok(()) -} - pub fn stream_name( stream_name: &str, stream_type: StreamType,