Skip to content

Commit ad37fbf

Browse files
committed
DataFusion 46.0.0
1 parent 2ccef39 commit ad37fbf

File tree

6 files changed

+52
-48
lines changed

6 files changed

+52
-48
lines changed

src/dataframe.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ use datafusion::arrow::util::pretty;
3030
use datafusion::common::stats::Precision;
3131
use datafusion::common::{DFSchema, DataFusionError, Statistics, UnnestOptions};
3232
use datafusion::common::tree_node::{Transformed, TreeNode};
33-
use datafusion::config::{ConfigOptions, CsvOptions, TableParquetOptions};
33+
use datafusion::config::{CsvOptions, TableParquetOptions};
3434
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
35+
use datafusion::datasource::memory::DataSourceExec;
3536
use datafusion::datasource::TableProvider;
36-
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
37-
use datafusion::datasource::physical_plan::ParquetExec;
37+
use datafusion::datasource::physical_plan::FileScanConfig;
38+
use datafusion::datasource::source::DataSource;
3839
use datafusion::execution::SendableRecordBatchStream;
3940
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
4041
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
@@ -753,25 +754,28 @@ impl DistributedPlan {
753754
return Ok(())
754755
}
755756
let updated_plan = self.plan().clone().transform_up(|node| {
756-
if let Some(parquet) = node.as_any().downcast_ref::<ParquetExec>() {
757+
if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
757758
// Remove redundant ranges from partition files because ParquetExec refuses to repartition
758759
// if any file has a range defined (even when the range actually covers the entire file).
759760
// The EnforceDistribution optimizer rule adds ranges for both full and partial files,
760761
// so this tries to revert that to trigger a repartition when no files are actually split.
761-
let mut file_groups = parquet.base_config().file_groups.clone();
762-
for group in file_groups.iter_mut() {
763-
for file in group.iter_mut() {
764-
if let Some(range) = &file.range {
765-
if range.start == 0 && range.end == file.object_meta.size as i64 {
766-
file.range = None; // remove redundant range
762+
if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>() {
763+
let mut file_groups = file_scan.file_groups.clone();
764+
for group in file_groups.iter_mut() {
765+
for file in group.iter_mut() {
766+
if let Some(range) = &file.range {
767+
if range.start == 0 && range.end == file.object_meta.size as i64 {
768+
file.range = None; // remove redundant range
769+
}
767770
}
768771
}
769772
}
770-
}
771-
if let Some(repartitioned) = parquet.clone().into_builder().with_file_groups(file_groups)
772-
.build_arc()
773-
.repartitioned(desired_parallelism, &ConfigOptions::default())? {
774-
Ok(Transformed::yes(repartitioned))
773+
if let Some(repartitioned) = file_scan.clone().with_file_groups(file_groups)
774+
.repartitioned(desired_parallelism, 10 * 1024 * 1024, None)? {
775+
Ok(Transformed::yes(Arc::new(DataSourceExec::new(repartitioned))))
776+
} else {
777+
Ok(Transformed::no(node))
778+
}
775779
} else {
776780
Ok(Transformed::no(node))
777781
}

src/expr.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ use datafusion::arrow::pyarrow::PyArrowType;
3030
use datafusion::functions::core::expr_ext::FieldAccessor;
3131
use datafusion::logical_expr::{
3232
col,
33-
expr::{AggregateFunction, InList, InSubquery, ScalarFunction, WindowFunction},
33+
expr::{AggregateFunction, AggregateFunctionParams, InList, InSubquery, ScalarFunction, WindowFunction},
3434
lit, Between, BinaryExpr, Case, Cast, Expr, Like, Operator, TryCast,
3535
};
36-
36+
use datafusion::logical_expr::expr::WindowFunctionParams;
3737
use crate::common::data_type::{DataTypeMap, NullTreatment, PyScalarValue, RexType};
3838
use crate::errors::{
3939
py_runtime_err, py_type_err, py_unsupported_variant_err, PyDataFusionError, PyDataFusionResult,
@@ -394,9 +394,9 @@ impl PyExpr {
394394
| Expr::InSubquery(InSubquery { expr, .. }) => Ok(vec![PyExpr::from(*expr.clone())]),
395395

396396
// Expr variants containing a collection of Expr(s) for operands
397-
Expr::AggregateFunction(AggregateFunction { args, .. })
397+
Expr::AggregateFunction(AggregateFunction { params: AggregateFunctionParams { args, .. }, .. })
398398
| Expr::ScalarFunction(ScalarFunction { args, .. })
399-
| Expr::WindowFunction(WindowFunction { args, .. }) => {
399+
| Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { args, .. }, .. }) => {
400400
Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect())
401401
}
402402

@@ -575,7 +575,7 @@ impl PyExpr {
575575
Expr::AggregateFunction(agg_fn) => {
576576
let window_fn = Expr::WindowFunction(WindowFunction::new(
577577
WindowFunctionDefinition::AggregateUDF(agg_fn.func.clone()),
578-
agg_fn.args.clone(),
578+
agg_fn.params.args.clone(),
579579
));
580580

581581
add_builder_fns_to_window(

src/expr/aggregate.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::common::DataFusionError;
19-
use datafusion::logical_expr::expr::{AggregateFunction, Alias};
19+
use datafusion::logical_expr::expr::{AggregateFunction, AggregateFunctionParams, Alias};
2020
use datafusion::logical_expr::logical_plan::Aggregate;
2121
use datafusion::logical_expr::Expr;
2222
use pyo3::{prelude::*, IntoPyObjectExt};
@@ -126,7 +126,7 @@ impl PyAggregate {
126126
match expr {
127127
// TODO: This Alias logic seems to be returning some strange results that we should investigate
128128
Expr::Alias(Alias { expr, .. }) => self._aggregation_arguments(expr.as_ref()),
129-
Expr::AggregateFunction(AggregateFunction { func: _, args, .. }) => {
129+
Expr::AggregateFunction(AggregateFunction { func: _, params: AggregateFunctionParams { args, .. }}) => {
130130
Ok(args.iter().map(|e| PyExpr::from(e.clone())).collect())
131131
}
132132
_ => Err(py_type_err(

src/expr/aggregate_expr.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl From<AggregateFunction> for PyAggregateFunction {
4040

4141
impl Display for PyAggregateFunction {
4242
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
43-
let args: Vec<String> = self.aggr.args.iter().map(|expr| expr.to_string()).collect();
43+
let args: Vec<String> = self.aggr.params.args.iter().map(|expr| expr.to_string()).collect();
4444
write!(f, "{}({})", self.aggr.func.name(), args.join(", "))
4545
}
4646
}
@@ -54,12 +54,13 @@ impl PyAggregateFunction {
5454

5555
/// is this a distinct aggregate such as `COUNT(DISTINCT expr)`
5656
fn is_distinct(&self) -> bool {
57-
self.aggr.distinct
57+
self.aggr.params.distinct
5858
}
5959

6060
/// Get the arguments to the aggregate function
6161
fn args(&self) -> Vec<PyExpr> {
6262
self.aggr
63+
.params
6364
.args
6465
.iter()
6566
.map(|expr| PyExpr::from(expr.clone()))

src/expr/window.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::common::{DataFusionError, ScalarValue};
19-
use datafusion::logical_expr::expr::WindowFunction;
19+
use datafusion::logical_expr::expr::{WindowFunction, WindowFunctionParams};
2020
use datafusion::logical_expr::{Expr, Window, WindowFrame, WindowFrameBound, WindowFrameUnits};
2121
use pyo3::{prelude::*, IntoPyObjectExt};
2222
use std::fmt::{self, Display, Formatter};
@@ -118,15 +118,15 @@ impl PyWindowExpr {
118118
/// Returns order by columns in a window function expression
119119
pub fn get_sort_exprs(&self, expr: PyExpr) -> PyResult<Vec<PySortExpr>> {
120120
match expr.expr.unalias() {
121-
Expr::WindowFunction(WindowFunction { order_by, .. }) => py_sort_expr_list(&order_by),
121+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { order_by, .. }, .. }) => py_sort_expr_list(&order_by),
122122
other => Err(not_window_function_err(other)),
123123
}
124124
}
125125

126126
/// Return partition by columns in a window function expression
127127
pub fn get_partition_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
128128
match expr.expr.unalias() {
129-
Expr::WindowFunction(WindowFunction { partition_by, .. }) => {
129+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { partition_by, .. }, .. }) => {
130130
py_expr_list(&partition_by)
131131
}
132132
other => Err(not_window_function_err(other)),
@@ -136,7 +136,7 @@ impl PyWindowExpr {
136136
/// Return input args for window function
137137
pub fn get_args(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
138138
match expr.expr.unalias() {
139-
Expr::WindowFunction(WindowFunction { args, .. }) => py_expr_list(&args),
139+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { args, .. }, .. }) => py_expr_list(&args),
140140
other => Err(not_window_function_err(other)),
141141
}
142142
}
@@ -152,7 +152,7 @@ impl PyWindowExpr {
152152
/// Returns a `PyWindowFrame` for a given window function expression
153153
pub fn get_frame(&self, expr: PyExpr) -> Option<PyWindowFrame> {
154154
match expr.expr.unalias() {
155-
Expr::WindowFunction(WindowFunction { window_frame, .. }) => Some(window_frame.into()),
155+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams {window_frame, .. }, .. }) => Some(window_frame.into()),
156156
_ => None,
157157
}
158158
}

src/functions.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ use crate::expr::sort_expr::to_sort_expressions;
3131
use crate::expr::sort_expr::PySortExpr;
3232
use crate::expr::window::PyWindowFrame;
3333
use crate::expr::PyExpr;
34-
use datafusion::common::{Column, ScalarValue, TableReference};
34+
use datafusion::common::{Column, ScalarValue, Spans, TableReference};
3535
use datafusion::execution::FunctionRegistry;
3636
use datafusion::functions;
3737
use datafusion::functions_aggregate;
3838
use datafusion::functions_window;
39-
use datafusion::logical_expr::expr::Alias;
39+
use datafusion::logical_expr::expr::{Alias, WindowFunctionParams};
4040
use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment;
4141
use datafusion::logical_expr::{expr::WindowFunction, lit, Expr, WindowFunctionDefinition};
4242

@@ -196,10 +196,7 @@ fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
196196
#[pyfunction]
197197
fn col(name: &str) -> PyResult<PyExpr> {
198198
Ok(PyExpr {
199-
expr: datafusion::logical_expr::Expr::Column(Column {
200-
relation: None,
201-
name: name.to_string(),
202-
}),
199+
expr: Expr::Column(Column::new_unqualified(name)),
203200
})
204201
}
205202

@@ -314,19 +311,21 @@ fn window(
314311
Ok(PyExpr {
315312
expr: datafusion::logical_expr::Expr::WindowFunction(WindowFunction {
316313
fun,
317-
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
318-
partition_by: partition_by
319-
.unwrap_or_default()
320-
.into_iter()
321-
.map(|x| x.expr)
322-
.collect::<Vec<_>>(),
323-
order_by: order_by
324-
.unwrap_or_default()
325-
.into_iter()
326-
.map(|x| x.into())
327-
.collect::<Vec<_>>(),
328-
window_frame,
329-
null_treatment: None,
314+
params: WindowFunctionParams {
315+
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
316+
partition_by: partition_by
317+
.unwrap_or_default()
318+
.into_iter()
319+
.map(|x| x.expr)
320+
.collect::<Vec<_>>(),
321+
order_by: order_by
322+
.unwrap_or_default()
323+
.into_iter()
324+
.map(|x| x.into())
325+
.collect::<Vec<_>>(),
326+
window_frame,
327+
null_treatment: None,
328+
},
330329
}),
331330
})
332331
}

0 commit comments

Comments
 (0)