Skip to content

Commit e7aa6fa

Browse files
committed
save
1 parent 19a1e58 commit e7aa6fa

File tree

5 files changed

+46
-0
lines changed

5 files changed

+46
-0
lines changed

datafusion/datasource/src/source.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,23 @@ impl ExecutionPlan for DataSourceExec {
175175
self.data_source.statistics()
176176
}
177177

178+
fn statistics_by_partition(&self) -> datafusion_common::Result<Vec<Statistics>> {
179+
let mut statistics = vec![
180+
Statistics::new_unknown(&self.schema());
181+
self.properties().partitioning.partition_count()
182+
];
183+
if let Some(file_config) =
184+
self.data_source.as_any().downcast_ref::<FileScanConfig>()
185+
{
186+
for (idx, file_group) in file_config.file_groups.iter().enumerate() {
187+
if let Some(stat) = file_group.statistics() {
188+
statistics[idx] = stat.clone();
189+
}
190+
}
191+
}
192+
Ok(statistics)
193+
}
194+
178195
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
179196
let data_source = self.data_source.with_fetch(limit)?;
180197
let cache = self.cache.clone();

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
427427
Ok(Statistics::new_unknown(&self.schema()))
428428
}
429429

430+
/// Returns statistics for each partition of this `ExecutionPlan` node.
431+
/// If statistics are not available, returns an array of
432+
/// [`Statistics::new_unknown`] for each partition.
433+
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
434+
Ok(vec![
435+
Statistics::new_unknown(&self.schema());
436+
self.properties().partitioning.partition_count()
437+
])
438+
}
439+
430440
/// Returns `true` if a limit can be safely pushed down through this
431441
/// `ExecutionPlan` node.
432442
///

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,10 @@ impl ExecutionPlan for RepartitionExec {
680680
self.input.statistics()
681681
}
682682

683+
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
684+
todo!()
685+
}
686+
683687
fn cardinality_effect(&self) -> CardinalityEffect {
684688
CardinalityEffect::Equal
685689
}

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,6 +1273,10 @@ impl ExecutionPlan for SortExec {
12731273
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
12741274
}
12751275

1276+
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
1277+
todo!()
1278+
}
1279+
12761280
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
12771281
Some(Arc::new(SortExec::with_fetch(self, limit)))
12781282
}

datafusion/physical-plan/src/union.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,17 @@ impl ExecutionPlan for UnionExec {
270270
.unwrap_or_else(|| Statistics::new_unknown(&self.schema())))
271271
}
272272

273+
/// Gathers the per-partition statistics of every input into a single
/// list, keeping the inputs' order. The first input that fails aborts the
/// call and its error is returned.
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
    let mut all_partitions = Vec::new();
    for input in &self.inputs {
        all_partitions.extend(input.statistics_by_partition()?);
    }
    Ok(all_partitions)
}
283+
273284
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
274285
vec![false; self.children().len()]
275286
}

0 commit comments

Comments
 (0)