Skip to content

Commit 3ef3dbb

Browse files
committed
DataFusion 46.0.0
1 parent 2b72f90 commit 3ef3dbb

File tree

11 files changed

+1558
-442
lines changed

11 files changed

+1558
-442
lines changed

Cargo.lock

Lines changed: 1473 additions & 385 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
[package]
1919
name = "datafusion-python"
20-
version = "45.2.0"
20+
version = "46.0.0"
2121
homepage = "https://datafusion.apache.org/python"
2222
repository = "https://github.com/apache/datafusion-python"
2323
authors = ["Apache DataFusion <dev@datafusion.apache.org>"]
@@ -38,18 +38,18 @@ tokio = { version = "1.42", features = ["macros", "rt", "rt-multi-thread", "sync
3838
pyo3 = { version = "0.23", features = ["extension-module", "abi3", "abi3-py38"] }
3939
pyo3-async-runtimes = { version = "0.23", features = ["tokio-runtime"]}
4040
arrow = { version = "54", features = ["pyarrow"] }
41-
datafusion = { version = "45.0.0", features = ["avro", "unicode_expressions"] }
42-
datafusion-substrait = { version = "45.0.0", optional = true }
43-
datafusion-proto = { version = "45.0.0" }
44-
datafusion-ffi = { version = "45.0.0" }
41+
datafusion = { version = "46.0.0", features = ["avro", "unicode_expressions"] }
42+
datafusion-substrait = { version = "46.0.0", optional = true }
43+
datafusion-proto = { version = "46.0.0" }
44+
datafusion-ffi = { version = "46.0.0" }
4545
prost = "0.13" # keep in line with `datafusion-substrait`
4646
uuid = { version = "1.12", features = ["v4"] }
4747
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
4848
async-trait = "0.1"
4949
futures = "0.3"
5050
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
5151
url = "2"
52-
deltalake = { version = "0.23", features = ["datafusion", "azure", "s3"] }
52+
deltalake = { version = "0.25.0", features = ["datafusion", "azure", "s3"] }
5353

5454

5555
[build-dependencies]

Dockerfile-build-wheel.arm64

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,19 @@ WORKDIR /root
66

77
RUN <<EOF
88
dnf install -y epel-release
9-
dnf install -y curl pkg-config openssl ca-certificates openssl-devel patchelf autoconf automake make libtool unzip clang libatomic protobuf-c-compiler openssh-clients
9+
dnf install -y curl pkg-config openssl ca-certificates openssl-devel patchelf autoconf automake make libtool unzip clang libatomic openssh-clients wget
10+
EOF
11+
12+
RUN <<EOF
13+
# echo 1
14+
wget https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.21.12.zip
15+
unzip v3.21.12.zip
16+
cd protobuf-3.21.12
17+
autoupdate
18+
./autogen.sh
19+
./configure --prefix=/usr --disable-dependency-tracking
20+
make -j8
21+
make install
1022
EOF
1123

1224
RUN <<EOF

Dockerfile-build-wheel.x86_64

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,19 @@ WORKDIR /root
66

77
RUN <<EOF
88
dnf install -y epel-release
9-
dnf install -y curl pkg-config openssl ca-certificates openssl-devel patchelf autoconf automake make libtool unzip clang libatomic protobuf-c-compiler openssh-clients
9+
dnf install -y curl pkg-config openssl ca-certificates openssl-devel patchelf autoconf automake make libtool unzip clang libatomic openssh-clients wget
10+
EOF
11+
12+
RUN <<EOF
13+
# echo 1
14+
wget https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.21.12.zip
15+
unzip v3.21.12.zip
16+
cd protobuf-3.21.12
17+
autoupdate
18+
./autogen.sh
19+
./configure --prefix=/usr --disable-dependency-tracking
20+
make -j8
21+
make install
1022
EOF
1123

1224
RUN <<EOF

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ classifiers = [
4646
]
4747
dependencies = ["pyarrow>=11.0.0", "typing-extensions;python_version<'3.13'"]
4848
#dynamic = ["version"]
49-
version = "44.0.0+adobe.1"
49+
version = "46.0.0+adobe.1"
5050

5151
[project.urls]
5252
homepage = "https://datafusion.apache.org/python"

src/dataframe.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ use datafusion::arrow::util::pretty;
3030
use datafusion::common::stats::Precision;
3131
use datafusion::common::{DFSchema, DataFusionError, Statistics, UnnestOptions};
3232
use datafusion::common::tree_node::{Transformed, TreeNode};
33-
use datafusion::config::{ConfigOptions, CsvOptions, TableParquetOptions};
33+
use datafusion::config::{CsvOptions, TableParquetOptions};
3434
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
35+
use datafusion::datasource::memory::DataSourceExec;
3536
use datafusion::datasource::TableProvider;
36-
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
37-
use datafusion::datasource::physical_plan::ParquetExec;
37+
use datafusion::datasource::physical_plan::FileScanConfig;
38+
use datafusion::datasource::source::DataSource;
3839
use datafusion::execution::SendableRecordBatchStream;
3940
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
4041
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
@@ -753,25 +754,28 @@ impl DistributedPlan {
753754
return Ok(())
754755
}
755756
let updated_plan = self.plan().clone().transform_up(|node| {
756-
if let Some(parquet) = node.as_any().downcast_ref::<ParquetExec>() {
757+
if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
757758
// Remove redundant ranges from partition files because ParquetExec refuses to repartition
758759
// if any file has a range defined (even when the range actually covers the entire file).
759760
// The EnforceDistribution optimizer rule adds ranges for both full and partial files,
760761
// so this tries to revert that to trigger a repartition when no files are actually split.
761-
let mut file_groups = parquet.base_config().file_groups.clone();
762-
for group in file_groups.iter_mut() {
763-
for file in group.iter_mut() {
764-
if let Some(range) = &file.range {
765-
if range.start == 0 && range.end == file.object_meta.size as i64 {
766-
file.range = None; // remove redundant range
762+
if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>() {
763+
let mut file_groups = file_scan.file_groups.clone();
764+
for group in file_groups.iter_mut() {
765+
for file in group.iter_mut() {
766+
if let Some(range) = &file.range {
767+
if range.start == 0 && range.end == file.object_meta.size as i64 {
768+
file.range = None; // remove redundant range
769+
}
767770
}
768771
}
769772
}
770-
}
771-
if let Some(repartitioned) = parquet.clone().into_builder().with_file_groups(file_groups)
772-
.build_arc()
773-
.repartitioned(desired_parallelism, &ConfigOptions::default())? {
774-
Ok(Transformed::yes(repartitioned))
773+
if let Some(repartitioned) = file_scan.clone().with_file_groups(file_groups)
774+
.repartitioned(desired_parallelism, 10 * 1024 * 1024, None)? {
775+
Ok(Transformed::yes(Arc::new(DataSourceExec::new(repartitioned))))
776+
} else {
777+
Ok(Transformed::no(node))
778+
}
775779
} else {
776780
Ok(Transformed::no(node))
777781
}

src/expr.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ use datafusion::arrow::pyarrow::PyArrowType;
3030
use datafusion::functions::core::expr_ext::FieldAccessor;
3131
use datafusion::logical_expr::{
3232
col,
33-
expr::{AggregateFunction, InList, InSubquery, ScalarFunction, WindowFunction},
33+
expr::{AggregateFunction, AggregateFunctionParams, InList, InSubquery, ScalarFunction, WindowFunction},
3434
lit, Between, BinaryExpr, Case, Cast, Expr, Like, Operator, TryCast,
3535
};
36-
36+
use datafusion::logical_expr::expr::WindowFunctionParams;
3737
use crate::common::data_type::{DataTypeMap, NullTreatment, PyScalarValue, RexType};
3838
use crate::errors::{
3939
py_runtime_err, py_type_err, py_unsupported_variant_err, PyDataFusionError, PyDataFusionResult,
@@ -394,9 +394,9 @@ impl PyExpr {
394394
| Expr::InSubquery(InSubquery { expr, .. }) => Ok(vec![PyExpr::from(*expr.clone())]),
395395

396396
// Expr variants containing a collection of Expr(s) for operands
397-
Expr::AggregateFunction(AggregateFunction { args, .. })
397+
Expr::AggregateFunction(AggregateFunction { params: AggregateFunctionParams { args, .. }, .. })
398398
| Expr::ScalarFunction(ScalarFunction { args, .. })
399-
| Expr::WindowFunction(WindowFunction { args, .. }) => {
399+
| Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { args, .. }, .. }) => {
400400
Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect())
401401
}
402402

@@ -575,7 +575,7 @@ impl PyExpr {
575575
Expr::AggregateFunction(agg_fn) => {
576576
let window_fn = Expr::WindowFunction(WindowFunction::new(
577577
WindowFunctionDefinition::AggregateUDF(agg_fn.func.clone()),
578-
agg_fn.args.clone(),
578+
agg_fn.params.args.clone(),
579579
));
580580

581581
add_builder_fns_to_window(

src/expr/aggregate.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::common::DataFusionError;
19-
use datafusion::logical_expr::expr::{AggregateFunction, Alias};
19+
use datafusion::logical_expr::expr::{AggregateFunction, AggregateFunctionParams, Alias};
2020
use datafusion::logical_expr::logical_plan::Aggregate;
2121
use datafusion::logical_expr::Expr;
2222
use pyo3::{prelude::*, IntoPyObjectExt};
@@ -126,7 +126,7 @@ impl PyAggregate {
126126
match expr {
127127
// TODO: This Alias logic seems to be returning some strange results that we should investigate
128128
Expr::Alias(Alias { expr, .. }) => self._aggregation_arguments(expr.as_ref()),
129-
Expr::AggregateFunction(AggregateFunction { func: _, args, .. }) => {
129+
Expr::AggregateFunction(AggregateFunction { func: _, params: AggregateFunctionParams { args, .. }}) => {
130130
Ok(args.iter().map(|e| PyExpr::from(e.clone())).collect())
131131
}
132132
_ => Err(py_type_err(

src/expr/aggregate_expr.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl From<AggregateFunction> for PyAggregateFunction {
4040

4141
impl Display for PyAggregateFunction {
4242
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
43-
let args: Vec<String> = self.aggr.args.iter().map(|expr| expr.to_string()).collect();
43+
let args: Vec<String> = self.aggr.params.args.iter().map(|expr| expr.to_string()).collect();
4444
write!(f, "{}({})", self.aggr.func.name(), args.join(", "))
4545
}
4646
}
@@ -54,12 +54,13 @@ impl PyAggregateFunction {
5454

5555
/// is this a distinct aggregate such as `COUNT(DISTINCT expr)`
5656
fn is_distinct(&self) -> bool {
57-
self.aggr.distinct
57+
self.aggr.params.distinct
5858
}
5959

6060
/// Get the arguments to the aggregate function
6161
fn args(&self) -> Vec<PyExpr> {
6262
self.aggr
63+
.params
6364
.args
6465
.iter()
6566
.map(|expr| PyExpr::from(expr.clone()))

src/expr/window.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::common::{DataFusionError, ScalarValue};
19-
use datafusion::logical_expr::expr::WindowFunction;
19+
use datafusion::logical_expr::expr::{WindowFunction, WindowFunctionParams};
2020
use datafusion::logical_expr::{Expr, Window, WindowFrame, WindowFrameBound, WindowFrameUnits};
2121
use pyo3::{prelude::*, IntoPyObjectExt};
2222
use std::fmt::{self, Display, Formatter};
@@ -118,15 +118,15 @@ impl PyWindowExpr {
118118
/// Returns order by columns in a window function expression
119119
pub fn get_sort_exprs(&self, expr: PyExpr) -> PyResult<Vec<PySortExpr>> {
120120
match expr.expr.unalias() {
121-
Expr::WindowFunction(WindowFunction { order_by, .. }) => py_sort_expr_list(&order_by),
121+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { order_by, .. }, .. }) => py_sort_expr_list(&order_by),
122122
other => Err(not_window_function_err(other)),
123123
}
124124
}
125125

126126
/// Return partition by columns in a window function expression
127127
pub fn get_partition_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
128128
match expr.expr.unalias() {
129-
Expr::WindowFunction(WindowFunction { partition_by, .. }) => {
129+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { partition_by, .. }, .. }) => {
130130
py_expr_list(&partition_by)
131131
}
132132
other => Err(not_window_function_err(other)),
@@ -136,7 +136,7 @@ impl PyWindowExpr {
136136
/// Return input args for window function
137137
pub fn get_args(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
138138
match expr.expr.unalias() {
139-
Expr::WindowFunction(WindowFunction { args, .. }) => py_expr_list(&args),
139+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { args, .. }, .. }) => py_expr_list(&args),
140140
other => Err(not_window_function_err(other)),
141141
}
142142
}
@@ -152,7 +152,7 @@ impl PyWindowExpr {
152152
/// Returns a Pywindow frame for a given window function expression
153153
pub fn get_frame(&self, expr: PyExpr) -> Option<PyWindowFrame> {
154154
match expr.expr.unalias() {
155-
Expr::WindowFunction(WindowFunction { window_frame, .. }) => Some(window_frame.into()),
155+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams {window_frame, .. }, .. }) => Some(window_frame.into()),
156156
_ => None,
157157
}
158158
}

0 commit comments

Comments
 (0)