Skip to content

Commit 3f9a2bf

Browse files
committed
Helper functions for query
1 parent 232e031 commit 3f9a2bf

File tree

2 files changed

+64
-0
lines changed

2 files changed

+64
-0
lines changed

src/handlers/http/query.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,51 @@ pub struct Query {
7171
pub filter_tags: Option<Vec<String>>,
7272
}
7373

74+
/// A function to execute the query and fetch QueryResponse
75+
/// This won't look in the cache
76+
/// TODO: Improve this function and make this a part of the query API
77+
pub async fn get_records_and_fields(
78+
query_request: &Query,
79+
req: &HttpRequest,
80+
) -> Result<(Option<Vec<RecordBatch>>, Option<Vec<String>>), QueryError> {
81+
let session_state = QUERY_SESSION.state();
82+
83+
// get the logical plan and extract the table name
84+
let raw_logical_plan = session_state
85+
.create_logical_plan(&query_request.query)
86+
.await?;
87+
88+
let time_range =
89+
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
90+
// create a visitor to extract the table name
91+
let mut visitor = TableScanVisitor::default();
92+
let _ = raw_logical_plan.visit(&mut visitor);
93+
94+
let tables = visitor.into_inner();
95+
update_schema_when_distributed(&tables).await?;
96+
let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?;
97+
98+
let creds = extract_session_key_from_req(req)?;
99+
let permissions = Users.get_permissions(&creds);
100+
101+
let table_name = query
102+
.first_table_name()
103+
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
104+
105+
user_auth_for_datasets(&permissions, &tables)?;
106+
107+
let (records, fields) = execute(query, &table_name, false).await?;
108+
109+
let records = match records {
110+
Either::Left(vec_rb) => vec_rb,
111+
Either::Right(_) => {
112+
return Err(QueryError::CustomError("Reject streaming response".into()))
113+
}
114+
};
115+
116+
Ok((Some(records), Some(fields)))
117+
}
118+
74119
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
75120
let session_state = QUERY_SESSION.state();
76121
let raw_logical_plan = match session_state

src/utils/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,22 @@ pub fn user_auth_for_datasets(
131131

132132
Ok(())
133133
}
134+
135+
/// A function to extract table names from a SQL string
136+
pub async fn extract_tables(sql: &str) -> Option<Vec<String>> {
137+
let session_state = QUERY_SESSION.state();
138+
139+
// get the logical plan and extract the table name
140+
let raw_logical_plan = match session_state.create_logical_plan(sql).await {
141+
Ok(plan) => plan,
142+
Err(_) => return None,
143+
};
144+
145+
// create a visitor to extract the table name
146+
let mut visitor = TableScanVisitor::default();
147+
let _ = raw_logical_plan.visit(&mut visitor);
148+
149+
let tables = visitor.into_inner();
150+
151+
Some(tables)
152+
}

0 commit comments

Comments
 (0)