Skip to content

Commit 2fa417d

Browse files
committed
DataFusion 46.0.0
1 parent 2b72f90 commit 2fa417d

File tree

8 files changed

+1557
-465
lines changed

8 files changed

+1557
-465
lines changed

Cargo.lock

Lines changed: 1470 additions & 382 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,18 @@ tokio = { version = "1.42", features = ["macros", "rt", "rt-multi-thread", "sync
3838
pyo3 = { version = "0.23", features = ["extension-module", "abi3", "abi3-py38"] }
3939
pyo3-async-runtimes = { version = "0.23", features = ["tokio-runtime"]}
4040
arrow = { version = "54", features = ["pyarrow"] }
41-
datafusion = { version = "45.0.0", features = ["avro", "unicode_expressions"] }
42-
datafusion-substrait = { version = "45.0.0", optional = true }
43-
datafusion-proto = { version = "45.0.0" }
44-
datafusion-ffi = { version = "45.0.0" }
41+
datafusion = { version = "46.0.0", features = ["avro", "unicode_expressions"] }
42+
datafusion-substrait = { version = "46.0.0", optional = true }
43+
datafusion-proto = { version = "46.0.0" }
44+
datafusion-ffi = { version = "46.0.0" }
4545
prost = "0.13" # keep in line with `datafusion-substrait`
4646
uuid = { version = "1.12", features = ["v4"] }
4747
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
4848
async-trait = "0.1"
4949
futures = "0.3"
5050
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
5151
url = "2"
52-
deltalake = { version = "0.23", features = ["datafusion", "azure", "s3"] }
52+
deltalake = { version = "0.25.0", features = ["datafusion", "azure", "s3"] }
5353

5454

5555
[build-dependencies]
@@ -61,36 +61,36 @@ name = "datafusion_python"
6161
crate-type = ["cdylib", "rlib"]
6262

6363
[patch.crates-io]
64-
datafusion = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
65-
datafusion-catalog = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
66-
datafusion-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
67-
datafusion-common-runtime = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
68-
datafusion-execution = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
69-
datafusion-expr = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
70-
datafusion-expr-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
71-
datafusion-functions = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
72-
datafusion-functions-aggregate = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
73-
datafusion-functions-aggregate-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
74-
datafusion-functions-nested = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
75-
datafusion-functions-table = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
76-
datafusion-functions-window = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
77-
datafusion-functions-window-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
78-
datafusion-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
79-
datafusion-physical-expr = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
80-
datafusion-physical-expr-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
81-
datafusion-physical-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
82-
datafusion-physical-plan = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
83-
datafusion-proto = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
84-
datafusion-proto-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
85-
datafusion-sql = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
86-
datafusion-substrait = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
87-
deltalake = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
88-
deltalake-catalog-glue = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
89-
deltalake-core = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
90-
deltalake-aws = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
91-
deltalake-azure = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
92-
deltalake-mount = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
93-
deltalake-sql = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
64+
datafusion = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
65+
datafusion-catalog = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
66+
datafusion-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
67+
datafusion-common-runtime = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
68+
datafusion-execution = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
69+
datafusion-expr = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
70+
datafusion-expr-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
71+
datafusion-functions = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
72+
datafusion-functions-aggregate = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
73+
datafusion-functions-aggregate-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
74+
datafusion-functions-nested = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
75+
datafusion-functions-table = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
76+
datafusion-functions-window = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
77+
datafusion-functions-window-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
78+
datafusion-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
79+
datafusion-physical-expr = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
80+
datafusion-physical-expr-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
81+
datafusion-physical-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
82+
datafusion-physical-plan = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
83+
datafusion-proto = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
84+
datafusion-proto-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
85+
datafusion-sql = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
86+
datafusion-substrait = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main-46' }
87+
deltalake = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main-46' }
88+
deltalake-catalog-glue = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main-46' }
89+
deltalake-core = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main-46' }
90+
deltalake-aws = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main-46' }
91+
deltalake-azure = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main-46' }
92+
deltalake-mount = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main-46' }
93+
deltalake-sql = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main-46' }
9494
reqwest = { git = 'https://github.com/hstack/reqwest.git', branch = 'disable-proxy-tunnel' }
9595
#datafusion-table-providers = { git = "https://github.com/hstack/datafusion-table-providers", branch = "main" }
9696
#datafusion = { path = "../arrow-datafusion/datafusion/core" }

src/dataframe.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ use datafusion::arrow::util::pretty;
3030
use datafusion::common::stats::Precision;
3131
use datafusion::common::{DFSchema, DataFusionError, Statistics, UnnestOptions};
3232
use datafusion::common::tree_node::{Transformed, TreeNode};
33-
use datafusion::config::{ConfigOptions, CsvOptions, TableParquetOptions};
33+
use datafusion::config::{CsvOptions, TableParquetOptions};
3434
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
35+
use datafusion::datasource::memory::DataSourceExec;
3536
use datafusion::datasource::TableProvider;
36-
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
37-
use datafusion::datasource::physical_plan::ParquetExec;
37+
use datafusion::datasource::physical_plan::FileScanConfig;
38+
use datafusion::datasource::source::DataSource;
3839
use datafusion::execution::SendableRecordBatchStream;
3940
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
4041
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
@@ -753,25 +754,28 @@ impl DistributedPlan {
753754
return Ok(())
754755
}
755756
let updated_plan = self.plan().clone().transform_up(|node| {
756-
if let Some(parquet) = node.as_any().downcast_ref::<ParquetExec>() {
757+
if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
757758
// Remove redundant ranges from partition files because ParquetExec refuses to repartition
758759
// if any file has a range defined (even when the range actually covers the entire file).
759760
// The EnforceDistribution optimizer rule adds ranges for both full and partial files,
760761
// so this tries to revert that to trigger a repartition when no files are actually split.
761-
let mut file_groups = parquet.base_config().file_groups.clone();
762-
for group in file_groups.iter_mut() {
763-
for file in group.iter_mut() {
764-
if let Some(range) = &file.range {
765-
if range.start == 0 && range.end == file.object_meta.size as i64 {
766-
file.range = None; // remove redundant range
762+
if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>() {
763+
let mut file_groups = file_scan.file_groups.clone();
764+
for group in file_groups.iter_mut() {
765+
for file in group.iter_mut() {
766+
if let Some(range) = &file.range {
767+
if range.start == 0 && range.end == file.object_meta.size as i64 {
768+
file.range = None; // remove redundant range
769+
}
767770
}
768771
}
769772
}
770-
}
771-
if let Some(repartitioned) = parquet.clone().into_builder().with_file_groups(file_groups)
772-
.build_arc()
773-
.repartitioned(desired_parallelism, &ConfigOptions::default())? {
774-
Ok(Transformed::yes(repartitioned))
773+
if let Some(repartitioned) = file_scan.clone().with_file_groups(file_groups)
774+
.repartitioned(desired_parallelism, 10 * 1024 * 1024, None)? {
775+
Ok(Transformed::yes(Arc::new(DataSourceExec::new(repartitioned))))
776+
} else {
777+
Ok(Transformed::no(node))
778+
}
775779
} else {
776780
Ok(Transformed::no(node))
777781
}

src/expr.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ use datafusion::arrow::pyarrow::PyArrowType;
3030
use datafusion::functions::core::expr_ext::FieldAccessor;
3131
use datafusion::logical_expr::{
3232
col,
33-
expr::{AggregateFunction, InList, InSubquery, ScalarFunction, WindowFunction},
33+
expr::{AggregateFunction, AggregateFunctionParams, InList, InSubquery, ScalarFunction, WindowFunction},
3434
lit, Between, BinaryExpr, Case, Cast, Expr, Like, Operator, TryCast,
3535
};
36-
36+
use datafusion::logical_expr::expr::WindowFunctionParams;
3737
use crate::common::data_type::{DataTypeMap, NullTreatment, PyScalarValue, RexType};
3838
use crate::errors::{
3939
py_runtime_err, py_type_err, py_unsupported_variant_err, PyDataFusionError, PyDataFusionResult,
@@ -394,9 +394,9 @@ impl PyExpr {
394394
| Expr::InSubquery(InSubquery { expr, .. }) => Ok(vec![PyExpr::from(*expr.clone())]),
395395

396396
// Expr variants containing a collection of Expr(s) for operands
397-
Expr::AggregateFunction(AggregateFunction { args, .. })
397+
Expr::AggregateFunction(AggregateFunction { params: AggregateFunctionParams { args, .. }, .. })
398398
| Expr::ScalarFunction(ScalarFunction { args, .. })
399-
| Expr::WindowFunction(WindowFunction { args, .. }) => {
399+
| Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { args, .. }, .. }) => {
400400
Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect())
401401
}
402402

@@ -575,7 +575,7 @@ impl PyExpr {
575575
Expr::AggregateFunction(agg_fn) => {
576576
let window_fn = Expr::WindowFunction(WindowFunction::new(
577577
WindowFunctionDefinition::AggregateUDF(agg_fn.func.clone()),
578-
agg_fn.args.clone(),
578+
agg_fn.params.args.clone(),
579579
));
580580

581581
add_builder_fns_to_window(

src/expr/aggregate.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::common::DataFusionError;
19-
use datafusion::logical_expr::expr::{AggregateFunction, Alias};
19+
use datafusion::logical_expr::expr::{AggregateFunction, AggregateFunctionParams, Alias};
2020
use datafusion::logical_expr::logical_plan::Aggregate;
2121
use datafusion::logical_expr::Expr;
2222
use pyo3::{prelude::*, IntoPyObjectExt};
@@ -126,7 +126,7 @@ impl PyAggregate {
126126
match expr {
127127
// TODO: This Alias logic seems to be returning some strange results that we should investigate
128128
Expr::Alias(Alias { expr, .. }) => self._aggregation_arguments(expr.as_ref()),
129-
Expr::AggregateFunction(AggregateFunction { func: _, args, .. }) => {
129+
Expr::AggregateFunction(AggregateFunction { func: _, params: AggregateFunctionParams { args, .. }}) => {
130130
Ok(args.iter().map(|e| PyExpr::from(e.clone())).collect())
131131
}
132132
_ => Err(py_type_err(

src/expr/aggregate_expr.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl From<AggregateFunction> for PyAggregateFunction {
4040

4141
impl Display for PyAggregateFunction {
4242
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
43-
let args: Vec<String> = self.aggr.args.iter().map(|expr| expr.to_string()).collect();
43+
let args: Vec<String> = self.aggr.params.args.iter().map(|expr| expr.to_string()).collect();
4444
write!(f, "{}({})", self.aggr.func.name(), args.join(", "))
4545
}
4646
}
@@ -54,12 +54,13 @@ impl PyAggregateFunction {
5454

5555
/// is this a distinct aggregate such as `COUNT(DISTINCT expr)`
5656
fn is_distinct(&self) -> bool {
57-
self.aggr.distinct
57+
self.aggr.params.distinct
5858
}
5959

6060
/// Get the arguments to the aggregate function
6161
fn args(&self) -> Vec<PyExpr> {
6262
self.aggr
63+
.params
6364
.args
6465
.iter()
6566
.map(|expr| PyExpr::from(expr.clone()))

src/expr/window.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::common::{DataFusionError, ScalarValue};
19-
use datafusion::logical_expr::expr::WindowFunction;
19+
use datafusion::logical_expr::expr::{WindowFunction, WindowFunctionParams};
2020
use datafusion::logical_expr::{Expr, Window, WindowFrame, WindowFrameBound, WindowFrameUnits};
2121
use pyo3::{prelude::*, IntoPyObjectExt};
2222
use std::fmt::{self, Display, Formatter};
@@ -118,15 +118,15 @@ impl PyWindowExpr {
118118
/// Returns order by columns in a window function expression
119119
pub fn get_sort_exprs(&self, expr: PyExpr) -> PyResult<Vec<PySortExpr>> {
120120
match expr.expr.unalias() {
121-
Expr::WindowFunction(WindowFunction { order_by, .. }) => py_sort_expr_list(&order_by),
121+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { order_by, .. }, .. }) => py_sort_expr_list(&order_by),
122122
other => Err(not_window_function_err(other)),
123123
}
124124
}
125125

126126
/// Return partition by columns in a window function expression
127127
pub fn get_partition_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
128128
match expr.expr.unalias() {
129-
Expr::WindowFunction(WindowFunction { partition_by, .. }) => {
129+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { partition_by, .. }, .. }) => {
130130
py_expr_list(&partition_by)
131131
}
132132
other => Err(not_window_function_err(other)),
@@ -136,7 +136,7 @@ impl PyWindowExpr {
136136
/// Return input args for window function
137137
pub fn get_args(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
138138
match expr.expr.unalias() {
139-
Expr::WindowFunction(WindowFunction { args, .. }) => py_expr_list(&args),
139+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { args, .. }, .. }) => py_expr_list(&args),
140140
other => Err(not_window_function_err(other)),
141141
}
142142
}
@@ -152,7 +152,7 @@ impl PyWindowExpr {
152152
/// Returns a PyWindowFrame for a given window function expression
153153
pub fn get_frame(&self, expr: PyExpr) -> Option<PyWindowFrame> {
154154
match expr.expr.unalias() {
155-
Expr::WindowFunction(WindowFunction { window_frame, .. }) => Some(window_frame.into()),
155+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams {window_frame, .. }, .. }) => Some(window_frame.into()),
156156
_ => None,
157157
}
158158
}

src/functions.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ use crate::expr::sort_expr::to_sort_expressions;
3131
use crate::expr::sort_expr::PySortExpr;
3232
use crate::expr::window::PyWindowFrame;
3333
use crate::expr::PyExpr;
34-
use datafusion::common::{Column, ScalarValue, TableReference};
34+
use datafusion::common::{Column, ScalarValue, Spans, TableReference};
3535
use datafusion::execution::FunctionRegistry;
3636
use datafusion::functions;
3737
use datafusion::functions_aggregate;
3838
use datafusion::functions_window;
39-
use datafusion::logical_expr::expr::Alias;
39+
use datafusion::logical_expr::expr::{Alias, WindowFunctionParams};
4040
use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment;
4141
use datafusion::logical_expr::{expr::WindowFunction, lit, Expr, WindowFunctionDefinition};
4242

@@ -196,10 +196,7 @@ fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
196196
#[pyfunction]
197197
fn col(name: &str) -> PyResult<PyExpr> {
198198
Ok(PyExpr {
199-
expr: datafusion::logical_expr::Expr::Column(Column {
200-
relation: None,
201-
name: name.to_string(),
202-
}),
199+
expr: Expr::Column(Column::new_unqualified(name)),
203200
})
204201
}
205202

@@ -314,19 +311,21 @@ fn window(
314311
Ok(PyExpr {
315312
expr: datafusion::logical_expr::Expr::WindowFunction(WindowFunction {
316313
fun,
317-
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
318-
partition_by: partition_by
319-
.unwrap_or_default()
320-
.into_iter()
321-
.map(|x| x.expr)
322-
.collect::<Vec<_>>(),
323-
order_by: order_by
324-
.unwrap_or_default()
325-
.into_iter()
326-
.map(|x| x.into())
327-
.collect::<Vec<_>>(),
328-
window_frame,
329-
null_treatment: None,
314+
params: WindowFunctionParams {
315+
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
316+
partition_by: partition_by
317+
.unwrap_or_default()
318+
.into_iter()
319+
.map(|x| x.expr)
320+
.collect::<Vec<_>>(),
321+
order_by: order_by
322+
.unwrap_or_default()
323+
.into_iter()
324+
.map(|x| x.into())
325+
.collect::<Vec<_>>(),
326+
window_frame,
327+
null_treatment: None,
328+
},
330329
}),
331330
})
332331
}

0 commit comments

Comments
 (0)