Skip to content

Commit 71777d4

Browse files
committed
rebase main and fix tests
1 parent 90b8f15 commit 71777d4

File tree

3 files changed

+40
-14
lines changed

3 files changed

+40
-14
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,9 +1154,18 @@ impl ListingTable {
11541154
.map_schema(self.file_schema.as_ref())?;
11551155
// Use schema_mapper to map each file-level column statistics to table-level column statistics
11561156
file_groups.iter_mut().try_for_each(|file_group| {
1157-
if let Some(stat) = file_group.statistics_mut() {
1158-
stat.column_statistics =
1159-
schema_mapper.map_column_statistics(&stat.column_statistics)?;
1157+
// Map the column statistics of every file in this file_group to the table schema
1158+
for idx in 0..file_group.len() {
1159+
if let Some(stat) = file_group[idx].statistics.as_ref() {
1160+
let column_statistics =
1161+
schema_mapper.map_column_statistics(&stat.column_statistics)?;
1162+
// Update the file's statistics with the mapped column statistics
1163+
file_group[idx].statistics = Some(Arc::new(Statistics {
1164+
num_rows: stat.num_rows.clone(),
1165+
total_byte_size: stat.total_byte_size.clone(),
1166+
column_statistics,
1167+
}));
1168+
}
11601169
}
11611170
Ok::<_, DataFusionError>(())
11621171
})?;

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -476,14 +476,15 @@ mod test {
476476
#[tokio::test]
477477
async fn test_statistic_by_partition_of_global_limit_partitions() -> Result<()> {
478478
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
479+
// Skip 0 rows and fetch at most 2 rows
479480
let global_limit: Arc<dyn ExecutionPlan> =
480481
Arc::new(GlobalLimitExec::new(scan.clone(), 0, Some(2)));
481482
let statistics = (0..global_limit.output_partitioning().partition_count())
482483
.map(|idx| global_limit.partition_statistics(Some(idx)))
483484
.collect::<Result<Vec<_>>>()?;
484485
assert_eq!(statistics.len(), 1);
485-
let mut expected_statistic_partition = Statistics::new_unknown(&scan.schema());
486-
expected_statistic_partition.num_rows = Precision::Exact(2);
486+
let expected_statistic_partition =
487+
create_partition_statistics(2, 110, 3, 4, true);
487488
assert_eq!(statistics[0], expected_statistic_partition);
488489
Ok(())
489490
}

datafusion/physical-plan/src/union.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -262,16 +262,32 @@ impl ExecutionPlan for UnionExec {
262262
}
263263

264264
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
265-
let stats = self
266-
.inputs
267-
.iter()
268-
.map(|input_exec| input_exec.partition_statistics(partition))
269-
.collect::<Result<Vec<_>>>()?;
265+
if let Some(partition_idx) = partition {
266+
// For a specific partition, find which input it belongs to
267+
let mut remaining_idx = partition_idx;
268+
for input in &self.inputs {
269+
let input_partition_count = input.output_partitioning().partition_count();
270+
if remaining_idx < input_partition_count {
271+
// This partition belongs to this input
272+
return input.partition_statistics(Some(remaining_idx));
273+
}
274+
remaining_idx -= input_partition_count;
275+
}
276+
// If we get here, the partition index is out of bounds
277+
return Ok(Statistics::new_unknown(&self.schema()));
278+
} else {
279+
// Collect statistics from all inputs
280+
let stats = self
281+
.inputs
282+
.iter()
283+
.map(|input_exec| input_exec.partition_statistics(None))
284+
.collect::<Result<Vec<_>>>()?;
270285

271-
Ok(stats
272-
.into_iter()
273-
.reduce(stats_union)
274-
.unwrap_or_else(|| Statistics::new_unknown(&self.schema())))
286+
Ok(stats
287+
.into_iter()
288+
.reduce(stats_union)
289+
.unwrap_or_else(|| Statistics::new_unknown(&self.schema())))
290+
}
275291
}
276292

277293
fn benefits_from_input_partitioning(&self) -> Vec<bool> {

0 commit comments

Comments
 (0)