Skip to content

Commit 49fb4b7

Browse files
committed
sketch out implementation
1 parent 231f230 commit 49fb4b7

File tree

1 file changed

+76
-10
lines changed

1 file changed

+76
-10
lines changed

datafusion-examples/examples/simple_udwf.rs

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,14 @@ async fn main() -> Result<()> {
7070
// print the results
7171
df.show().await?;
7272

73-
// ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: Run the window function so that each invocation only sees 5 rows (the current row, the 2 before, and the 2 after)
74-
let df = ctx.sql("SELECT car, \
75-
speed, \
76-
lag(speed, 1) OVER (PARTITION BY car ORDER BY time ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING),\
77-
time \
78-
from cars").await?;
79-
// print the results
80-
df.show().await?;
73+
// // ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: Run the window function so that each invocation only sees 5 rows (the current row, the 2 before, and the 2 after)
74+
// let df = ctx.sql("SELECT car, \
75+
// speed, \
76+
// lag(speed, 1) OVER (PARTITION BY car ORDER BY time ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING),\
77+
// time \
78+
// from cars").await?;
79+
// // print the results
80+
// df.show().await?;
8181

8282
// todo show how to run dataframe API as well
8383

@@ -108,7 +108,73 @@ fn return_type(arg_types: &[DataType]) -> Result<Arc<DataType>> {
108108
Ok(Arc::new(arg_types[0].clone()))
109109
}
110110

111-
/// create a partition evaluator for this argument
111+
/// Create a partition evaluator for this argument
112112
fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> {
113-
todo!();
113+
Ok(Box::new(MyPartitionEvaluator::new()))
114+
}
115+
116+
117+
118+
/// This implements the lowest level evaluation for a window function
119+
///
120+
/// It handles calculating the value of the window function for each
121+
/// distinct value of `PARTITION BY` (each car type in our example)
122+
#[derive(Clone, Debug)]
123+
struct MyPartitionEvaluator {
124+
}
125+
126+
impl MyPartitionEvaluator {
127+
fn new() -> Self
128+
{
129+
Self {}
130+
}
114131
}
132+
133+
134+
135+
136+
/// These different evaluation methods are called depending on the various settings of WindowUDF
137+
impl PartitionEvaluator for MyPartitionEvaluator {
138+
fn get_range(&self, _idx: usize, _n_rows: usize) -> Result<std::ops::Range<usize>> {
139+
Err(DataFusionError::NotImplemented(
140+
"get_range is not implemented for this window function".to_string(),
141+
))
142+
}
143+
144+
/// This function is given the values of each partition
145+
fn evaluate(&self, values: &[arrow::array::ArrayRef], num_rows: usize) -> Result<arrow::array::ArrayRef> {
146+
println!("processing num_rows={num_rows}, values:\n{values:#?}");
147+
Err(DataFusionError::NotImplemented(
148+
"evaluate is not implemented by default".into(),
149+
))
150+
}
151+
152+
fn evaluate_stateful(&mut self, _values: &[arrow::array::ArrayRef]) -> Result<datafusion_common::ScalarValue> {
153+
Err(DataFusionError::NotImplemented(
154+
"evaluate_stateful is not implemented by default".into(),
155+
))
156+
}
157+
158+
fn evaluate_with_rank(
159+
&self,
160+
_num_rows: usize,
161+
_ranks_in_partition: &[std::ops::Range<usize>],
162+
) -> Result<arrow::array::ArrayRef> {
163+
Err(DataFusionError::NotImplemented(
164+
"evaluate_partition_with_rank is not implemented by default".into(),
165+
))
166+
}
167+
168+
fn evaluate_inside_range(
169+
&self,
170+
_values: &[arrow::array::ArrayRef],
171+
_range: &std::ops::Range<usize>,
172+
) -> Result<datafusion_common::ScalarValue> {
173+
Err(DataFusionError::NotImplemented(
174+
"evaluate_inside_range is not implemented by default".into(),
175+
))
176+
}
177+
}
178+
179+
180+
// TODO show how to use other evaluate methods

0 commit comments

Comments
 (0)