Skip to content

Commit 14000a2

Browse files
committed
chore: upgrade to Datafusion 38
1 parent 353e08b commit 14000a2

File tree

7 files changed

+39
-42
lines changed

7 files changed

+39
-42
lines changed

Cargo.toml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ object_store = { version = "0.9" }
4242
parquet = { version = "51" }
4343

4444
# datafusion
45-
datafusion = { version = "37.1" }
46-
datafusion-expr = { version = "37.1" }
47-
datafusion-common = { version = "37.1" }
48-
datafusion-proto = { version = "37.1" }
49-
datafusion-sql = { version = "37.1" }
50-
datafusion-physical-expr = { version = "37.1" }
51-
datafusion-functions = { version = "37.1" }
52-
datafusion-functions-array = { version = "37.1" }
45+
datafusion = { version = "38.0" }
46+
datafusion-expr = { version = "38.0" }
47+
datafusion-common = { version = "38.0" }
48+
datafusion-proto = { version = "38.0" }
49+
datafusion-sql = { version = "38.0" }
50+
datafusion-physical-expr = { version = "38.0" }
51+
datafusion-functions = { version = "38.0" }
52+
datafusion-functions-array = { version = "38.0" }
5353

5454
# serde
5555
serde = { version = "1.0.194", features = ["derive"] }

crates/core/src/delta_datafusion/cdf/scan_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ pub fn create_partition_values<F: FileAction>(
8080
last_modified: chrono::Utc.timestamp_nanos(0),
8181
version: None,
8282
},
83+
statistics: None,
8384
partition_values: new_part_values.clone(),
8485
extensions: None,
8586
range: None,

crates/core/src/delta_datafusion/expr.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,11 @@ mod test {
429429
use datafusion_common::{Column, ScalarValue, ToDFSchema};
430430
use datafusion_expr::expr::ScalarFunction;
431431
use datafusion_expr::{
432-
col, lit, substring, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
432+
col, lit, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
433433
};
434434
use datafusion_functions::core::arrow_cast;
435435
use datafusion_functions::encoding::expr_fn::decode;
436+
use datafusion_functions::expr_fn::substring;
436437
use datafusion_functions_array::expr_fn::cardinality;
437438

438439
use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};

crates/core/src/delta_datafusion/mod.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -698,11 +698,11 @@ impl TableProvider for DeltaTable {
698698
Ok(Arc::new(scan))
699699
}
700700

