Skip to content

Commit a956ac3

Browse files
authored
use state machine to refactor the get_files_with_limit method (#15521)
1 parent e9d2ec9 commit a956ac3

File tree

1 file changed

+34
-38
lines changed
  • datafusion/core/src/datasource/listing

1 file changed

+34
-38
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,47 +1205,43 @@ async fn get_files_with_limit(
12051205
let mut file_group = FileGroup::default();
12061206
// Fusing the stream allows us to call next safely even once it is finished.
12071207
let mut all_files = Box::pin(files.fuse());
1208-
let mut num_rows = Precision::<usize>::Absent;
1209-
while let Some(first_file) = all_files.next().await {
1210-
let file = first_file?;
1211-
if let Some(file_statistic) = &file.statistics {
1212-
num_rows = file_statistic.num_rows;
1208+
enum ProcessingState {
1209+
ReadingFiles,
1210+
ReachedLimit,
1211+
}
1212+
1213+
let mut state = ProcessingState::ReadingFiles;
1214+
let mut num_rows = Precision::Absent;
1215+
1216+
while let Some(file_result) = all_files.next().await {
1217+
// Early exit if we've already reached our limit
1218+
if matches!(state, ProcessingState::ReachedLimit) {
1219+
break;
12131220
}
1214-
file_group.push(file);
12151221

1216-
// If the number of rows exceeds the limit, we can stop processing
1217-
// files. This only applies when we know the number of rows. It also
1218-
// currently ignores tables that have no statistics regarding the
1219-
// number of rows.
1220-
let conservative_num_rows = match num_rows {
1221-
Precision::Exact(nr) => nr,
1222-
_ => usize::MIN,
1223-
};
1224-
if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
1225-
while let Some(current) = all_files.next().await {
1226-
let file = current?;
1227-
if !collect_stats {
1228-
file_group.push(file);
1229-
continue;
1230-
}
1222+
let file = file_result?;
12311223

1232-
// We accumulate the number of rows, total byte size and null
1233-
// counts across all the files in question. If any file does not
1234-
// provide any information or provides an inexact value, we demote
1235-
// the statistic precision to inexact.
1236-
if let Some(file_stats) = &file.statistics {
1237-
num_rows = add_row_stats(num_rows, file_stats.num_rows);
1238-
}
1239-
file_group.push(file);
1240-
1241-
// If the number of rows exceeds the limit, we can stop processing
1242-
// files. This only applies when we know the number of rows. It also
1243-
// currently ignores tables that have no statistics regarding the
1244-
// number of rows.
1245-
if num_rows.get_value().unwrap_or(&usize::MIN)
1246-
> &limit.unwrap_or(usize::MAX)
1247-
{
1248-
break;
1224+
// Update file statistics regardless of state
1225+
if collect_stats {
1226+
if let Some(file_stats) = &file.statistics {
1227+
num_rows = if file_group.is_empty() {
1228+
// For the first file, just take its row count
1229+
file_stats.num_rows
1230+
} else {
1231+
// For subsequent files, accumulate the counts
1232+
add_row_stats(num_rows, file_stats.num_rows)
1233+
};
1234+
}
1235+
}
1236+
1237+
// Always add the file to our group
1238+
file_group.push(file);
1239+
1240+
// Check if we've hit the limit (if one was specified)
1241+
if let Some(limit) = limit {
1242+
if let Precision::Exact(row_count) = num_rows {
1243+
if row_count > limit {
1244+
state = ProcessingState::ReachedLimit;
12491245
}
12501246
}
12511247
}

0 commit comments

Comments
 (0)