
Commit a188c50

aditanase authored and adragomir committed Mar 11, 2025
[HSTACK] [DF] - pushdown limit and predicates for partition cols
1 parent ec974bb · commit a188c50

File tree

  • crates/core/src/delta_datafusion

1 file changed: +97 −32 lines changed

crates/core/src/delta_datafusion/mod.rs (+97 −32)

@@ -41,15 +41,14 @@ use datafusion::catalog::{Session, TableProviderFactory};
 use datafusion::config::TableParquetOptions;
 use datafusion::datasource::memory::DataSourceExec;
 use datafusion::datasource::physical_plan::{
-    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
-    ParquetSource,
+    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, ParquetSource,
 };
 use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
 use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
 use datafusion::optimizer::simplify_expressions::ExprSimplifier;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use datafusion_common::scalar::ScalarValue;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion_common::{
@@ -60,7 +59,9 @@ use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::logical_plan::CreateExternalTable;
 use datafusion_expr::simplify::SimplifyContext;
 use datafusion_expr::utils::conjunction;
-use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
+use datafusion_expr::{
+    col, BinaryExpr, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility,
+};
 use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
 use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
@@ -556,6 +557,10 @@ impl<'a> DeltaScanBuilder<'a> {
             Some(schema.clone()),
         )?;
 
+        // TODO temporarily using full schema to generate pruning predicates
+        // should we optimize this by only including fields referenced from predicates?
+        let filter_df_schema = logical_schema.clone().to_dfschema()?;
+
         let logical_schema = if let Some(used_columns) = self.projection {
             let mut fields = vec![];
             for idx in used_columns {
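
The hunk above builds filter_df_schema from the full logical schema before the projection is applied (and the next hunk switches expression simplification and physical-expression planning over to it), so a pushed-down predicate can still reference columns, typically partition columns, that the projection has dropped. Below is a minimal sketch of that distinction; it is not code from this commit, and the two-column schema and column names are assumptions for illustration only.

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::context::SessionContext;
use datafusion_common::{Result, ToDFSchema};
use datafusion_expr::{col, lit};

fn plan_filter_against_full_schema() -> Result<()> {
    // full table schema: one data column plus one partition column
    let full_schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int64, true),
        Field::new("date", DataType::Utf8, true),
    ]));

    // the scan may project only `value`, but the filter still mentions `date`,
    // so the predicate is planned against the full schema rather than the
    // projected one
    let filter_df_schema = full_schema.to_dfschema()?;
    let predicate = col("date").eq(lit("2025-03-11"));

    let ctx = SessionContext::new();
    let _physical_predicate = ctx.create_physical_expr(predicate, &filter_df_schema)?;
    Ok(())
}
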
@@ -567,18 +572,17 @@ impl<'a> DeltaScanBuilder<'a> {
         };
 
         let context = SessionContext::new();
-        let df_schema = logical_schema.clone().to_dfschema()?;
 
         let logical_filter = self.filter.map(|expr| {
             // Simplify the expression first
             let props = ExecutionProps::new();
             let simplify_context =
-                SimplifyContext::new(&props).with_schema(df_schema.clone().into());
+                SimplifyContext::new(&props).with_schema(filter_df_schema.clone().into());
             let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
             let simplified = simplifier.simplify(expr).unwrap();
 
             context
-                .create_physical_expr(simplified, &df_schema)
+                .create_physical_expr(simplified, &filter_df_schema)
                 .unwrap()
         });
 
@@ -590,31 +594,64 @@ impl<'a> DeltaScanBuilder<'a> {
                 (files, files_scanned, 0)
             }
             None => {
-                if let Some(predicate) = &logical_filter {
-                    let pruning_predicate =
-                        PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
-                    let files_to_prune = pruning_predicate.prune(self.snapshot)?;
-                    let mut files_pruned = 0usize;
-                    let files = self
+                // early return in case we have no push down filters or limit
+                if logical_filter.is_none() && self.limit.is_none() {
+                    let files = self.snapshot.file_actions()?;
+                    let files_scanned = files.len();
+                    (files, files_scanned, 0)
+                } else {
+                    let num_containers = self.snapshot.num_containers();
+
+                    let files_to_prune = if let Some(predicate) = &logical_filter {
+                        let pruning_predicate =
+                            PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
+                        pruning_predicate.prune(self.snapshot)?
+                    } else {
+                        vec![true; num_containers]
+                    };
+
+                    // needed to enforce limit and deal with missing statistics
+                    // rust port of https://github.com/delta-io/delta/pull/1495
+                    let mut pruned_without_stats = vec![];
+                    let mut rows_collected = 0;
+                    let mut files = vec![];
+
+                    for (action, keep) in self
                         .snapshot
                         .file_actions_iter()?
                         .zip(files_to_prune.into_iter())
-                        .filter_map(|(action, keep)| {
-                            if keep {
-                                Some(action.to_owned())
+                    {
+                        // prune file based on predicate pushdown
+                        if keep {
+                            // prune file based on limit pushdown
+                            if let Some(limit) = self.limit {
+                                if let Some(stats) = action.get_stats()? {
+                                    if rows_collected <= limit as i64 {
+                                        rows_collected += stats.num_records;
+                                        files.push(action.to_owned());
+                                    } else {
+                                        break;
+                                    }
+                                } else {
+                                    // some files are missing stats; skipping but storing them
+                                    // in a list in case we can't reach the target limit
+                                    pruned_without_stats.push(action.to_owned());
+                                }
                             } else {
-                                files_pruned += 1;
-                                None
+                                files.push(action.to_owned());
                             }
-                        })
-                        .collect::<Vec<_>>();
+                        }
+                    }
+
+                    if let Some(limit) = self.limit {
+                        if rows_collected < limit as i64 {
+                            files.extend(pruned_without_stats);
+                        }
+                    }
 
                     let files_scanned = files.len();
+                    let files_pruned = num_containers - files_scanned;
                     (files, files_scanned, files_pruned)
-                } else {
-                    let files = self.snapshot.file_actions()?;
-                    let files_scanned = files.len();
-                    (files, files_scanned, 0)
                 }
             }
         };
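
The rewritten None arm above keeps every file that survives predicate pruning and, when a limit is pushed down, stops adding files once the known row counts already cover the limit; files without statistics are set aside and appended only if the limit was not reached (a Rust port of delta-io/delta#1495, per the inline comment). The sketch below restates that selection rule over plain types; it is an illustration, not the commit's code.

// each file is a (kept_by_predicate, row_count_if_known) pair; the result is
// the indexes of the files the scan should read
fn select_files_for_limit(files: &[(bool, Option<i64>)], limit: Option<usize>) -> Vec<usize> {
    let mut selected = Vec::new();
    let mut without_stats = Vec::new();
    let mut rows_collected: i64 = 0;

    for (idx, &(keep, num_records)) in files.iter().enumerate() {
        if !keep {
            continue; // pruned by the pushed-down predicate
        }
        match (limit, num_records) {
            (Some(limit), Some(rows)) => {
                if rows_collected <= limit as i64 {
                    rows_collected += rows;
                    selected.push(idx);
                } else {
                    break;
                }
            }
            // no statistics: remember the file in case the limit is not reached
            (Some(_), None) => without_stats.push(idx),
            // no limit: keep every file that survived pruning
            (None, _) => selected.push(idx),
        }
    }

    // fall back to the stat-less files only when the limit was not covered
    if let Some(limit) = limit {
        if rows_collected < limit as i64 {
            selected.extend(without_stats);
        }
    }
    selected
}

For example, with a limit of 10 and three kept files of 8, 7 and 5 rows, only the first two files are selected: after the second file the collected count (15) already exceeds the limit, so the loop stops before the third.
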
@@ -810,17 +847,47 @@ impl TableProvider for DeltaTable {
         &self,
         filter: &[&Expr],
     ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
-        Ok(filter
-            .iter()
-            .map(|_| TableProviderFilterPushDown::Inexact)
-            .collect())
+        let partition_cols = self.snapshot()?.metadata().partition_columns.clone();
+        Ok(get_pushdown_filters(filter, partition_cols))
     }
 
     fn statistics(&self) -> Option<Statistics> {
         self.snapshot().ok()?.datafusion_table_statistics()
     }
 }
 
+fn get_pushdown_filters(
+    filter: &[&Expr],
+    partition_cols: Vec<String>,
+) -> Vec<TableProviderFilterPushDown> {
+    filter
+        .iter()
+        .map(|filter| {
+            let columns = extract_columns(filter);
+            if !columns.is_empty() && columns.iter().all(|col| partition_cols.contains(col)) {
+                TableProviderFilterPushDown::Exact
+            } else {
+                TableProviderFilterPushDown::Inexact
+            }
+        })
+        .collect()
+}
+
+fn extract_columns(expr: &Expr) -> Vec<String> {
+    let mut columns = Vec::new();
+    match expr {
+        Expr::Column(col) => columns.push(col.name.clone()),
+        Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
+            let left_columns = extract_columns(left);
+            let right_columns = extract_columns(right);
+            columns.extend(left_columns);
+            columns.extend(right_columns);
+        }
+        _ => {}
+    }
+    columns
+}
+
 /// A Delta table provider that enables additional metadata columns to be included during the scan
 #[derive(Debug)]
 pub struct DeltaTableProvider {
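
supports_filters_pushdown on DeltaTable now reports a filter as Exact only when every column it references is a partition column, since partition values are constant within a file and pruning on them is exact; every other filter stays Inexact and is re-evaluated by DataFusion after the scan. A unit-test style sketch of that behavior follows; it assumes a table partitioned by "date" and would have to live in this module because get_pushdown_filters is private. It is not part of the commit.

use datafusion_expr::{col, lit, Expr, TableProviderFilterPushDown};

#[test]
fn partition_only_filters_are_exact() {
    let partition_cols = vec!["date".to_string()];

    // references only the partition column -> answered entirely by file pruning
    let on_partition: Expr = col("date").eq(lit("2025-03-11"));
    // references a regular data column -> DataFusion must still re-apply it
    let on_data: Expr = col("value").gt(lit(10));

    let decisions = get_pushdown_filters(&[&on_partition, &on_data], partition_cols);
    assert!(matches!(decisions[0], TableProviderFilterPushDown::Exact));
    assert!(matches!(decisions[1], TableProviderFilterPushDown::Inexact));
}

Note that extract_columns only descends into Expr::Column and Expr::BinaryExpr, so predicates such as IN lists or NOT-wrapped expressions yield no columns and therefore stay Inexact, which is the conservative default.
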
@@ -926,10 +993,8 @@ impl TableProvider for DeltaTableProvider {
         &self,
         filter: &[&Expr],
     ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
-        Ok(filter
-            .iter()
-            .map(|_| TableProviderFilterPushDown::Inexact)
-            .collect())
+        let partition_cols = self.snapshot.metadata().partition_columns.clone();
+        Ok(get_pushdown_filters(filter, partition_cols))
     }
 
     fn statistics(&self) -> Option<Statistics> {
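
DeltaTableProvider gets the same treatment as DeltaTable. Taken together with the limit handling in DeltaScanBuilder, a query that filters on a partition column and carries a LIMIT no longer needs a post-scan re-filter and can stop collecting file actions early. A hypothetical end-to-end sketch is below; the table path, the partition column, and the datafusion feature of the deltalake crate are assumptions, not part of this commit.

use std::sync::Arc;

use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // hypothetical local table; DeltaTable implements TableProvider when the
    // crate's datafusion feature is enabled
    let table = deltalake::open_table("./data/events").await?;

    let ctx = SessionContext::new();
    ctx.register_table("events", Arc::new(table))?;

    // assuming `date` is a partition column: the WHERE clause is pushed down
    // as Exact, and the LIMIT lets the scan stop adding files once their
    // row-count statistics cover 100 rows
    let df = ctx
        .sql("SELECT * FROM events WHERE date = '2025-03-11' LIMIT 100")
        .await?;
    df.show().await?;

    Ok(())
}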
