Skip to content

Commit 14eeb65

Browse files
committed
Feat: introduce partition statistics API
1 parent 572a1d8 commit 14eeb65

File tree

20 files changed

+157
-419
lines changed

20 files changed

+157
-419
lines changed

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ use datafusion_physical_plan::{
4242
};
4343

4444
use datafusion_datasource::file_groups::FileGroup;
45-
use datafusion_physical_plan::statistics::PartitionedStatistics;
4645
use futures::StreamExt;
4746
use itertools::Itertools;
4847
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
@@ -192,8 +191,8 @@ impl ExecutionPlan for ArrowExec {
192191
self.inner.statistics()
193192
}
194193

195-
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
196-
self.inner.statistics_by_partition()
194+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
195+
self.inner.partition_statistics(partition)
197196
}
198197

199198
fn fetch(&self) -> Option<usize> {

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ mod test {
3939
use datafusion_physical_plan::projection::ProjectionExec;
4040
use datafusion_physical_plan::sorts::sort::SortExec;
4141
use datafusion_physical_plan::union::UnionExec;
42-
use datafusion_physical_plan::{execute_stream_partitioned, ExecutionPlan};
42+
use datafusion_physical_plan::{
43+
execute_stream_partitioned, ExecutionPlan, ExecutionPlanProperties,
44+
};
4345
use futures::TryStreamExt;
4446
use std::sync::Arc;
4547

@@ -186,7 +188,9 @@ mod test {
186188
#[tokio::test]
187189
async fn test_statistics_by_partition_of_data_source() -> Result<()> {
188190
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
189-
let statistics = scan.statistics_by_partition()?;
191+
let statistics = (0..scan.output_partitioning().partition_count())
192+
.map(|idx| scan.partition_statistics(Some(idx)))
193+
.collect::<Result<Vec<_>>>()?;
190194
let expected_statistic_partition_1 =
191195
create_partition_statistics(2, 110, 3, 4, true);
192196
let expected_statistic_partition_2 =
@@ -212,8 +216,11 @@ mod test {
212216
// Add projection execution plan
213217
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
214218
vec![(Arc::new(Column::new("id", 0)), "id".to_string())];
215-
let projection = ProjectionExec::try_new(exprs, scan)?;
216-
let statistics = projection.statistics_by_partition()?;
219+
let projection: Arc<dyn ExecutionPlan> =
220+
Arc::new(ProjectionExec::try_new(exprs, scan)?);
221+
let statistics = (0..projection.output_partitioning().partition_count())
222+
.map(|idx| projection.partition_statistics(Some(idx)))
223+
.collect::<Result<Vec<_>>>()?;
217224
let expected_statistic_partition_1 =
218225
create_partition_statistics(2, 8, 3, 4, false);
219226
let expected_statistic_partition_2 =
@@ -225,7 +232,7 @@ mod test {
225232

226233
// Check the per-partition statistics against real results
227234
let expected_stats = vec![(3, 4, 2), (1, 2, 2)];
228-
validate_statistics_with_data(Arc::new(projection), expected_stats, 0).await?;
235+
validate_statistics_with_data(projection, expected_stats, 0).await?;
229236
Ok(())
230237
}
231238

@@ -243,8 +250,10 @@ mod test {
243250
}]),
244251
scan_1,
245252
);
246-
let sort_exec = Arc::new(sort.clone());
247-
let statistics = sort_exec.statistics_by_partition()?;
253+
let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(sort.clone());
254+
let statistics = (0..sort_exec.output_partitioning().partition_count())
255+
.map(|idx| sort_exec.partition_statistics(Some(idx)))
256+
.collect::<Result<Vec<_>>>()?;
248257
let expected_statistic_partition =
249258
create_partition_statistics(4, 220, 1, 4, true);
250259
assert_eq!(statistics.len(), 1);
@@ -256,7 +265,7 @@ mod test {
256265
// Sort with preserve_partitioning
257266
let scan_2 = create_scan_exec_with_statistics(None, Some(2)).await;
258267
// Add sort execution plan
259-
let sort_exec = Arc::new(
268+
let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(
260269
SortExec::new(
261270
LexOrdering::new(vec![PhysicalSortExpr {
262271
expr: Arc::new(Column::new("id", 0)),
@@ -273,7 +282,9 @@ mod test {
273282
create_partition_statistics(2, 110, 3, 4, true);
274283
let expected_statistic_partition_2 =
275284
create_partition_statistics(2, 110, 1, 2, true);
276-
let statistics = sort_exec.statistics_by_partition()?;
285+
let statistics = (0..sort_exec.output_partitioning().partition_count())
286+
.map(|idx| sort_exec.partition_statistics(Some(idx)))
287+
.collect::<Result<Vec<_>>>()?;
277288
assert_eq!(statistics.len(), 2);
278289
assert_eq!(statistics[0], expected_statistic_partition_1);
279290
assert_eq!(statistics[1], expected_statistic_partition_2);
@@ -296,7 +307,7 @@ mod test {
296307
)?;
297308
let filter: Arc<dyn ExecutionPlan> =
298309
Arc::new(FilterExec::try_new(predicate, scan)?);
299-
let full_statistics = filter.statistics()?;
310+
let full_statistics = filter.partition_statistics(None)?;
300311
let expected_full_statistic = Statistics {
301312
num_rows: Precision::Inexact(0),
302313
total_byte_size: Precision::Inexact(0),
@@ -319,7 +330,9 @@ mod test {
319330
};
320331
assert_eq!(full_statistics, expected_full_statistic);
321332

322-
let statistics = filter.statistics_by_partition()?;
333+
let statistics = (0..filter.output_partitioning().partition_count())
334+
.map(|idx| filter.partition_statistics(Some(idx)))
335+
.collect::<Result<Vec<_>>>()?;
323336
assert_eq!(statistics.len(), 2);
324337
assert_eq!(statistics[0], expected_full_statistic);
325338
assert_eq!(statistics[1], expected_full_statistic);
@@ -329,8 +342,11 @@ mod test {
329342
#[tokio::test]
330343
async fn test_statistic_by_partition_of_union() -> Result<()> {
331344
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
332-
let union_exec = Arc::new(UnionExec::new(vec![scan.clone(), scan]));
333-
let statistics = union_exec.statistics_by_partition()?;
345+
let union_exec: Arc<dyn ExecutionPlan> =
346+
Arc::new(UnionExec::new(vec![scan.clone(), scan]));
347+
let statistics = (0..union_exec.output_partitioning().partition_count())
348+
.map(|idx| union_exec.partition_statistics(Some(idx)))
349+
.collect::<Result<Vec<_>>>()?;
334350
// Check that we have 4 partitions (2 from each scan)
335351
assert_eq!(statistics.len(), 4);
336352
let expected_statistic_partition_1 =
@@ -360,8 +376,11 @@ mod test {
360376
WITH ORDER (id ASC);";
361377
let right_scan =
362378
create_scan_exec_with_statistics(Some(right_create_table_sql), Some(2)).await;
363-
let cross_join = CrossJoinExec::new(left_scan, right_scan);
364-
let statistics = cross_join.statistics_by_partition()?;
379+
let cross_join: Arc<dyn ExecutionPlan> =
380+
Arc::new(CrossJoinExec::new(left_scan, right_scan));
381+
let statistics = (0..cross_join.output_partitioning().partition_count())
382+
.map(|idx| cross_join.partition_statistics(Some(idx)))
383+
.collect::<Result<Vec<_>>>()?;
365384
// Check that we have 2 partitions
366385
assert_eq!(statistics.len(), 2);
367386
let mut expected_statistic_partition_1 =
@@ -391,52 +410,60 @@ mod test {
391410

392411
// Check the per-partition statistics against real results
393412
let expected_stats = vec![(1, 4, 8), (1, 4, 8)];
394-
validate_statistics_with_data(Arc::new(cross_join), expected_stats, 0).await?;
413+
validate_statistics_with_data(cross_join, expected_stats, 0).await?;
395414
Ok(())
396415
}
397416

398417
#[tokio::test]
399418
async fn test_statistic_by_partition_of_coalesce_batches() -> Result<()> {
400419
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
401-
let coalesce_batches = CoalesceBatchesExec::new(scan, 2);
420+
dbg!(scan.partition_statistics(Some(0))?);
421+
let coalesce_batches: Arc<dyn ExecutionPlan> =
422+
Arc::new(CoalesceBatchesExec::new(scan, 2));
402423
let expected_statistic_partition_1 =
403424
create_partition_statistics(2, 110, 3, 4, true);
404425
let expected_statistic_partition_2 =
405426
create_partition_statistics(2, 110, 1, 2, true);
406-
let statistics = coalesce_batches.statistics_by_partition()?;
427+
let statistics = (0..coalesce_batches.output_partitioning().partition_count())
428+
.map(|idx| coalesce_batches.partition_statistics(Some(idx)))
429+
.collect::<Result<Vec<_>>>()?;
407430
assert_eq!(statistics.len(), 2);
408431
assert_eq!(statistics[0], expected_statistic_partition_1);
409432
assert_eq!(statistics[1], expected_statistic_partition_2);
410433

411434
// Check the per-partition statistics against real results
412435
let expected_stats = vec![(3, 4, 2), (1, 2, 2)];
413-
validate_statistics_with_data(Arc::new(coalesce_batches), expected_stats, 0)
414-
.await?;
436+
validate_statistics_with_data(coalesce_batches, expected_stats, 0).await?;
415437
Ok(())
416438
}
417439

418440
#[tokio::test]
419441
async fn test_statistic_by_partition_of_coalesce_partitions() -> Result<()> {
420442
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
421-
let coalesce_partitions = CoalescePartitionsExec::new(scan);
443+
let coalesce_partitions: Arc<dyn ExecutionPlan> =
444+
Arc::new(CoalescePartitionsExec::new(scan));
422445
let expected_statistic_partition =
423446
create_partition_statistics(4, 220, 1, 4, true);
424-
let statistics = coalesce_partitions.statistics_by_partition()?;
447+
let statistics = (0..coalesce_partitions.output_partitioning().partition_count())
448+
.map(|idx| coalesce_partitions.partition_statistics(Some(idx)))
449+
.collect::<Result<Vec<_>>>()?;
425450
assert_eq!(statistics.len(), 1);
426451
assert_eq!(statistics[0], expected_statistic_partition);
427452

428453
// Check the per-partition statistics against real results
429454
let expected_stats = vec![(1, 4, 4)];
430-
validate_statistics_with_data(Arc::new(coalesce_partitions), expected_stats, 0)
431-
.await?;
455+
validate_statistics_with_data(coalesce_partitions, expected_stats, 0).await?;
432456
Ok(())
433457
}
434458

435459
#[tokio::test]
436460
async fn test_statistic_by_partition_of_local_limit() -> Result<()> {
437461
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
438-
let local_limit = LocalLimitExec::new(scan.clone(), 1);
439-
let statistics = local_limit.statistics_by_partition()?;
462+
let local_limit: Arc<dyn ExecutionPlan> =
463+
Arc::new(LocalLimitExec::new(scan.clone(), 1));
464+
let statistics = (0..local_limit.output_partitioning().partition_count())
465+
.map(|idx| local_limit.partition_statistics(Some(idx)))
466+
.collect::<Result<Vec<_>>>()?;
440467
assert_eq!(statistics.len(), 2);
441468
let schema = scan.schema();
442469
let mut expected_statistic_partition = Statistics::new_unknown(&schema);
@@ -449,8 +476,11 @@ mod test {
449476
#[tokio::test]
450477
async fn test_statistic_by_partition_of_global_limit_partitions() -> Result<()> {
451478
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
452-
let global_limit = GlobalLimitExec::new(scan.clone(), 0, Some(2));
453-
let statistics = global_limit.statistics_by_partition()?;
479+
let global_limit: Arc<dyn ExecutionPlan> =
480+
Arc::new(GlobalLimitExec::new(scan.clone(), 0, Some(2)));
481+
let statistics = (0..global_limit.output_partitioning().partition_count())
482+
.map(|idx| global_limit.partition_statistics(Some(idx)))
483+
.collect::<Result<Vec<_>>>()?;
454484
assert_eq!(statistics.len(), 1);
455485
let mut expected_statistic_partition = Statistics::new_unknown(&scan.schema());
456486
expected_statistic_partition.num_rows = Precision::Exact(2);

datafusion/datasource-avro/src/source.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ use datafusion_physical_plan::{
4040
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
4141
};
4242

43-
use datafusion_physical_plan::statistics::PartitionedStatistics;
4443
use object_store::ObjectStore;
4544

4645
/// Execution plan for scanning Avro data source
@@ -142,8 +141,8 @@ impl ExecutionPlan for AvroExec {
142141
self.inner.statistics()
143142
}
144143

145-
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
146-
self.inner.statistics_by_partition()
144+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
145+
self.inner.partition_statistics(partition)
147146
}
148147

149148
fn metrics(&self) -> Option<MetricsSet> {

datafusion/datasource-csv/src/source.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ use datafusion_physical_plan::{
5151

5252
use crate::file_format::CsvDecoder;
5353
use datafusion_datasource::file_groups::FileGroup;
54-
use datafusion_physical_plan::statistics::PartitionedStatistics;
5554
use futures::{StreamExt, TryStreamExt};
5655
use object_store::buffered::BufWriter;
5756
use object_store::{GetOptions, GetResultPayload, ObjectStore};
@@ -382,8 +381,8 @@ impl ExecutionPlan for CsvExec {
382381
self.inner.statistics()
383382
}
384383

385-
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
386-
self.inner.statistics_by_partition()
384+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
385+
self.inner.partition_statistics(partition)
387386
}
388387

389388
fn metrics(&self) -> Option<MetricsSet> {

datafusion/datasource/src/file_groups.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -420,11 +420,6 @@ impl FileGroup {
420420
self.files.push(file);
421421
}
422422

423-
/// Get the statistics for this group
424-
pub fn statistics(&self) -> &Option<Arc<Statistics>> {
425-
&self.statistics
426-
}
427-
428423
/// Get a reference to the statistics for this group, if any are set
429424
pub fn statistics_ref(&self) -> Option<&Statistics> {
430425
self.statistics.as_deref()

datafusion/datasource/src/source.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use std::sync::Arc;
2525
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
2626
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
2727
use datafusion_physical_plan::projection::ProjectionExec;
28-
use datafusion_physical_plan::statistics::PartitionedStatistics;
2928
use datafusion_physical_plan::{
3029
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
3130
};
@@ -189,24 +188,22 @@ impl ExecutionPlan for DataSourceExec {
189188
self.data_source.statistics()
190189
}
191190

192-
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
193-
let mut statistics = {
194-
let mut v =
195-
Vec::with_capacity(self.properties().partitioning.partition_count());
196-
(0..self.properties().partitioning.partition_count())
197-
.for_each(|_| v.push(Arc::new(Statistics::new_unknown(&self.schema()))));
198-
v
199-
};
200-
if let Some(file_config) =
201-
self.data_source.as_any().downcast_ref::<FileScanConfig>()
202-
{
203-
for (idx, file_group) in file_config.file_groups.iter().enumerate() {
204-
if let Some(stat) = file_group.statistics() {
205-
statistics[idx] = Arc::clone(stat);
191+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
192+
if let Some(partition) = partition {
193+
let mut statistics = Statistics::new_unknown(&self.schema());
194+
if let Some(file_config) =
195+
self.data_source.as_any().downcast_ref::<FileScanConfig>()
196+
{
197+
if let Some(file_group) = file_config.file_groups.get(partition) {
198+
if let Some(stat) = file_group.statistics_ref() {
199+
statistics = stat.clone();
200+
}
206201
}
207202
}
203+
Ok(statistics)
204+
} else {
205+
Ok(self.data_source.statistics()?)
208206
}
209-
Ok(PartitionedStatistics::new(statistics))
210207
}
211208

212209
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {

datafusion/datasource/src/statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ pub fn compute_all_files_statistics(
476476
// Then summary statistics across all file groups
477477
let file_groups_statistics = file_groups_with_stats
478478
.iter()
479-
.filter_map(|file_group| file_group.statistics().as_deref());
479+
.filter_map(|file_group| file_group.statistics_ref());
480480

481481
let mut statistics =
482482
Statistics::try_merge_iter(file_groups_statistics, &table_schema)?;

datafusion/physical-plan/src/coalesce_batches.rs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use std::task::{Context, Poll};
2424

2525
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
2626
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
27-
use crate::statistics::PartitionedStatistics;
2827
use crate::{
2928
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
3029
};
@@ -200,24 +199,11 @@ impl ExecutionPlan for CoalesceBatchesExec {
200199
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
201200
}
202201

203-
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
204-
let input_stats = self.input.statistics_by_partition()?;
205-
206-
let stats: Result<Vec<Arc<Statistics>>> = input_stats
207-
.iter()
208-
.map(|stat| {
209-
let fetched_stat = Statistics::with_fetch(
210-
stat.clone(),
211-
self.schema(),
212-
self.fetch,
213-
0,
214-
1,
215-
)?;
216-
Ok(Arc::new(fetched_stat))
217-
})
218-
.collect();
219-
220-
Ok(PartitionedStatistics::new(stats?))
202+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
203+
let input_stats = self.input.partition_statistics(partition)?;
204+
let fetched_stat =
205+
Statistics::with_fetch(input_stats.clone(), self.schema(), self.fetch, 0, 1)?;
206+
Ok(fetched_stat)
221207
}
222208

223209
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use crate::execution_plan::CardinalityEffect;
3131
use crate::projection::{make_with_child, ProjectionExec};
3232
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
3333

34-
use crate::statistics::PartitionedStatistics;
3534
use datafusion_common::{internal_err, Result};
3635
use datafusion_execution::TaskContext;
3736

@@ -200,10 +199,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
200199
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
201200
}
202201

203-
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
204-
Ok(PartitionedStatistics::new(vec![Arc::new(
205-
self.statistics()?,
206-
)]))
202+
fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
203+
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
207204
}
208205

209206
fn supports_limit_pushdown(&self) -> bool {

0 commit comments

Comments
 (0)