Skip to content

Feat: introduce ExecutionPlan::partition_statistics API #15852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,11 @@ mod tests {
assert_eq!(tt_batches, 50 /* 100/2 */);

// test metadata
assert_eq!(exec.statistics()?.num_rows, Precision::Absent);
assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent);
assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
Precision::Absent
);

Ok(())
}
Expand Down
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ mod tests {
assert_eq!(tt_batches, 6 /* 12/2 */);

// test metadata
assert_eq!(exec.statistics()?.num_rows, Precision::Absent);
assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent);
assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
Precision::Absent
);

Ok(())
}
Expand Down
20 changes: 16 additions & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,15 @@ mod tests {
assert_eq!(tt_batches, 4 /* 8/2 */);

// test metadata
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(
exec.partition_statistics(None)?.num_rows,
Precision::Exact(8)
);
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
Precision::Exact(671)
);

Ok(())
}
Expand Down Expand Up @@ -659,9 +665,15 @@ mod tests {
get_exec(&state, "alltypes_plain.parquet", projection, Some(1)).await?;

// note: even if the limit is set, the executor rounds up to the batch size
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(
exec.partition_statistics(None)?.num_rows,
Precision::Exact(8)
);
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
Precision::Exact(671)
);
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
assert_eq!(11, batches[0].num_columns());
Expand Down
47 changes: 32 additions & 15 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ impl ListingOptions {
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ListingTable {
table_paths: Vec<ListingTableUrl>,
/// `file_schema` contains only the columns physically stored in the data files themselves.
Expand Down Expand Up @@ -1149,23 +1149,25 @@ impl ListingTable {
let (file_group, inexact_stats) =
get_files_with_limit(files, limit, self.options.collect_stat).await?;

let mut file_groups = file_group.split_files(self.options.target_partitions);
let file_groups = file_group.split_files(self.options.target_partitions);
let (mut file_groups, mut stats) = compute_all_files_statistics(
file_groups,
self.schema(),
self.options.collect_stat,
inexact_stats,
)?;
let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema())
.map_schema(self.file_schema.as_ref())?;
// Use schema_mapper to map each file-level column statistics to table-level column statistics
stats.column_statistics =
schema_mapper.map_column_statistics(&stats.column_statistics)?;
file_groups.iter_mut().try_for_each(|file_group| {
if let Some(stat) = file_group.statistics_mut() {
stat.column_statistics =
schema_mapper.map_column_statistics(&stat.column_statistics)?;
}
Ok::<_, DataFusionError>(())
})?;
compute_all_files_statistics(
file_groups,
self.schema(),
self.options.collect_stat,
inexact_stats,
)
Ok((file_groups, stats))
}

/// Collects statistics for a given partitioned file.
Expand Down Expand Up @@ -1324,8 +1326,14 @@ mod tests {
assert_eq!(exec.output_partitioning().partition_count(), 1);

// test metadata
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
assert_eq!(
exec.partition_statistics(None)?.num_rows,
Precision::Exact(8)
);
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
Precision::Exact(671)
);

Ok(())
}
Expand All @@ -1350,9 +1358,15 @@ mod tests {
let table = ListingTable::try_new(config)?;

let exec = table.scan(&state, None, &[], None).await?;
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
assert_eq!(
exec.partition_statistics(None)?.num_rows,
Precision::Exact(8)
);
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
Precision::Exact(671)
);

