Skip to content

Commit 80ecd8e

Browse files
least (copy), greatest (copy) and dateadd mock (#70)
* Test * Add least, greatest and mock for dateadd * logic part if we're to continue using utf8, tested with this query with curl "select dateadd('\''days'\'', 3,'\''2024-12-26'\'')" * snowflake compatability * changed unwrap's for unwrap_or's per Scott's advice, question do we go the ZII pattern from Casey Muratori (MollyRocket) meaning the defaults or do we want some specific panic? * Fix map_or * Fix map_or * Fix map_or * fixed date_add possible overflow, leap years and months * dateadd completed * Fix schema --------- Co-authored-by: DanCodedThis <[email protected]>
1 parent db27e6c commit 80ecd8e

File tree

11 files changed

+801
-67
lines changed

11 files changed

+801
-67
lines changed

crates/control_plane/src/service.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,15 +286,17 @@ impl ControlService for ControlServiceImpl {
286286

287287
// let mut buffer = Vec::new();
288288
// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
289-
// let mut stream_writer = StreamWriter::try_new_with_options(
290-
// &mut buffer, &records[0].schema_ref(), options).unwrap();
289+
// let mut stream_writer =
290+
// StreamWriter::try_new_with_options(&mut buffer, &records[0].schema_ref(), options)
291+
// .unwrap();
291292
// stream_writer.write(&records[0]).unwrap();
292293
// stream_writer.finish().unwrap();
293294
// drop(stream_writer);
294-
//
295+
295296
// // Try to add flatbuffer verification
296297
// println!("{:?}", buffer.len());
297-
// let res = general_purpose::STANDARD.encode(buffer);
298+
// let base64 = general_purpose::STANDARD.encode(buffer);
299+
// Ok((base64, columns))
298300
// let encoded = general_purpose::STANDARD.decode(res.clone()).unwrap();
299301
//
300302
// let mut verifier = Verifier::new(&VerifierOptions::default(), &encoded);
Lines changed: 114 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
use crate::models::ColumnInfo;
22
use arrow::array::{
3-
Array, Int32Array, Int64Array, StructArray, TimestampMicrosecondArray,
4-
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UnionArray,
3+
Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
4+
TimestampNanosecondArray, TimestampSecondArray, UnionArray,
55
};
66
use arrow::datatypes::{Field, Schema, TimeUnit};
77
use arrow::record_batch::RecordBatch;
8+
use chrono::{DateTime, Utc};
89
use datafusion::arrow::array::ArrayRef;
910
use datafusion::arrow::datatypes::DataType;
1011
use datafusion::common::Result as DataFusionResult;
1112
use std::sync::Arc;
1213

14+
const TIMESTAMP_FORMAT: &str = "%Y-%m-%d-%H:%M:%S%.9f";
15+
1316
pub fn first_non_empty_type(union_array: &UnionArray) -> Option<(DataType, ArrayRef)> {
1417
for i in 0..union_array.type_ids().len() {
1518
let type_id = union_array.type_id(i);
@@ -71,8 +74,6 @@ pub fn convert_record_batches(
7174
columns.push(converted_column);
7275
}
7376
let new_schema = Arc::new(Schema::new(fields));
74-
println!("new schema: {:?}", new_schema);
75-
println!("columns: {:?}", columns);
7677
let converted_batch = RecordBatch::try_new(new_schema, columns)?;
7778
converted_batches.push(converted_batch);
7879
}
@@ -81,60 +82,116 @@ pub fn convert_record_batches(
8182
}
8283

8384
fn convert_timestamp_to_struct(column: &ArrayRef, unit: &TimeUnit) -> ArrayRef {
84-
let (epoch, fraction) = match unit {
85-
TimeUnit::Second => {
86-
let array = column
87-
.as_any()
88-
.downcast_ref::<TimestampSecondArray>()
89-
.unwrap();
90-
let epoch: Int64Array = array.clone().unary(|x| x);
91-
let fraction: Int32Array = Int32Array::from(vec![0; column.len()]);
92-
(epoch, fraction)
93-
}
94-
TimeUnit::Millisecond => {
95-
let array = column
96-
.as_any()
97-
.downcast_ref::<TimestampMillisecondArray>()
98-
.unwrap();
99-
let epoch: Int64Array = array.clone().unary(|x| x / 1_000);
100-
let fraction: Int32Array = array.clone().unary(|x| (x % 1_000 * 1_000_000) as i32);
101-
(epoch, fraction)
102-
}
103-
TimeUnit::Microsecond => {
104-
let array = column
105-
.as_any()
106-
.downcast_ref::<TimestampMicrosecondArray>()
107-
.unwrap();
108-
let epoch: Int64Array = array.clone().unary(|x| x / 1_000_000);
109-
let fraction: Int32Array = array.clone().unary(|x| (x % 1_000_000 * 1_000) as i32);
110-
(epoch, fraction)
111-
}
112-
TimeUnit::Nanosecond => {
113-
let array = column
114-
.as_any()
115-
.downcast_ref::<TimestampNanosecondArray>()
116-
.unwrap();
117-
let epoch: Int64Array = array.clone().unary(|x| x / 1_000_000_000);
118-
let fraction: Int32Array = array.clone().unary(|x| (x % 1_000_000_000) as i32);
119-
(epoch, fraction)
120-
}
85+
let now = Utc::now();
86+
let timestamps: Vec<_> = match unit {
87+
TimeUnit::Second => column
88+
.as_any()
89+
.downcast_ref::<TimestampSecondArray>()
90+
.unwrap()
91+
.iter()
92+
.map(|x| {
93+
let ts = DateTime::from_timestamp(x.unwrap_or(now.timestamp()), 0).unwrap();
94+
format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros())
95+
})
96+
.collect(),
97+
TimeUnit::Millisecond => column
98+
.as_any()
99+
.downcast_ref::<TimestampMillisecondArray>()
100+
.unwrap()
101+
.iter()
102+
.map(|x| {
103+
let ts =
104+
DateTime::from_timestamp_millis(x.unwrap_or(now.timestamp_millis())).unwrap();
105+
format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros())
106+
})
107+
.collect(),
108+
TimeUnit::Microsecond => column
109+
.as_any()
110+
.downcast_ref::<TimestampMicrosecondArray>()
111+
.unwrap()
112+
.iter()
113+
.map(|x| {
114+
let ts =
115+
DateTime::from_timestamp_micros(x.unwrap_or(now.timestamp_micros())).unwrap();
116+
format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros())
117+
})
118+
.collect(),
119+
TimeUnit::Nanosecond => column
120+
.as_any()
121+
.downcast_ref::<TimestampNanosecondArray>()
122+
.unwrap()
123+
.iter()
124+
.map(|x| {
125+
let ts =
126+
DateTime::from_timestamp_nanos(x.unwrap_or(now.timestamp_nanos_opt().unwrap()));
127+
format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros())
128+
})
129+
.collect(),
121130
};
122-
131+
// let (epoch, fraction) = match unit {
132+
// TimeUnit::Second => {
133+
// let array = column
134+
// .as_any()
135+
// .downcast_ref::<TimestampSecondArray>()
136+
// .unwrap();
137+
// let epoch: Int64Array = array.iter().map(|x| x.unwrap_or(now)).collect();
138+
// let fraction: Int32Array = Int32Array::from(vec![0; column.len()]);
139+
// (epoch, fraction)
140+
// }
141+
// TimeUnit::Millisecond => {
142+
// let array = column
143+
// .as_any()
144+
// .downcast_ref::<TimestampMillisecondArray>()
145+
// .unwrap();
146+
// let now_millis = now * 1_000;
147+
// let epoch: Int64Array = array.iter().map(|x| x.unwrap_or(now_millis) / 1_000).collect();
148+
// let fraction: Int32Array = array
149+
// .iter()
150+
// .map(|x| (x.unwrap_or(0) % 1_000 * 1_000_000) as i32)
151+
// .collect();
152+
// (epoch, fraction)
153+
// }
154+
// TimeUnit::Microsecond => {
155+
// let array = column
156+
// .as_any()
157+
// .downcast_ref::<TimestampMicrosecondArray>()
158+
// .unwrap();
159+
// let now_micros = now * 1_000_000;
160+
// let epoch: Int64Array = array.iter().map(|x| x.unwrap_or(now_micros) / 1_000_000).collect();
161+
// let fraction: Int32Array = array
162+
// .iter()
163+
// .map(|x| (x.unwrap_or(0) % 1_000_000 * 1_000) as i32)
164+
// .collect();
165+
// (epoch, fraction)
166+
// }
167+
// TimeUnit::Nanosecond => {
168+
// let array = column
169+
// .as_any()
170+
// .downcast_ref::<TimestampNanosecondArray>()
171+
// .unwrap();
172+
// let now_nanos = now * 1_000_000_000;
173+
// let epoch: Int64Array = array.iter().map(|x| x.unwrap_or(now_nanos) / 1_000_000_000).collect();
174+
// let fraction: Int32Array = array
175+
// .iter()
176+
// .map(|x| (x.unwrap_or(0) % 1_000_000_000) as i32)
177+
// .collect();
178+
// (epoch, fraction)
179+
// }
180+
// };
181+
// let string_values: Vec<_> = epoch.iter().map(|x| x.unwrap_or(0).to_string()).collect();
182+
// let string_array = StringArray::from(string_values);
123183
// let timezone = Int32Array::from(vec![1440; column.len()]); // Assuming UTC timezone
124-
let struct_array = StructArray::from(vec![
125-
(
126-
Arc::new(Field::new("epoch", DataType::Int64, false)),
127-
Arc::new(epoch) as ArrayRef,
128-
),
129-
(
130-
Arc::new(Field::new("fraction", DataType::Int32, false)),
131-
Arc::new(fraction) as ArrayRef,
132-
),
133-
// (
134-
// Arc::new(Field::new("timezone", DataType::Int32, false)),
135-
// Arc::new(timezone) as ArrayRef,
136-
// ),
137-
]);
184+
// let struct_array = StructArray::try_new(
185+
// vec![
186+
// Arc::new(Field::new("epoch", DataType::Int64, false)),
187+
// Arc::new(Field::new("fraction", DataType::Int32, false)),
188+
// ]
189+
// .into(),
190+
// vec![Arc::new(epoch) as ArrayRef, Arc::new(fraction) as ArrayRef],
191+
// None,
192+
// )
193+
// .unwrap();
194+
// Arc::new(epoch) as ArrayRef
138195

139-
Arc::new(struct_array) as ArrayRef
196+
Arc::new(StringArray::from(timestamps)) as ArrayRef
140197
}

0 commit comments

Comments
 (0)