Skip to content

Commit 55f22ba

Browse files
fix: update user auth for list filters (#1295)
current: server checks if user is authorized to a dataset based on query string this logic does not work for saved searches because there is no query string as searches have key:value pair syntax update: server fetches the filter_type if filter_type is sql or filter, server checks if user is authorized to a dataset based on query string if filter_type is search, server checks if user is authorized to a dataset based on the dataset_name
1 parent cec4ab8 commit 55f22ba

File tree

10 files changed

+84
-75
lines changed

10 files changed

+84
-75
lines changed

src/alerts/alerts_utils.rs

+1-51
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
use arrow_array::{Float64Array, Int64Array, RecordBatch};
2020
use datafusion::{
21-
common::tree_node::TreeNode,
2221
functions_aggregate::{
2322
count::count,
2423
expr_fn::avg,
@@ -31,63 +30,14 @@ use datafusion::{
3130
use tracing::trace;
3231

3332
use crate::{
34-
alerts::LogicalOperator,
35-
parseable::PARSEABLE,
36-
query::{TableScanVisitor, QUERY_SESSION},
37-
rbac::{
38-
map::SessionKey,
39-
role::{Action, Permission},
40-
Users,
41-
},
42-
utils::time::TimeRange,
33+
alerts::LogicalOperator, parseable::PARSEABLE, query::QUERY_SESSION, utils::time::TimeRange,
4334
};
4435

4536
use super::{
4637
AggregateConfig, AggregateFunction, AggregateResult, Aggregates, AlertConfig, AlertError,
4738
AlertOperator, AlertState, ConditionConfig, Conditions, WhereConfigOperator, ALERTS,
4839
};
4940

50-
async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, AlertError> {
51-
let session_state = QUERY_SESSION.state();
52-
let raw_logical_plan = session_state.create_logical_plan(query).await?;
53-
54-
let mut visitor = TableScanVisitor::default();
55-
let _ = raw_logical_plan.visit(&mut visitor);
56-
Ok(visitor)
57-
}
58-
59-
pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Result<(), AlertError> {
60-
let tables = get_tables_from_query(query).await?;
61-
let permissions = Users.get_permissions(session_key);
62-
63-
for table_name in tables.into_inner().iter() {
64-
let mut authorized = false;
65-
66-
// in permission check if user can run query on the stream.
67-
// also while iterating add any filter tags for this stream
68-
for permission in permissions.iter() {
69-
match permission {
70-
Permission::Stream(Action::All, _) => {
71-
authorized = true;
72-
break;
73-
}
74-
Permission::StreamWithTag(Action::Query, ref stream, _)
75-
if stream == table_name || stream == "*" =>
76-
{
77-
authorized = true;
78-
}
79-
_ => (),
80-
}
81-
}
82-
83-
if !authorized {
84-
return Err(AlertError::Unauthorized);
85-
}
86-
}
87-
88-
Ok(())
89-
}
90-
9141
/// accept the alert
9242
///
9343
/// alert contains aggregate_config

src/alerts/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818

1919
use actix_web::http::header::ContentType;
20-
use alerts_utils::user_auth_for_query;
2120
use async_trait::async_trait;
2221
use chrono::Utc;
2322
use derive_more::derive::FromStr;
@@ -43,6 +42,7 @@ use crate::rbac::map::SessionKey;
4342
use crate::storage;
4443
use crate::storage::ObjectStorageError;
4544
use crate::sync::alert_runtime;
45+
use crate::utils::user_auth_for_query;
4646

4747
use self::target::Target;
4848

src/correlation.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::{
4040
rbac::{map::SessionKey, Users},
4141
storage::ObjectStorageError,
4242
users::filters::FilterQuery,
43-
utils::{get_hash, user_auth_for_query},
43+
utils::{get_hash, user_auth_for_datasets},
4444
};
4545

4646
pub static CORRELATIONS: Lazy<Correlations> = Lazy::new(Correlations::default);
@@ -87,7 +87,7 @@ impl Correlations {
8787
.iter()
8888
.map(|t| t.table_name.clone())
8989
.collect_vec();
90-
if user_auth_for_query(&permissions, tables).is_ok() {
90+
if user_auth_for_datasets(&permissions, tables).is_ok() {
9191
user_correlations.push(correlation.clone());
9292
}
9393
}
@@ -281,7 +281,7 @@ impl CorrelationConfig {
281281
.map(|t| t.table_name.clone())
282282
.collect_vec();
283283

284-
user_auth_for_query(&permissions, tables)?;
284+
user_auth_for_datasets(&permissions, tables)?;
285285

286286
// to validate table config, we need to check whether the mentioned fields
287287
// are present in the table or not

src/handlers/airplane.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::utils::arrow::flight::{
4444
send_to_ingester,
4545
};
4646
use crate::utils::time::TimeRange;
47-
use crate::utils::user_auth_for_query;
47+
use crate::utils::user_auth_for_datasets;
4848
use arrow_flight::{
4949
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
5050
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
@@ -211,7 +211,7 @@ impl FlightService for AirServiceImpl {
211211

212212
let permissions = Users.get_permissions(&key);
213213

214-
user_auth_for_query(&permissions, &streams).map_err(|_| {
214+
user_auth_for_datasets(&permissions, &streams).map_err(|_| {
215215
Status::permission_denied("User Does not have permission to access this")
216216
})?;
217217
let time = Instant::now();

src/handlers/http/alerts.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::{
2222
parseable::PARSEABLE,
2323
storage::object_storage::alert_json_path,
2424
// sync::schedule_alert_task,
25-
utils::actix::extract_session_key_from_req,
25+
utils::{actix::extract_session_key_from_req, user_auth_for_query},
2626
};
2727
use actix_web::{
2828
web::{self, Json, Path},
@@ -31,9 +31,7 @@ use actix_web::{
3131
use bytes::Bytes;
3232
use ulid::Ulid;
3333

34-
use crate::alerts::{
35-
alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertRequest, AlertState, ALERTS,
36-
};
34+
use crate::alerts::{AlertConfig, AlertError, AlertRequest, AlertState, ALERTS};
3735

3836
// GET /alerts
3937
/// User needs at least a read access to the stream(s) that is being referenced in an alert

src/handlers/http/correlation.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use itertools::Itertools;
2323

2424
use crate::rbac::Users;
2525
use crate::utils::actix::extract_session_key_from_req;
26-
use crate::utils::{get_hash, get_user_from_request, user_auth_for_query};
26+
use crate::utils::{get_hash, get_user_from_request, user_auth_for_datasets};
2727

2828
use crate::correlation::{CorrelationConfig, CorrelationError, CORRELATIONS};
2929

@@ -54,7 +54,7 @@ pub async fn get(
5454
.map(|t| t.table_name.clone())
5555
.collect_vec();
5656

57-
user_auth_for_query(&permissions, tables)?;
57+
user_auth_for_datasets(&permissions, tables)?;
5858

5959
Ok(web::Json(correlation))
6060
}

src/handlers/http/query.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use crate::storage::object_storage::commit_schema_to_storage;
4949
use crate::storage::ObjectStorageError;
5050
use crate::utils::actix::extract_session_key_from_req;
5151
use crate::utils::time::{TimeParseError, TimeRange};
52-
use crate::utils::user_auth_for_query;
52+
use crate::utils::user_auth_for_datasets;
5353

5454
/// Query Request through http endpoint.
5555
#[derive(Debug, Deserialize, Serialize, Clone)]
@@ -99,7 +99,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
9999
.first_table_name()
100100
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
101101

102-
user_auth_for_query(&permissions, &tables)?;
102+
user_auth_for_datasets(&permissions, &tables)?;
103103

104104
let time = Instant::now();
105105
// Intercept `count(*)`` queries and use the counts API
@@ -162,7 +162,7 @@ pub async fn get_counts(
162162
let permissions = Users.get_permissions(&creds);
163163

164164
// does user have access to table?
165-
user_auth_for_query(&permissions, &[counts_request.stream.clone()])?;
165+
user_auth_for_datasets(&permissions, &[counts_request.stream.clone()])?;
166166

167167
let records = counts_request.get_bin_density().await?;
168168

src/users/dashboards.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ use serde_json::Value;
2222
use tokio::sync::RwLock;
2323

2424
use crate::{
25-
alerts::alerts_utils::user_auth_for_query, migration::to_bytes, parseable::PARSEABLE,
26-
rbac::map::SessionKey, storage::object_storage::dashboard_path, utils::get_hash,
25+
migration::to_bytes,
26+
parseable::PARSEABLE,
27+
rbac::map::SessionKey,
28+
storage::object_storage::dashboard_path,
29+
utils::{get_hash, user_auth_for_query},
2730
};
2831

2932
use super::TimeFilter;

src/users/filters.rs

+39-6
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@ use tokio::sync::RwLock;
2323

2424
use super::TimeFilter;
2525
use crate::{
26-
alerts::alerts_utils::user_auth_for_query, migration::to_bytes, parseable::PARSEABLE,
27-
rbac::map::SessionKey, storage::object_storage::filter_path, utils::get_hash,
26+
migration::to_bytes,
27+
parseable::PARSEABLE,
28+
rbac::{map::SessionKey, Users},
29+
storage::object_storage::filter_path,
30+
utils::{get_hash, user_auth_for_datasets, user_auth_for_query},
2831
};
2932

3033
pub static FILTERS: Lazy<Filters> = Lazy::new(Filters::default);
@@ -42,11 +45,29 @@ pub struct Filter {
4245

4346
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
4447
pub struct FilterQuery {
45-
pub filter_type: String,
48+
pub filter_type: FilterType,
4649
pub filter_query: Option<String>,
4750
pub filter_builder: Option<FilterBuilder>,
4851
}
4952

53+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
54+
#[serde(rename_all = "lowercase")]
55+
pub enum FilterType {
56+
Filter,
57+
SQL,
58+
Search,
59+
}
60+
61+
impl FilterType {
62+
pub fn to_str(&self) -> &str {
63+
match self {
64+
FilterType::Filter => "filter",
65+
FilterType::SQL => "sql",
66+
FilterType::Search => "search",
67+
}
68+
}
69+
}
70+
5071
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
5172
pub struct FilterBuilder {
5273
pub id: String,
@@ -163,9 +184,21 @@ impl Filters {
163184
} else {
164185
continue;
165186
};
166-
167-
if (user_auth_for_query(key, query).await).is_ok() {
168-
filters.push(f.clone())
187+
let filter_type = &f.query.filter_type;
188+
189+
// if filter type is one of SQL or filter
190+
// then check if the user has access to the dataset based on the query string
191+
// if filter type is search then check if the user has access to the dataset based on the dataset name
192+
if *filter_type == FilterType::SQL || *filter_type == FilterType::Filter {
193+
if (user_auth_for_query(key, query).await).is_ok() {
194+
filters.push(f.clone())
195+
}
196+
} else if *filter_type == FilterType::Search {
197+
let dataset_name = &f.stream_name;
198+
let permissions = Users.get_permissions(key);
199+
if user_auth_for_datasets(&permissions, &[dataset_name.to_string()]).is_ok() {
200+
filters.push(f.clone())
201+
}
169202
}
170203
}
171204
filters

src/utils/mod.rs

+26-1
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@ pub mod uid;
2626
pub mod update;
2727

2828
use crate::handlers::http::rbac::RBACError;
29+
use crate::query::{TableScanVisitor, QUERY_SESSION};
30+
use crate::rbac::map::SessionKey;
2931
use crate::rbac::role::{Action, Permission};
3032
use crate::rbac::Users;
3133
use actix::extract_session_key_from_req;
3234
use actix_web::HttpRequest;
3335
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
36+
use datafusion::common::tree_node::TreeNode;
3437
use regex::Regex;
3538
use sha2::{Digest, Sha256};
3639
use tracing::debug;
@@ -82,7 +85,29 @@ pub fn get_hash(key: &str) -> String {
8285
result
8386
}
8487

85-
pub fn user_auth_for_query(
88+
async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, actix_web::error::Error> {
89+
let session_state = QUERY_SESSION.state();
90+
let raw_logical_plan = session_state
91+
.create_logical_plan(query)
92+
.await
93+
.map_err(|e| actix_web::error::ErrorInternalServerError(format!("Query error: {}", e)))?;
94+
95+
let mut visitor = TableScanVisitor::default();
96+
let _ = raw_logical_plan.visit(&mut visitor);
97+
Ok(visitor)
98+
}
99+
100+
pub async fn user_auth_for_query(
101+
session_key: &SessionKey,
102+
query: &str,
103+
) -> Result<(), actix_web::error::Error> {
104+
let tables = get_tables_from_query(query).await?;
105+
let permissions = Users.get_permissions(session_key);
106+
let tables = tables.into_inner();
107+
user_auth_for_datasets(&permissions, &tables)
108+
}
109+
110+
pub fn user_auth_for_datasets(
86111
permissions: &[Permission],
87112
tables: &[String],
88113
) -> Result<(), actix_web::error::Error> {

0 commit comments

Comments
 (0)