Skip to content

Commit d0315ff

Browse files
authored
feat: Update DataFusion dependency to 46 (#1079)
* Update DataFusion dependency to 46 * There was an update upstream in the exec but it is not a breaking change and only needs unit test updates
1 parent 42982da commit d0315ff

File tree

8 files changed

+252
-183
lines changed

8 files changed

+252
-183
lines changed

Cargo.lock

+170-126
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+9-9
Original file line numberDiff line numberDiff line change
@@ -34,24 +34,24 @@ protoc = [ "datafusion-substrait/protoc" ]
3434
substrait = ["dep:datafusion-substrait"]
3535

3636
[dependencies]
37-
tokio = { version = "1.42", features = ["macros", "rt", "rt-multi-thread", "sync"] }
37+
tokio = { version = "1.43", features = ["macros", "rt", "rt-multi-thread", "sync"] }
3838
pyo3 = { version = "0.23", features = ["extension-module", "abi3", "abi3-py39"] }
3939
pyo3-async-runtimes = { version = "0.23", features = ["tokio-runtime"]}
40-
arrow = { version = "54", features = ["pyarrow"] }
41-
datafusion = { version = "45.0.0", features = ["avro", "unicode_expressions"] }
42-
datafusion-substrait = { version = "45.0.0", optional = true }
43-
datafusion-proto = { version = "45.0.0" }
44-
datafusion-ffi = { version = "45.0.0" }
45-
prost = "0.13" # keep in line with `datafusion-substrait`
40+
arrow = { version = "54.2.1", features = ["pyarrow"] }
41+
datafusion = { version = "46.0.1", features = ["avro", "unicode_expressions"] }
42+
datafusion-substrait = { version = "46.0.1", optional = true }
43+
datafusion-proto = { version = "46.0.1" }
44+
datafusion-ffi = { version = "46.0.1" }
45+
prost = "0.13.1" # keep in line with `datafusion-substrait`
4646
uuid = { version = "1.12", features = ["v4"] }
4747
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
48-
async-trait = "0.1"
48+
async-trait = "0.1.73"
4949
futures = "0.3"
5050
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
5151
url = "2"
5252

5353
[build-dependencies]
54-
prost-types = "0.13" # keep in line with `datafusion-substrait`
54+
prost-types = "0.13.1" # keep in line with `datafusion-substrait`
5555
pyo3-build-config = "0.23"
5656

5757
[lib]

python/tests/test_dataframe.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,8 @@ def test_execution_plan(aggregate_df):
753753
assert "AggregateExec:" in indent
754754
assert "CoalesceBatchesExec:" in indent
755755
assert "RepartitionExec:" in indent
756-
assert "CsvExec:" in indent
756+
assert "DataSourceExec:" in indent
757+
assert "file_type=csv" in indent
757758

758759
ctx = SessionContext()
759760
rows_returned = 0

src/expr.rs

+22-17
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use datafusion::logical_expr::expr::{AggregateFunctionParams, WindowFunctionParams};
1819
use datafusion::logical_expr::utils::exprlist_to_fields;
1920
use datafusion::logical_expr::{
2021
ExprFuncBuilder, ExprFunctionExt, LogicalPlan, WindowFunctionDefinition,
@@ -172,6 +173,7 @@ impl PyExpr {
172173
Expr::ScalarSubquery(value) => {
173174
Ok(scalar_subquery::PyScalarSubquery::from(value.clone()).into_bound_py_any(py)?)
174175
}
176+
#[allow(deprecated)]
175177
Expr::Wildcard { qualifier, options } => Err(py_unsupported_variant_err(format!(
176178
"Converting Expr::Wildcard to a Python object is not implemented : {:?} {:?}",
177179
qualifier, options
@@ -332,7 +334,6 @@ impl PyExpr {
332334
| Expr::AggregateFunction { .. }
333335
| Expr::WindowFunction { .. }
334336
| Expr::InList { .. }
335-
| Expr::Wildcard { .. }
336337
| Expr::Exists { .. }
337338
| Expr::InSubquery { .. }
338339
| Expr::GroupingSet(..)
@@ -346,6 +347,10 @@ impl PyExpr {
346347
| Expr::Unnest(_)
347348
| Expr::IsNotUnknown(_) => RexType::Call,
348349
Expr::ScalarSubquery(..) => RexType::ScalarSubquery,
350+
#[allow(deprecated)]
351+
Expr::Wildcard { .. } => {
352+
return Err(py_unsupported_variant_err("Expr::Wildcard is unsupported"))
353+
}
349354
})
350355
}
351356

@@ -394,11 +399,15 @@ impl PyExpr {
394399
| Expr::InSubquery(InSubquery { expr, .. }) => Ok(vec![PyExpr::from(*expr.clone())]),
395400

396401
// Expr variants containing a collection of Expr(s) for operands
397-
Expr::AggregateFunction(AggregateFunction { args, .. })
402+
Expr::AggregateFunction(AggregateFunction {
403+
params: AggregateFunctionParams { args, .. },
404+
..
405+
})
398406
| Expr::ScalarFunction(ScalarFunction { args, .. })
399-
| Expr::WindowFunction(WindowFunction { args, .. }) => {
400-
Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect())
401-
}
407+
| Expr::WindowFunction(WindowFunction {
408+
params: WindowFunctionParams { args, .. },
409+
..
410+
}) => Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect()),
402411

403412
// Expr(s) that require more specific processing
404413
Expr::Case(Case {
@@ -465,13 +474,17 @@ impl PyExpr {
465474
Expr::GroupingSet(..)
466475
| Expr::Unnest(_)
467476
| Expr::OuterReferenceColumn(_, _)
468-
| Expr::Wildcard { .. }
469477
| Expr::ScalarSubquery(..)
470478
| Expr::Placeholder { .. }
471479
| Expr::Exists { .. } => Err(py_runtime_err(format!(
472480
"Unimplemented Expr type: {}",
473481
self.expr
474482
))),
483+
484+
#[allow(deprecated)]
485+
Expr::Wildcard { .. } => {
486+
Err(py_unsupported_variant_err("Expr::Wildcard is unsupported"))
487+
}
475488
}
476489
}
477490

@@ -575,7 +588,7 @@ impl PyExpr {
575588
Expr::AggregateFunction(agg_fn) => {
576589
let window_fn = Expr::WindowFunction(WindowFunction::new(
577590
WindowFunctionDefinition::AggregateUDF(agg_fn.func.clone()),
578-
agg_fn.args.clone(),
591+
agg_fn.params.args.clone(),
579592
));
580593

581594
add_builder_fns_to_window(
@@ -663,16 +676,8 @@ impl PyExpr {
663676

664677
/// Create a [Field] representing an [Expr], given an input [LogicalPlan] to resolve against
665678
pub fn expr_to_field(expr: &Expr, input_plan: &LogicalPlan) -> PyDataFusionResult<Arc<Field>> {
666-
match expr {
667-
Expr::Wildcard { .. } => {
668-
// Since * could be any of the valid column names just return the first one
669-
Ok(Arc::new(input_plan.schema().field(0).clone()))
670-
}
671-
_ => {
672-
let fields = exprlist_to_fields(&[expr.clone()], input_plan)?;
673-
Ok(fields[0].1.clone())
674-
}
675-
}
679+
let fields = exprlist_to_fields(&[expr.clone()], input_plan)?;
680+
Ok(fields[0].1.clone())
676681
}
677682
fn _types(expr: &Expr) -> PyResult<DataTypeMap> {
678683
match expr {

src/expr/aggregate.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::common::DataFusionError;
19-
use datafusion::logical_expr::expr::{AggregateFunction, Alias};
19+
use datafusion::logical_expr::expr::{AggregateFunction, AggregateFunctionParams, Alias};
2020
use datafusion::logical_expr::logical_plan::Aggregate;
2121
use datafusion::logical_expr::Expr;
2222
use pyo3::{prelude::*, IntoPyObjectExt};
@@ -126,9 +126,11 @@ impl PyAggregate {
126126
match expr {
127127
// TODO: This Alias logic seems to be returning some strange results that we should investigate
128128
Expr::Alias(Alias { expr, .. }) => self._aggregation_arguments(expr.as_ref()),
129-
Expr::AggregateFunction(AggregateFunction { func: _, args, .. }) => {
130-
Ok(args.iter().map(|e| PyExpr::from(e.clone())).collect())
131-
}
129+
Expr::AggregateFunction(AggregateFunction {
130+
func: _,
131+
params: AggregateFunctionParams { args, .. },
132+
..
133+
}) => Ok(args.iter().map(|e| PyExpr::from(e.clone())).collect()),
132134
_ => Err(py_type_err(
133135
"Encountered a non Aggregate type in aggregation_arguments",
134136
)),

src/expr/aggregate_expr.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,13 @@ impl From<AggregateFunction> for PyAggregateFunction {
4040

4141
impl Display for PyAggregateFunction {
4242
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
43-
let args: Vec<String> = self.aggr.args.iter().map(|expr| expr.to_string()).collect();
43+
let args: Vec<String> = self
44+
.aggr
45+
.params
46+
.args
47+
.iter()
48+
.map(|expr| expr.to_string())
49+
.collect();
4450
write!(f, "{}({})", self.aggr.func.name(), args.join(", "))
4551
}
4652
}
@@ -54,12 +60,13 @@ impl PyAggregateFunction {
5460

5561
/// is this a distinct aggregate such as `COUNT(DISTINCT expr)`
5662
fn is_distinct(&self) -> bool {
57-
self.aggr.distinct
63+
self.aggr.params.distinct
5864
}
5965

6066
/// Get the arguments to the aggregate function
6167
fn args(&self) -> Vec<PyExpr> {
6268
self.aggr
69+
.params
6370
.args
6471
.iter()
6572
.map(|expr| PyExpr::from(expr.clone()))

src/expr/window.rs

+17-7
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::common::{DataFusionError, ScalarValue};
19-
use datafusion::logical_expr::expr::WindowFunction;
19+
use datafusion::logical_expr::expr::{WindowFunction, WindowFunctionParams};
2020
use datafusion::logical_expr::{Expr, Window, WindowFrame, WindowFrameBound, WindowFrameUnits};
2121
use pyo3::{prelude::*, IntoPyObjectExt};
2222
use std::fmt::{self, Display, Formatter};
@@ -118,25 +118,32 @@ impl PyWindowExpr {
118118
/// Returns order by columns in a window function expression
119119
pub fn get_sort_exprs(&self, expr: PyExpr) -> PyResult<Vec<PySortExpr>> {
120120
match expr.expr.unalias() {
121-
Expr::WindowFunction(WindowFunction { order_by, .. }) => py_sort_expr_list(&order_by),
121+
Expr::WindowFunction(WindowFunction {
122+
params: WindowFunctionParams { order_by, .. },
123+
..
124+
}) => py_sort_expr_list(&order_by),
122125
other => Err(not_window_function_err(other)),
123126
}
124127
}
125128

126129
/// Return partition by columns in a window function expression
127130
pub fn get_partition_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
128131
match expr.expr.unalias() {
129-
Expr::WindowFunction(WindowFunction { partition_by, .. }) => {
130-
py_expr_list(&partition_by)
131-
}
132+
Expr::WindowFunction(WindowFunction {
133+
params: WindowFunctionParams { partition_by, .. },
134+
..
135+
}) => py_expr_list(&partition_by),
132136
other => Err(not_window_function_err(other)),
133137
}
134138
}
135139

136140
/// Return input args for window function
137141
pub fn get_args(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
138142
match expr.expr.unalias() {
139-
Expr::WindowFunction(WindowFunction { args, .. }) => py_expr_list(&args),
143+
Expr::WindowFunction(WindowFunction {
144+
params: WindowFunctionParams { args, .. },
145+
..
146+
}) => py_expr_list(&args),
140147
other => Err(not_window_function_err(other)),
141148
}
142149
}
@@ -152,7 +159,10 @@ impl PyWindowExpr {
152159
/// Returns a Pywindow frame for a given window function expression
153160
pub fn get_frame(&self, expr: PyExpr) -> Option<PyWindowFrame> {
154161
match expr.expr.unalias() {
155-
Expr::WindowFunction(WindowFunction { window_frame, .. }) => Some(window_frame.into()),
162+
Expr::WindowFunction(WindowFunction {
163+
params: WindowFunctionParams { window_frame, .. },
164+
..
165+
}) => Some(window_frame.into()),
156166
_ => None,
157167
}
158168
}

src/functions.rs

+17-17
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use datafusion::functions_aggregate::all_default_aggregate_functions;
1919
use datafusion::functions_window::all_default_window_functions;
20+
use datafusion::logical_expr::expr::WindowFunctionParams;
2021
use datafusion::logical_expr::ExprFunctionExt;
2122
use datafusion::logical_expr::WindowFrame;
2223
use pyo3::{prelude::*, wrap_pyfunction};
@@ -215,10 +216,7 @@ fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
215216
#[pyfunction]
216217
fn col(name: &str) -> PyResult<PyExpr> {
217218
Ok(PyExpr {
218-
expr: datafusion::logical_expr::Expr::Column(Column {
219-
relation: None,
220-
name: name.to_string(),
221-
}),
219+
expr: datafusion::logical_expr::Expr::Column(Column::new_unqualified(name)),
222220
})
223221
}
224222

@@ -333,19 +331,21 @@ fn window(
333331
Ok(PyExpr {
334332
expr: datafusion::logical_expr::Expr::WindowFunction(WindowFunction {
335333
fun,
336-
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
337-
partition_by: partition_by
338-
.unwrap_or_default()
339-
.into_iter()
340-
.map(|x| x.expr)
341-
.collect::<Vec<_>>(),
342-
order_by: order_by
343-
.unwrap_or_default()
344-
.into_iter()
345-
.map(|x| x.into())
346-
.collect::<Vec<_>>(),
347-
window_frame,
348-
null_treatment: None,
334+
params: WindowFunctionParams {
335+
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
336+
partition_by: partition_by
337+
.unwrap_or_default()
338+
.into_iter()
339+
.map(|x| x.expr)
340+
.collect::<Vec<_>>(),
341+
order_by: order_by
342+
.unwrap_or_default()
343+
.into_iter()
344+
.map(|x| x.into())
345+
.collect::<Vec<_>>(),
346+
window_frame,
347+
null_treatment: None,
348+
},
349349
}),
350350
})
351351
}

0 commit comments

Comments
 (0)