Skip to content

Commit e2d7dfe

Browse files
committed
[HSTACK] [DF] - predicate pushdown EXACT for partition columns
1 parent 9f9ddbb commit e2d7dfe

File tree

1 file changed

+40
-8
lines changed
  • crates/core/src/delta_datafusion

1 file changed

+40
-8
lines changed

crates/core/src/delta_datafusion/mod.rs

+40-8
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,10 @@ impl<'a> DeltaScanBuilder<'a> {
556556
Some(schema.clone()),
557557
)?;
558558

559+
// TODO temporarily using full schema to generate pruning predicates
560+
// should we optimize this by only including fields referenced from predicates?
561+
let filter_df_schema = logical_schema.clone().to_dfschema()?;
562+
559563
let logical_schema = if let Some(used_columns) = self.projection {
560564
let mut fields = vec![];
561565
for idx in used_columns {
@@ -810,17 +814,47 @@ impl TableProvider for DeltaTable {
810814
&self,
811815
filter: &[&Expr],
812816
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
813-
Ok(filter
814-
.iter()
815-
.map(|_| TableProviderFilterPushDown::Inexact)
816-
.collect())
817+
let partition_cols = self.snapshot()?.metadata().partition_columns.clone();
818+
Ok(get_pushdown_filters(filter, partition_cols))
817819
}
818820

819821
fn statistics(&self) -> Option<Statistics> {
820822
self.snapshot().ok()?.datafusion_table_statistics()
821823
}
822824
}
823825

826+
fn get_pushdown_filters(
827+
filter: &[&Expr],
828+
partition_cols: Vec<String>,
829+
) -> Vec<TableProviderFilterPushDown> {
830+
filter
831+
.iter()
832+
.map(|filter| {
833+
let columns = extract_columns(filter);
834+
if !columns.is_empty() && columns.iter().all(|col| partition_cols.contains(col)) {
835+
TableProviderFilterPushDown::Exact
836+
} else {
837+
TableProviderFilterPushDown::Inexact
838+
}
839+
})
840+
.collect()
841+
}
842+
843+
fn extract_columns(expr: &Expr) -> Vec<String> {
844+
let mut columns = Vec::new();
845+
match expr {
846+
Expr::Column(col) => columns.push(col.name.clone()),
847+
Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
848+
let left_columns = extract_columns(left);
849+
let right_columns = extract_columns(right);
850+
columns.extend(left_columns);
851+
columns.extend(right_columns);
852+
}
853+
_ => {}
854+
}
855+
columns
856+
}
857+
824858
/// A Delta table provider that enables additional metadata columns to be included during the scan
825859
#[derive(Debug)]
826860
pub struct DeltaTableProvider {
@@ -926,10 +960,8 @@ impl TableProvider for DeltaTableProvider {
926960
&self,
927961
filter: &[&Expr],
928962
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
929-
Ok(filter
930-
.iter()
931-
.map(|_| TableProviderFilterPushDown::Inexact)
932-
.collect())
963+
let partition_cols = self.snapshot.metadata().partition_columns.clone();
964+
Ok(get_pushdown_filters(filter, partition_cols))
933965
}
934966

935967
fn statistics(&self) -> Option<Statistics> {

0 commit comments

Comments
 (0)