
Commit db27e6c — Merge pull request #65 from Embucket/snowflake
Convert timestamp
2 parents: abf145d + 3dab38d

File tree: 5 files changed, +163 −42 lines
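
In short: the control plane's JSON output now writes explicit nulls, `convert_record_batches` rewrites Arrow `Timestamp` columns into `{epoch, fraction}` struct columns (the layout Snowflake's Arrow format uses for timestamps), the SQL planner maps `TIMESTAMP(p)` precision onto an Arrow `TimeUnit` instead of defaulting to nanoseconds, the CREATE TABLE path now always materializes through an in-memory staging table, and the dbt handler gains a (currently disabled) query logger, plus assorted debug `println!`s.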

crates/control_plane/src/service.rs — 8 additions & 3 deletions

```diff
@@ -5,6 +5,8 @@ use crate::repository::{StorageProfileRepository, WarehouseRepository};
 use crate::sql::functions::common::convert_record_batches;
 use crate::sql::sql::SqlExecutor;
 use arrow::record_batch::RecordBatch;
+use arrow_json::writer::JsonArray;
+use arrow_json::WriterBuilder;
 use async_trait::async_trait;
 use base64::Engine;
 use bytes::Bytes;
@@ -240,10 +242,11 @@ impl ControlService for ControlServiceImpl {
         let (records, _) = self
             .query(warehouse_id, database_name, _table_name, query)
             .await?;
-        // println!("{records:?}");

         let buf = Vec::new();
-        let mut writer = arrow_json::ArrayWriter::new(buf);
+        let write_builder = WriterBuilder::new().with_explicit_nulls(true);
+        let mut writer = write_builder.build::<_, JsonArray>(buf);
+
         let record_refs: Vec<&RecordBatch> = records.iter().collect();
         writer.write_batches(&record_refs).unwrap();
         writer.finish().unwrap();
@@ -301,7 +304,9 @@ impl ControlService for ControlServiceImpl {

         // We use json format since there is a bug between arrow and nanoarrow
         let buf = Vec::new();
-        let mut writer = arrow_json::ArrayWriter::new(buf);
+        let write_builder = WriterBuilder::new().with_explicit_nulls(true);
+        let mut writer = write_builder.build::<_, JsonArray>(buf);
+
         let record_refs: Vec<&RecordBatch> = records.iter().collect();
         writer.write_batches(&record_refs).unwrap();
         writer.finish().unwrap();
```
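
The switch from `arrow_json::ArrayWriter` to a `WriterBuilder` with `with_explicit_nulls(true)` changes how null cells serialize: the default writer omits null fields from each row object, while explicit nulls keep them as `"col": null`, which downstream JSON consumers can rely on. A minimal standalone sketch (not part of the commit; assumes only the `arrow` and `arrow-json` crates already used above):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_json::writer::JsonArray;
use arrow_json::WriterBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One nullable Int64 column with a null in the second row.
    let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, true)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![Some(1), None])) as ArrayRef],
    )?;

    // Explicit nulls: every column appears in every row object.
    let mut writer = WriterBuilder::new()
        .with_explicit_nulls(true)
        .build::<_, JsonArray>(Vec::new());
    writer.write_batches(&[&batch])?;
    writer.finish()?;

    // Expected output: [{"n":1},{"n":null}] — the default ArrayWriter
    // would have produced [{"n":1},{}] instead.
    println!("{}", String::from_utf8(writer.into_inner())?);
    Ok(())
}
```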

crates/control_plane/src/sql/functions/common.rs — 97 additions & 12 deletions

```diff
@@ -1,6 +1,9 @@
 use crate::models::ColumnInfo;
-use arrow::array::{Array, UnionArray};
-use arrow::datatypes::{Field, Schema};
+use arrow::array::{
+    Array, Int32Array, Int64Array, StructArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UnionArray,
+};
+use arrow::datatypes::{Field, Schema, TimeUnit};
 use arrow::record_batch::RecordBatch;
 use datafusion::arrow::array::ArrayRef;
 use datafusion::arrow::datatypes::DataType;
@@ -30,26 +33,108 @@ pub fn convert_record_batches(
         for (i, column) in batch.columns().iter().enumerate() {
             let metadata = column_infos[i].to_metadata();
             let field = batch.schema().field(i).clone();
-            let converted_column = if let Some(union_array) =
-                column.as_any().downcast_ref::<UnionArray>()
-            {
-                if let Some((data_type, array)) = first_non_empty_type(union_array) {
-                    fields.push(Field::new(field.name(), data_type, field.is_nullable()).with_metadata(metadata));
-                    array
-                } else {
+            let converted_column = match field.data_type() {
+                DataType::Union(..) => {
+                    if let Some(union_array) = column.as_any().downcast_ref::<UnionArray>() {
+                        if let Some((data_type, array)) = first_non_empty_type(union_array) {
+                            fields.push(
+                                Field::new(field.name(), data_type, field.is_nullable())
+                                    .with_metadata(metadata),
+                            );
+                            array
+                        } else {
+                            fields.push(field.clone().with_metadata(metadata));
+                            Arc::clone(column)
+                        }
+                    } else {
+                        fields.push(field.clone().with_metadata(metadata));
+                        Arc::clone(column)
+                    }
+                }
+                DataType::Timestamp(unit, _) => {
+                    let converted_column = convert_timestamp_to_struct(column, unit);
+                    fields.push(
+                        Field::new(
+                            field.name(),
+                            converted_column.data_type().clone(),
+                            field.is_nullable(),
+                        )
+                        .with_metadata(metadata),
+                    );
+                    Arc::clone(&converted_column)
+                }
+                _ => {
                     fields.push(field.clone().with_metadata(metadata));
                     Arc::clone(column)
                 }
-            } else {
-                fields.push(field.clone().with_metadata(metadata));
-                Arc::clone(column)
             };
             columns.push(converted_column);
         }
         let new_schema = Arc::new(Schema::new(fields));
+        println!("new schema: {:?}", new_schema);
+        println!("columns: {:?}", columns);
         let converted_batch = RecordBatch::try_new(new_schema, columns)?;
         converted_batches.push(converted_batch);
     }

     Ok((converted_batches.clone(), column_infos))
 }
+
+fn convert_timestamp_to_struct(column: &ArrayRef, unit: &TimeUnit) -> ArrayRef {
+    let (epoch, fraction) = match unit {
+        TimeUnit::Second => {
+            let array = column
+                .as_any()
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap();
+            let epoch: Int64Array = array.clone().unary(|x| x);
+            let fraction: Int32Array = Int32Array::from(vec![0; column.len()]);
+            (epoch, fraction)
+        }
+        TimeUnit::Millisecond => {
+            let array = column
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            let epoch: Int64Array = array.clone().unary(|x| x / 1_000);
+            let fraction: Int32Array = array.clone().unary(|x| (x % 1_000 * 1_000_000) as i32);
+            (epoch, fraction)
+        }
+        TimeUnit::Microsecond => {
+            let array = column
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap();
+            let epoch: Int64Array = array.clone().unary(|x| x / 1_000_000);
+            let fraction: Int32Array = array.clone().unary(|x| (x % 1_000_000 * 1_000) as i32);
+            (epoch, fraction)
+        }
+        TimeUnit::Nanosecond => {
+            let array = column
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap();
+            let epoch: Int64Array = array.clone().unary(|x| x / 1_000_000_000);
+            let fraction: Int32Array = array.clone().unary(|x| (x % 1_000_000_000) as i32);
+            (epoch, fraction)
+        }
+    };
+
+    // let timezone = Int32Array::from(vec![1440; column.len()]); // Assuming UTC timezone
+    let struct_array = StructArray::from(vec![
+        (
+            Arc::new(Field::new("epoch", DataType::Int64, false)),
+            Arc::new(epoch) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("fraction", DataType::Int32, false)),
+            Arc::new(fraction) as ArrayRef,
+        ),
+        // (
+        //     Arc::new(Field::new("timezone", DataType::Int32, false)),
+        //     Arc::new(timezone) as ArrayRef,
+        // ),
+    ]);
+
+    Arc::new(struct_array) as ArrayRef
+}
```
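
`convert_timestamp_to_struct` splits each timestamp into whole seconds since the Unix epoch (`epoch`, Int64) and the sub-second remainder scaled to nanoseconds (`fraction`, Int32), matching the struct layout Snowflake's Arrow format uses for timestamps. A hand-worked sketch of the microsecond arm's arithmetic (`split_micros` is a hypothetical helper for illustration, not part of the crate):

```rust
// Same arithmetic as the diff's TimeUnit::Microsecond arm.
fn split_micros(ts_us: i64) -> (i64, i32) {
    let epoch = ts_us / 1_000_000;                      // whole seconds since the Unix epoch
    let fraction = (ts_us % 1_000_000 * 1_000) as i32;  // remainder, scaled up to nanoseconds
    (epoch, fraction)
}

fn main() {
    // 2023-11-14T22:13:20.123456Z expressed in microseconds.
    let (epoch, fraction) = split_micros(1_700_000_000_123_456);
    assert_eq!(epoch, 1_700_000_000);
    assert_eq!(fraction, 123_456_000);
    println!("epoch={epoch}, fraction={fraction}ns");
}
```

One caveat worth a test: Rust's `/` and `%` truncate toward zero, so pre-1970 timestamps yield a negative `fraction` here rather than borrowing a second from `epoch`.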

crates/control_plane/src/sql/planner.rs — 12 additions & 8 deletions

```diff
@@ -70,11 +70,7 @@ where
         match self.handle_custom_statement(statement.clone()) {
             Ok(plan) => return Ok(plan),
             Err(e) => {
-                eprintln!(
-                    "Custom statement parsing skipped: {} with err {}",
-                    statement.to_string(),
-                    e
-                );
+                eprintln!("Custom statement parsing skipped: {statement} ");
             }
         }

@@ -105,7 +101,8 @@ where
         all_constraints.extend(inline_constraints);
         // Build column default values
         let column_defaults = self.build_column_defaults(&columns, planner_context)?;
-
+        println!("column_defaults: {:?}", column_defaults);
+        println!("statement 11: {:?}", statement);
         let has_columns = !columns.is_empty();
         let schema = self.build_schema(columns.clone())?.to_dfschema_ref()?;
         if has_columns {
@@ -240,7 +237,7 @@ where
             SQLDataType::Char(_)
             | SQLDataType::Text
             | SQLDataType::String(_) => Ok(DataType::Utf8),
-            SQLDataType::Timestamp(None, tz_info) => {
+            SQLDataType::Timestamp(precision, tz_info) => {
                 let tz = if matches!(tz_info, TimezoneInfo::Tz)
                     || matches!(tz_info, TimezoneInfo::WithTimeZone)
                 {
@@ -252,7 +249,14 @@ where
                     // Timestamp Without Time zone
                     None
                 };
-                Ok(DataType::Timestamp(TimeUnit::Nanosecond, tz.map(Into::into)))
+                let precision = match precision {
+                    Some(0) => TimeUnit::Second,
+                    Some(3) => TimeUnit::Millisecond,
+                    Some(6) => TimeUnit::Microsecond,
+                    None | Some(9) => TimeUnit::Nanosecond,
+                    _ => unreachable!(),
+                };
+                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
             }
             SQLDataType::Date => Ok(DataType::Date32),
             SQLDataType::Time(None, tz_info) => {
```
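
The new `precision` match accepts only the precisions that map exactly onto an Arrow `TimeUnit`; anything else (e.g. `TIMESTAMP(2)`) hits `unreachable!()` and panics, so a user-facing planner error may be the safer long-term choice. A standalone sketch of the mapping (`precision_to_time_unit` is a hypothetical helper for illustration, not the planner's API):

```rust
use arrow::datatypes::TimeUnit;

// TIMESTAMP(p) keeps p fractional digits, so only p ∈ {0, 3, 6, 9}
// maps cleanly onto an Arrow TimeUnit; None defaults to nanoseconds.
fn precision_to_time_unit(precision: Option<u64>) -> Option<TimeUnit> {
    match precision {
        Some(0) => Some(TimeUnit::Second),
        Some(3) => Some(TimeUnit::Millisecond),
        Some(6) => Some(TimeUnit::Microsecond),
        None | Some(9) => Some(TimeUnit::Nanosecond),
        _ => None, // e.g. TIMESTAMP(2): no exact Arrow unit; the diff panics here
    }
}

fn main() {
    assert_eq!(precision_to_time_unit(Some(3)), Some(TimeUnit::Millisecond));
    assert_eq!(precision_to_time_unit(None), Some(TimeUnit::Nanosecond));
    assert_eq!(precision_to_time_unit(Some(2)), None);
}
```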

crates/control_plane/src/sql/sql.rs — 29 additions & 19 deletions

```diff
@@ -44,7 +44,6 @@ impl SqlExecutor {
         let dialect = state.config().options().sql_parser.dialect.as_str();
         // Update query to use custom JSON functions
         let query = self.preprocess_query(query);
-        println!("Query: {}", query);
         let mut statement = state.sql_to_statement(&query, dialect)?;
         // statement = self.update_statement_references(statement, warehouse_name);
         // query = statement.to_string();
@@ -168,19 +167,21 @@
             .unwrap();

         // we don't need physical table for transient tables
-        if !transient {
-            // Copy data from InMemory table to created table
-            let insert_query =
-                format!("INSERT INTO {new_table_full_name} SELECT * FROM {new_table_name}");
-            let result = self.execute_with_custom_plan(&insert_query, warehouse_name).await?;
-            // self.ctx.sql(&insert_query).await?.collect().await?;
+        // if !transient {
+        // Copy data from InMemory table to created table
+        let insert_query =
+            format!("INSERT INTO {new_table_full_name} SELECT * FROM {new_table_name}");
+        let result = self
+            .execute_with_custom_plan(&insert_query, warehouse_name)
+            .await?;
+        // self.ctx.sql(&insert_query).await?.collect().await?;

-            // Drop InMemory table
-            let drop_query = format!("DROP TABLE {new_table_name}");
-            self.ctx.sql(&drop_query).await?.collect().await?;
-            return Ok(result);
-        }
-        Ok(created_entity_response())
+        // Drop InMemory table
+        let drop_query = format!("DROP TABLE {new_table_name}");
+        self.ctx.sql(&drop_query).await?.collect().await?;
+        return Ok(result);
+        // }
+        // Ok(created_entity_response())
     } else {
         Err(datafusion::error::DataFusionError::NotImplemented(
             "Only CREATE TABLE statements are supported".to_string(),
@@ -225,19 +226,23 @@
         Ok(created_entity_response())
     }

-    pub async fn get_custom_logical_plan(&self, query: &String, warehouse_name: &str) -> Result<LogicalPlan> {
+    pub async fn get_custom_logical_plan(
+        &self,
+        query: &String,
+        warehouse_name: &str,
+    ) -> Result<LogicalPlan> {
         let state = self.ctx.state();
         let dialect = state.config().options().sql_parser.dialect.as_str();
         let mut statement = state.sql_to_statement(query, dialect)?;
+        println!("raw query: {:?}", statement.to_string());
         statement = self.update_statement_references(statement, warehouse_name);
-        println!("modified query: {:?}", statement);
+        println!("modified query: {:?}", statement.to_string());

         if let DFStatement::Statement(s) = statement.clone() {
             let mut ctx_provider = CustomContextProvider {
                 state: &state,
                 tables: HashMap::new(),
             };
-
             let references = state.resolve_table_references(&statement)?;
             println!("References: {:?}", references);
             for reference in references {
@@ -311,9 +316,15 @@
         })
     }

-    pub async fn execute_with_custom_plan(&self, query: &String, warehouse_name: &str) -> Result<Vec<RecordBatch>> {
+    pub async fn execute_with_custom_plan(
+        &self,
+        query: &String,
+        warehouse_name: &str,
+    ) -> Result<Vec<RecordBatch>> {
         let plan = self.get_custom_logical_plan(query, warehouse_name).await?;
-        self.ctx.execute_logical_plan(plan).await?.collect().await
+        let res = self.ctx.execute_logical_plan(plan).await?.collect().await;
+        println!("Result: {:?}", res);
+        res
     }

     pub fn update_statement_references(
@@ -355,7 +366,6 @@
             }
             Statement::Query(mut query) => {
                 self.update_tables_in_query(query.as_mut(), warehouse_name);
-                println!("Query: {:?}", query);
                 DFStatement::Statement(Box::new(Statement::Query(query)))
             }
             _ => statement,
```
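
Commenting out the `if !transient` guard means every CREATE TABLE now runs the same two-step flow: copy the staged in-memory table into the physical table, then drop the staging table. A sketch of just the query-building step (a hypothetical free function, assuming the same `format!` strings as the hunk above):

```rust
// Build the two SQL statements the CTAS path issues after creating the
// physical table: materialize the staging data, then drop the staging table.
fn ctas_queries(new_table_full_name: &str, new_table_name: &str) -> (String, String) {
    let insert_query =
        format!("INSERT INTO {new_table_full_name} SELECT * FROM {new_table_name}");
    let drop_query = format!("DROP TABLE {new_table_name}");
    (insert_query, drop_query)
}

fn main() {
    let (insert, drop) = ctas_queries("wh.db.events", "events_tmp");
    assert_eq!(insert, "INSERT INTO wh.db.events SELECT * FROM events_tmp");
    assert_eq!(drop, "DROP TABLE events_tmp");
}
```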

crates/nexus/src/http/dbt/handlers.rs — 17 additions & 0 deletions

```diff
@@ -13,6 +13,8 @@ use regex::Regex;
 use serde_json::json;
 use std::io::Read;
 use std::result::Result;
+use tokio::fs::OpenOptions;
+use tokio::io::AsyncWriteExt;
 use uuid::Uuid;

 pub async fn login(
@@ -76,6 +78,10 @@ pub async fn query(
     let (params, sql_query) = body_json.get_sql_text();
     println!("Query raw: {:?}", body_json.sql_text);

+    // if let Err(e) = log_query(body_json.sql_text.as_str()).await {
+    //     eprintln!("Failed to log query: {}", e);
+    // }
+
     let token = match extract_token(&headers) {
         Some(token) => token,
         None => {
@@ -148,3 +154,14 @@
         })
     })
 }
+
+async fn log_query(query: &str) -> Result<(), std::io::Error> {
+    let mut file = OpenOptions::new()
+        .create(true)
+        .append(true)
+        .open("queries.log")
+        .await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
+    file.write_all(query.as_bytes()).await?;
+    file.write_all(b"\n").await?;
+    Ok(())
+}
```
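
`log_query` lands here with its call site still commented out. Note that `tokio::fs::OpenOptions::open` already returns `std::io::Error`, so the `map_err` wrapper in the diff is a no-op. A minimal standalone sketch of the enabled path (assumes a tokio runtime with the `fs` and `io-util` features):

```rust
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;

// Append one query per line to queries.log, creating the file on first use.
async fn log_query(query: &str) -> Result<(), std::io::Error> {
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open("queries.log")
        .await?; // open() already yields std::io::Error, so no map_err needed
    file.write_all(query.as_bytes()).await?;
    file.write_all(b"\n").await?;
    Ok(())
}

#[tokio::main]
async fn main() {
    // Mirrors the commented-out call site in the query handler.
    if let Err(e) = log_query("SELECT 1").await {
        eprintln!("Failed to log query: {e}");
    }
}
```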
