Skip to content

use state machine to refactor the get_files_with_limit method #15521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 1, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 34 additions & 38 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1205,47 +1205,43 @@ async fn get_files_with_limit(
let mut file_group = FileGroup::default();
// Fusing the stream allows us to call next safely even once it is finished.
let mut all_files = Box::pin(files.fuse());
let mut num_rows = Precision::<usize>::Absent;
while let Some(first_file) = all_files.next().await {
let file = first_file?;
if let Some(file_statistic) = &file.statistics {
num_rows = file_statistic.num_rows;
enum ProcessingState {
ReadingFiles,
ReachedLimit,
}

let mut state = ProcessingState::ReadingFiles;
let mut num_rows = Precision::Absent;

while let Some(file_result) = all_files.next().await {
// Early exit if we've already reached our limit
if matches!(state, ProcessingState::ReachedLimit) {
break;
}
file_group.push(file);

// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
// currently ignores tables that have no statistics regarding the
// number of rows.
let conservative_num_rows = match num_rows {
Precision::Exact(nr) => nr,
_ => usize::MIN,
};
if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
while let Some(current) = all_files.next().await {
let file = current?;
if !collect_stats {
file_group.push(file);
continue;
}
let file = file_result?;

// We accumulate the number of rows, total byte size and null
// counts across all the files in question. If any file does not
// provide any information or provides an inexact value, we demote
// the statistic precision to inexact.
if let Some(file_stats) = &file.statistics {
num_rows = add_row_stats(num_rows, file_stats.num_rows);
}
file_group.push(file);

// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
// currently ignores tables that have no statistics regarding the
// number of rows.
if num_rows.get_value().unwrap_or(&usize::MIN)
> &limit.unwrap_or(usize::MAX)
{
break;
// Update file statistics regardless of state
if collect_stats {
if let Some(file_stats) = &file.statistics {
num_rows = if file_group.is_empty() {
// For the first file, just take its row count
file_stats.num_rows
} else {
// For subsequent files, accumulate the counts
add_row_stats(num_rows, file_stats.num_rows)
};
}
}

// Always add the file to our group
file_group.push(file);

// Check if we've hit the limit (if one was specified)
if let Some(limit) = limit {
if let Precision::Exact(row_count) = num_rows {
if row_count > limit {
state = ProcessingState::ReachedLimit;
}
}
}
Expand Down