Skip to content

Commit 3e03c2a

Browse files
committed
[HSTACK] [DF] - prune files scanned based on pushed down limit
1 parent e2d7dfe commit 3e03c2a

File tree

1 file changed

+50
-17
lines changed
  • crates/core/src/delta_datafusion

1 file changed

+50
-17
lines changed

crates/core/src/delta_datafusion/mod.rs

+50-17
Original file line numberDiff line numberDiff line change
@@ -594,31 +594,64 @@ impl<'a> DeltaScanBuilder<'a> {
594594
(files, files_scanned, 0)
595595
}
596596
None => {
597-
if let Some(predicate) = &logical_filter {
598-
let pruning_predicate =
599-
PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
600-
let files_to_prune = pruning_predicate.prune(self.snapshot)?;
601-
let mut files_pruned = 0usize;
602-
let files = self
597+
// early return in case we have no push down filters or limit
598+
if logical_filter.is_none() && self.limit.is_none() {
599+
let files = self.snapshot.file_actions()?;
600+
let files_scanned = files.len();
601+
(files, files_scanned, 0)
602+
} else {
603+
let num_containers = self.snapshot.num_containers();
604+
605+
let files_to_prune = if let Some(predicate) = &logical_filter {
606+
let pruning_predicate =
607+
PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
608+
pruning_predicate.prune(self.snapshot)?
609+
} else {
610+
vec![true; num_containers]
611+
};
612+
613+
// needed to enforce limit and deal with missing statistics
614+
// rust port of https://github.com/delta-io/delta/pull/1495
615+
let mut pruned_without_stats = vec![];
616+
let mut rows_collected = 0;
617+
let mut files = vec![];
618+
619+
for (action, keep) in self
603620
.snapshot
604621
.file_actions_iter()?
605622
.zip(files_to_prune.into_iter())
606-
.filter_map(|(action, keep)| {
607-
if keep {
608-
Some(action.to_owned())
623+
{
624+
// prune file based on predicate pushdown
625+
if keep {
626+
// prune file based on limit pushdown
627+
if let Some(limit) = self.limit {
628+
if let Some(stats) = action.get_stats()? {
629+
if rows_collected <= limit as i64 {
630+
rows_collected += stats.num_records;
631+
files.push(action.to_owned());
632+
} else {
633+
break;
634+
}
635+
} else {
636+
// some files are missing stats; skipping but storing them
637+
// in a list in case we can't reach the target limit
638+
pruned_without_stats.push(action.to_owned());
639+
}
609640
} else {
610-
files_pruned += 1;
611-
None
641+
files.push(action.to_owned());
612642
}
613-
})
614-
.collect::<Vec<_>>();
643+
}
644+
}
645+
646+
if let Some(limit) = self.limit {
647+
if rows_collected < limit as i64 {
648+
files.extend(pruned_without_stats);
649+
}
650+
}
615651

616652
let files_scanned = files.len();
653+
let files_pruned = num_containers - files_scanned;
617654
(files, files_scanned, files_pruned)
618-
} else {
619-
let files = self.snapshot.file_actions()?;
620-
let files_scanned = files.len();
621-
(files, files_scanned, 0)
622655
}
623656
}
624657
};

0 commit comments

Comments
 (0)