Skip to content
This repository was archived by the owner on Aug 4, 2025. It is now read-only.

chore: add more fields to responses and JsonResult #14

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 58 additions & 9 deletions snowflake-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use reqwest_middleware::ClientWithMiddleware;
use serde_json::Value;
use thiserror::Error;

use responses::{ExecResponse, ExecRestResponse, ProcessedRestResponse};
use responses::{ExecResponse, ExecRestResponse, ProcessedRestResponse, QueryContext};
use session::{AuthError, Session};

use crate::connection::QueryType;
Expand Down Expand Up @@ -106,6 +106,9 @@ pub enum SnowflakeApiError {
#[derive(Debug)]
pub struct EmptyJsonResult {
pub schema: Option<Vec<FieldSchema>>,
pub query_id: String,
pub send_result_time: usize,
pub query_context: QueryContext,
}

/// Even if Arrow is specified as a return type non-select queries
Expand All @@ -116,6 +119,9 @@ pub struct JsonResult {
pub value: serde_json::Value,
/// Field ordering matches the array ordering
pub schema: Vec<FieldSchema>,
pub query_id: String,
pub send_result_time: usize,
pub query_context: QueryContext,
}

impl Display for JsonResult {
Expand All @@ -124,6 +130,14 @@ impl Display for JsonResult {
}
}

#[derive(Debug)]
pub struct BytesResult {
pub chunks: Vec<Bytes>,
pub query_id: String,
pub send_result_time: usize,
pub query_context: QueryContext,
}

/// Based on the [`ExecResponseRowType`]
#[derive(Debug)]
pub struct FieldSchema {
Expand All @@ -147,12 +161,20 @@ impl From<ExecResponseRowType> for FieldSchema {
}
}

#[derive(Debug)]
pub struct ArrowResult {
pub batches: Vec<RecordBatch>,
pub query_id: String,
pub send_result_time: usize,
pub query_context: QueryContext,
}

/// Container for query result.
/// Arrow is returned by-default for all SELECT statements,
/// unless there is session configuration issue or it's a different statement type.
#[derive(Debug)]
pub enum QueryResult {
Arrow(Vec<RecordBatch>),
Arrow(ArrowResult),
Json(JsonResult),
Empty(EmptyJsonResult),
}
Expand All @@ -162,7 +184,7 @@ pub enum QueryResult {
pub enum RawQueryResult {
/// Arrow IPC chunks
/// see: <https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc>
Bytes(Vec<Bytes>),
Bytes(BytesResult),
/// Json payload is deserialized,
/// as it's already a part of REST response
Json(JsonResult),
Expand All @@ -172,9 +194,15 @@ pub enum RawQueryResult {
impl RawQueryResult {
pub fn deserialize_arrow(self) -> Result<QueryResult, ArrowError> {
match self {
RawQueryResult::Bytes(bytes) => {
Self::flat_bytes_to_batches(bytes).map(QueryResult::Arrow)
}
RawQueryResult::Bytes(bytes_result) => Self::flat_bytes_to_batches(bytes_result.chunks)
.map(|batches| {
QueryResult::Arrow(ArrowResult {
batches,
query_id: bytes_result.query_id,
send_result_time: bytes_result.send_result_time,
query_context: bytes_result.query_context,
})
}),
RawQueryResult::Json(j) => Ok(QueryResult::Json(j)),
RawQueryResult::Empty(e) => Ok(QueryResult::Empty(e)),
}
Expand Down Expand Up @@ -423,7 +451,15 @@ impl SnowflakeApi {
match resp {
ExecResponse::Query(_) => Err(SnowflakeApiError::UnexpectedResponse),
ExecResponse::PutGet(pg) => {
let res = into_resp_type!(&pg, RawQueryResult::Empty(EmptyJsonResult { schema: None }));
let res = into_resp_type!(
&pg,
RawQueryResult::Empty(EmptyJsonResult {
schema: None,
query_id: pg.data.query_id.clone(),
send_result_time: pg.data.send_result_time,
query_context: pg.data.query_context.clone()
})
);
put::put(pg).await?;
Ok(res)
}
Expand Down Expand Up @@ -492,7 +528,12 @@ impl SnowflakeApi {
} else {
None
};
RawQueryResult::Empty(EmptyJsonResult { schema })
RawQueryResult::Empty(EmptyJsonResult {
schema,
query_id: sync_data.query_id,
send_result_time: sync_data.send_result_time,
query_context: sync_data.query_context,
})
} else if let Some(value) = sync_data.rowset {
log::debug!("Got JSON response");
let mut values: Vec<Value> = serde_json::from_value(value).unwrap();
Expand Down Expand Up @@ -521,6 +562,9 @@ impl SnowflakeApi {
let value = serde_json::to_value(values).unwrap();
RawQueryResult::Json(JsonResult {
value,
query_id: sync_data.query_id,
send_result_time: sync_data.send_result_time,
query_context: sync_data.query_context,
schema: sync_data
.rowtype
.unwrap()
Expand All @@ -544,7 +588,12 @@ impl SnowflakeApi {
chunks.push(bytes);
}

RawQueryResult::Bytes(chunks)
RawQueryResult::Bytes(BytesResult {
chunks,
query_id: sync_data.query_id,
send_result_time: sync_data.send_result_time,
query_context: sync_data.query_context,
})
} else {
return Err(SnowflakeApiError::BrokenResponse);
};
Expand Down
22 changes: 21 additions & 1 deletion snowflake-api/src/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,21 @@ pub struct AsyncQueryExecResponseData {
pub query_aborts_after_secs: i64,
}

#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct QueryContextEntry {
pub id: isize,
pub timestamp: usize,
// pub priority: usize,
// pub context: String
}

#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct QueryContext {
pub entries: Vec<QueryContextEntry>,
}

#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct SyncQueryExecResponseData {
Expand Down Expand Up @@ -212,7 +227,9 @@ pub struct SyncQueryExecResponseData {
// multi-statement response, comma-separated
pub result_ids: Option<String>,
// `progressDesc`, and `queryAbortAfterSecs` are not used but exist in .NET
// `sendResultTime`, `queryResultFormat`, `queryContext` also exist
// `queryResultFormat` also exist
pub send_result_time: usize,
pub query_context: QueryContext,
}

#[derive(Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -283,6 +300,9 @@ pub struct PutGetResponseData {
#[serde(default)]
pub parameters: Vec<NameValueParameter>,
pub statement_type_id: Option<i64>,
pub query_id: String,
pub send_result_time: usize,
pub query_context: QueryContext,
}

#[derive(Deserialize, Debug)]
Expand Down