Ok(())
}
Expand All @@ -1378,8 +1392,11 @@ mod tests {
let table = ListingTable::try_new(config)?;

let exec = table.scan(&state, None, &[], None).await?;
assert_eq!(exec.statistics()?.num_rows, Precision::Absent);
assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent);
assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
Precision::Absent
);

Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ impl ExecutionPlan for ArrowExec {
fn statistics(&self) -> Result<Statistics> {
self.inner.statistics()
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
self.inner.partition_statistics(partition)
}

fn fetch(&self) -> Option<usize> {
self.inner.fetch()
}
Expand Down
7 changes: 7 additions & 0 deletions datafusion/core/tests/custom_sources_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ impl ExecutionPlan for CustomExecutionPlan {
}

fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if partition.is_some() {
return Ok(Statistics::new_unknown(&self.schema()));
}
let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap();
Ok(Statistics {
num_rows: Precision::Exact(batch.num_rows()),
Expand Down
18 changes: 13 additions & 5 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,14 @@ impl ExecutionPlan for StatisticsValidation {
fn statistics(&self) -> Result<Statistics> {
Ok(self.stats.clone())
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if partition.is_some() {
Ok(Statistics::new_unknown(&self.schema))
} else {
Ok(self.stats.clone())
}
}
}

fn init_ctx(stats: Statistics, schema: Schema) -> Result<SessionContext> {
Expand Down Expand Up @@ -232,7 +240,7 @@ async fn sql_basic() -> Result<()> {
let physical_plan = df.create_physical_plan().await.unwrap();

// the statistics should be those of the source
assert_eq!(stats, physical_plan.statistics()?);
assert_eq!(stats, physical_plan.partition_statistics(None)?);

Ok(())
}
Expand All @@ -248,7 +256,7 @@ async fn sql_filter() -> Result<()> {
.unwrap();

let physical_plan = df.create_physical_plan().await.unwrap();
let stats = physical_plan.statistics()?;
let stats = physical_plan.partition_statistics(None)?;
assert_eq!(stats.num_rows, Precision::Inexact(1));

Ok(())
Expand All @@ -270,7 +278,7 @@ async fn sql_limit() -> Result<()> {
column_statistics: col_stats,
total_byte_size: Precision::Absent
},
physical_plan.statistics()?
physical_plan.partition_statistics(None)?
);

let df = ctx
Expand All @@ -279,7 +287,7 @@ async fn sql_limit() -> Result<()> {
.unwrap();
let physical_plan = df.create_physical_plan().await.unwrap();
// when the limit is larger than the original number of lines, statistics remain unchanged
assert_eq!(stats, physical_plan.statistics()?);
assert_eq!(stats, physical_plan.partition_statistics(None)?);

Ok(())
}
Expand All @@ -296,7 +304,7 @@ async fn sql_window() -> Result<()> {

let physical_plan = df.create_physical_plan().await.unwrap();

let result = physical_plan.statistics()?;
let result = physical_plan.partition_statistics(None)?;

assert_eq!(stats.num_rows, result.num_rows);
let col_stats = result.column_statistics;
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
31 changes: 23 additions & 8 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,19 @@ async fn check_stats_precision_with_filter_pushdown() {
let (_, _, state) = get_cache_runtime_state();
// Scan without filter, stats are exact
let exec = table.scan(&state, None, &[], None).await.unwrap();
assert_eq!(exec.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec.partition_statistics(None).unwrap().num_rows,
Precision::Exact(8)
);

// Scan with filter pushdown, stats are inexact
let filter = Expr::gt(col("id"), lit(1));

let exec = table.scan(&state, None, &[filter], None).await.unwrap();
assert_eq!(exec.statistics().unwrap().num_rows, Precision::Inexact(8));
assert_eq!(
exec.partition_statistics(None).unwrap().num_rows,
Precision::Inexact(8)
);
}

#[tokio::test]
Expand All @@ -79,9 +85,12 @@ async fn load_table_stats_with_session_level_cache() {
assert_eq!(get_static_cache_size(&state1), 0);
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();

assert_eq!(exec1.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec1.statistics().unwrap().total_byte_size,
exec1.partition_statistics(None).unwrap().num_rows,
Precision::Exact(8)
);
assert_eq!(
exec1.partition_statistics(None).unwrap().total_byte_size,
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
Precision::Exact(671),
);
Expand All @@ -91,9 +100,12 @@ async fn load_table_stats_with_session_level_cache() {
//check session 1 cache result not show in session 2
assert_eq!(get_static_cache_size(&state2), 0);
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
assert_eq!(exec2.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec2.statistics().unwrap().total_byte_size,
exec2.partition_statistics(None).unwrap().num_rows,
Precision::Exact(8)
);
assert_eq!(
exec2.partition_statistics(None).unwrap().total_byte_size,
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
Precision::Exact(671),
);
Expand All @@ -103,9 +115,12 @@ async fn load_table_stats_with_session_level_cache() {
//check session 1 cache result not show in session 2
assert_eq!(get_static_cache_size(&state1), 1);
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
assert_eq!(exec3.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec3.statistics().unwrap().total_byte_size,
exec3.partition_statistics(None).unwrap().num_rows,
Precision::Exact(8)
);
assert_eq!(
exec3.partition_statistics(None).unwrap().total_byte_size,
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
Precision::Exact(671),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl ExecutionPlan for SortRequiredExec {
}

fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
self.input.partition_statistics(None)
}
}

Expand Down
Loading