701-
fn supports_filter_pushdown(
701+
fn supports_filters_pushdown(
702702
&self,
703-
_filter: &Expr,
704-
) -> DataFusionResult<TableProviderFilterPushDown> {
705-
Ok(TableProviderFilterPushDown::Inexact)
703+
_filter: &[&Expr],
704+
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
705+
Ok(vec![TableProviderFilterPushDown::Inexact])
706706
}
707707

708708
fn statistics(&self) -> Option<Statistics> {
@@ -777,11 +777,11 @@ impl TableProvider for DeltaTableProvider {
777777
Ok(Arc::new(scan))
778778
}
779779

780-
fn supports_filter_pushdown(
780+
fn supports_filters_pushdown(
781781
&self,
782-
_filter: &Expr,
783-
) -> DataFusionResult<TableProviderFilterPushDown> {
784-
Ok(TableProviderFilterPushDown::Inexact)
782+
_filter: &[&Expr],
783+
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
784+
Ok(vec![TableProviderFilterPushDown::Inexact])
785785
}
786786

787787
fn statistics(&self) -> Option<Statistics> {
@@ -989,6 +989,7 @@ pub(crate) fn partitioned_file_from_action(
989989
..action.try_into().unwrap()
990990
},
991991
partition_values,
992+
statistics: None,
992993
range: None,
993994
extensions: None,
994995
}
@@ -1425,14 +1426,7 @@ impl TreeNodeVisitor for FindFilesExprProperties {
14251426
| Expr::TryCast(_) => (),
14261427
Expr::ScalarFunction(ScalarFunction { func_def, .. }) => {
14271428
let v = match func_def {
1428-
datafusion_expr::ScalarFunctionDefinition::BuiltIn(f) => f.volatility(),
14291429
datafusion_expr::ScalarFunctionDefinition::UDF(u) => u.signature().volatility,
1430-
datafusion_expr::ScalarFunctionDefinition::Name(n) => {
1431-
self.result = Err(DeltaTableError::Generic(format!(
1432-
"Cannot determine volatility of find files predicate function {n}",
1433-
)));
1434-
return Ok(TreeNodeRecursion::Stop);
1435-
}
14361430
};
14371431
if v > Volatility::Immutable {
14381432
self.result = Err(DeltaTableError::Generic(format!(
@@ -1900,6 +1894,7 @@ mod tests {
19001894
version: None,
19011895
},
19021896
partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(),
1897+
statistics: None,
19031898
range: None,
19041899
extensions: None,
19051900
};

crates/core/src/operations/merge/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ impl MergeOperation {
502502
relation: Some(TableReference::Bare { table }),
503503
name,
504504
} => {
505-
if table.eq(alias) {
505+
if table.as_ref() == alias {
506506
Column {
507507
relation: Some(r),
508508
name,
@@ -863,8 +863,8 @@ async fn try_construct_early_filter(
863863
table_snapshot: &DeltaTableState,
864864
session_state: &SessionState,
865865
source: &LogicalPlan,
866-
source_name: &TableReference<'_>,
867-
target_name: &TableReference<'_>,
866+
source_name: &TableReference,
867+
target_name: &TableReference,
868868
) -> DeltaResult<Option<Expr>> {
869869
let table_metadata = table_snapshot.metadata();
870870
let partition_columns = &table_metadata.partition_columns;
@@ -1324,9 +1324,9 @@ async fn execute(
13241324
let plan = projection.into_unoptimized_plan();
13251325
let mut fields: Vec<Expr> = plan
13261326
.schema()
1327-
.fields()
1327+
.columns()
13281328
.iter()
1329-
.map(|f| col(f.qualified_column()))
1329+
.map(|f| col(f.clone()))
13301330
.collect();
13311331

13321332
fields.extend(new_columns.into_iter().map(|(name, ex)| ex.alias(name)));

crates/sql/src/logical_plan.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::fmt::{self, Debug, Display};
22
use std::sync::Arc;
33

4-
use datafusion_common::{DFSchema, DFSchemaRef, OwnedTableReference};
4+
use datafusion_common::{DFSchema, DFSchemaRef, TableReference};
55
use datafusion_expr::logical_plan::LogicalPlan;
66
use datafusion_expr::{Expr, UserDefinedLogicalNodeCore};
77

@@ -107,7 +107,7 @@ impl UserDefinedLogicalNodeCore for DeltaStatement {
107107
#[derive(Clone, PartialEq, Eq, Hash)]
108108
pub struct Vacuum {
109109
/// A reference to the table being vacuumed
110-
pub table: OwnedTableReference,
110+
pub table: TableReference,
111111
/// The retention threshold.
112112
pub retention_hours: Option<i32>,
113113
/// Return a list of up to 1000 files to be deleted.
@@ -117,7 +117,7 @@ pub struct Vacuum {
117117
}
118118

119119
impl Vacuum {
120-
pub fn new(table: OwnedTableReference, retention_hours: Option<i32>, dry_run: bool) -> Self {
120+
pub fn new(table: TableReference, retention_hours: Option<i32>, dry_run: bool) -> Self {
121121
Self {
122122
table,
123123
retention_hours,
@@ -133,13 +133,13 @@ impl Vacuum {
133133
#[derive(Clone, PartialEq, Eq, Hash)]
134134
pub struct DescribeHistory {
135135
/// A reference to the table
136-
pub table: OwnedTableReference,
136+
pub table: TableReference,
137137
/// Schema for commit provenence information
138138
pub schema: DFSchemaRef,
139139
}
140140

141141
impl DescribeHistory {
142-
pub fn new(table: OwnedTableReference) -> Self {
142+
pub fn new(table: TableReference) -> Self {
143143
Self {
144144
table,
145145
// TODO: add proper schema
@@ -153,13 +153,13 @@ impl DescribeHistory {
153153
#[derive(Clone, PartialEq, Eq, Hash)]
154154
pub struct DescribeDetails {
155155
/// A reference to the table
156-
pub table: OwnedTableReference,
156+
pub table: TableReference,
157157
/// Schema for commit provenence information
158158
pub schema: DFSchemaRef,
159159
}
160160

161161
impl DescribeDetails {
162-
pub fn new(table: OwnedTableReference) -> Self {
162+
pub fn new(table: TableReference) -> Self {
163163
Self {
164164
table,
165165
// TODO: add proper schema
@@ -172,13 +172,13 @@ impl DescribeDetails {
172172
#[derive(Clone, PartialEq, Eq, Hash)]
173173
pub struct DescribeFiles {
174174
/// A reference to the table
175-
pub table: OwnedTableReference,
175+
pub table: TableReference,
176176
/// Schema for commit provenence information
177177
pub schema: DFSchemaRef,
178178
}
179179

180180
impl DescribeFiles {
181-
pub fn new(table: OwnedTableReference) -> Self {
181+
pub fn new(table: TableReference) -> Self {
182182
Self {
183183
table,
184184
// TODO: add proper schema

crates/sql/src/planner.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::sync::Arc;
22

3-
use datafusion_common::{OwnedTableReference, Result as DFResult};
3+
use datafusion_common::{TableReference, Result as DFResult};
44
use datafusion_expr::logical_plan::{Extension, LogicalPlan};
55
use datafusion_sql::planner::{
66
object_name_to_table_reference, ContextProvider, IdentNormalizer, ParserOptions, SqlToRel,
@@ -54,7 +54,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
5454
fn vacuum_to_plan(&self, vacuum: VacuumStatement) -> DFResult<LogicalPlan> {
5555
let table_ref = self.object_name_to_table_reference(vacuum.table)?;
5656
let plan = DeltaStatement::Vacuum(Vacuum::new(
57-
table_ref.to_owned_reference(),
57+
table_ref.clone(),
5858
vacuum.retention_hours,
5959
vacuum.dry_run,
6060
));
@@ -66,7 +66,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
6666
fn describe_to_plan(&self, describe: DescribeStatement) -> DFResult<LogicalPlan> {
6767
let table_ref = self.object_name_to_table_reference(describe.table)?;
6868
let plan =
69-
DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.to_owned_reference()));
69+
DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.clone()));
7070
Ok(LogicalPlan::Extension(Extension {
7171
node: Arc::new(plan),
7272
}))
@@ -75,7 +75,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
7575
pub(crate) fn object_name_to_table_reference(
7676
&self,
7777
object_name: ObjectName,
78-
) -> DFResult<OwnedTableReference> {
78+
) -> DFResult<TableReference> {
7979
object_name_to_table_reference(object_name, self.options.enable_ident_normalization)
8080
}
8181
}

0 commit comments

Comments
 (0)