Skip to content

Commit 567da97

Browse files
committed
save
1 parent 775afe8 commit 567da97

File tree

15 files changed

+31
-26
lines changed

15 files changed

+31
-26
lines changed

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl ExecutionPlan for ArrowExec {
191191
self.inner.statistics()
192192
}
193193

194-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
194+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
195195
self.inner.statistics_by_partition()
196196
}
197197

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ mod test {
3939
///
4040
/// This function:
4141
/// - Creates an external table from './tests/data/test_statistics_per_partition'
42-
/// - If we set the `target_partition` to `2, the data contains 2 partitions, each with 2 rows
42+
/// - If we set the `target_partition` to 2, the data contains 2 partitions, each with 2 rows
4343
/// - Each partition has an "id" column (INT) with the following values:
4444
/// - First partition: [3, 4]
4545
/// - Second partition: [1, 2]
4646
/// - Each row is 110 bytes in size
4747
///
4848
/// @param target_partition Optional parameter to set the target partitions
4949
/// @return ExecutionPlan representing the scan of the table with statistics
50-
async fn generate_listing_table_with_statistics(
50+
async fn create_scan_exec_with_statistics(
5151
target_partition: Option<usize>,
5252
) -> Arc<dyn ExecutionPlan> {
5353
let mut session_config = SessionConfig::new().with_collect_statistics(true);
@@ -111,7 +111,7 @@ mod test {
111111
#[tokio::test]
112112
async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()>
113113
{
114-
let scan = generate_listing_table_with_statistics(Some(2)).await;
114+
let scan = create_scan_exec_with_statistics(Some(2)).await;
115115
let statistics = scan.statistics_by_partition()?;
116116
let expected_statistic_partition_1 =
117117
create_partition_statistics(2, 110, 3, 4, true);
@@ -127,7 +127,7 @@ mod test {
127127
#[tokio::test]
128128
async fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()>
129129
{
130-
let scan = generate_listing_table_with_statistics(Some(2)).await;
130+
let scan = create_scan_exec_with_statistics(Some(2)).await;
131131
// Add projection execution plan
132132
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
133133
vec![(Arc::new(Column::new("id", 0)), "id".to_string())];
@@ -146,7 +146,7 @@ mod test {
146146

147147
#[tokio::test]
148148
async fn test_statistics_by_partition_of_sort() -> datafusion_common::Result<()> {
149-
let scan = generate_listing_table_with_statistics(Some(2)).await;
149+
let scan = create_scan_exec_with_statistics(Some(2)).await;
150150
// Add sort execution plan
151151
let sort = SortExec::new(
152152
LexOrdering::new(vec![PhysicalSortExpr {
@@ -179,7 +179,7 @@ mod test {
179179

180180
#[tokio::test]
181181
async fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()> {
182-
let scan = generate_listing_table_with_statistics(Some(2)).await;
182+
let scan = create_scan_exec_with_statistics(Some(2)).await;
183183
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
184184
let predicate = binary(
185185
Arc::new(Column::new("id", 0)),
@@ -221,7 +221,7 @@ mod test {
221221

222222
#[tokio::test]
223223
async fn test_statistic_by_partition_of_union() -> datafusion_common::Result<()> {
224-
let scan = generate_listing_table_with_statistics(Some(2)).await;
224+
let scan = create_scan_exec_with_statistics(Some(2)).await;
225225
let union_exec = Arc::new(UnionExec::new(vec![scan.clone(), scan]));
226226
let statistics = union_exec.statistics_by_partition()?;
227227
// Check that we have 4 partitions (2 from each scan)

datafusion/datasource-avro/src/source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ impl ExecutionPlan for AvroExec {
141141
self.inner.statistics()
142142
}
143143

144-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
144+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
145145
self.inner.statistics_by_partition()
146146
}
147147

datafusion/datasource-csv/src/source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ impl ExecutionPlan for CsvExec {
381381
self.inner.statistics()
382382
}
383383

384-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
384+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
385385
self.inner.statistics_by_partition()
386386
}
387387

datafusion/physical-plan/src/coalesce_batches.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,12 @@ impl ExecutionPlan for CoalesceBatchesExec {
195195
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
196196
}
197197

198-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
199-
Ok(vec![self.statistics()?])
198+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
199+
self.input
200+
.statistics_by_partition()?
201+
.into_iter()
202+
.map(|stat| Statistics::with_fetch(stat, self.schema(), self.fetch, 0, 1))
203+
.collect()
200204
}
201205

202206
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
193193
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
194194
}
195195

196-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
196+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
197197
Ok(vec![self.statistics()?])
198198
}
199199

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
5252
use datafusion_physical_expr_common::sort_expr::LexRequirement;
5353

5454
use futures::stream::{StreamExt, TryStreamExt};
55+
use crate::statistics::PartitionedStatistics;
5556

5657
/// Represent nodes in the DataFusion Physical Plan.
5758
///
@@ -430,11 +431,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
430431
/// Returns statistics for each partition of this `ExecutionPlan` node.
431432
/// If statistics are not available, returns an array of
432433
/// [`Statistics::new_unknown`] for each partition.
433-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
434-
Ok(vec![
435-
Statistics::new_unknown(&self.schema());
434+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
435+
Ok(PartitionedStatistics::new(vec![
436+
Arc::new(Statistics::new_unknown(&self.schema()));
436437
self.properties().partitioning.partition_count()
437-
])
438+
]))
438439
}
439440

440441
/// Returns `true` if a limit can be safely pushed down through this

datafusion/physical-plan/src/filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ impl ExecutionPlan for FilterExec {
409409
Ok(stats.project(self.projection.as_ref()))
410410
}
411411

412-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
412+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
413413
self.input
414414
.statistics_by_partition()?
415415
.into_iter()

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ impl ExecutionPlan for CrossJoinExec {
345345
))
346346
}
347347

348-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
348+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
349349
let left_stats = self.left.statistics_by_partition()?;
350350
let right_stats = self.right.statistics_by_partition()?;
351351

datafusion/physical-plan/src/limit.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ impl ExecutionPlan for GlobalLimitExec {
202202
)
203203
}
204204

205-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
205+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
206206
Ok(vec![self.statistics()?])
207207
}
208208

@@ -347,7 +347,7 @@ impl ExecutionPlan for LocalLimitExec {
347347
)
348348
}
349349

350-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
350+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
351351
self.input
352352
.statistics_by_partition()?
353353
.into_iter()

datafusion/physical-plan/src/projection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ impl ExecutionPlan for ProjectionExec {
251251
))
252252
}
253253

254-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
254+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
255255
Ok(self
256256
.input
257257
.statistics_by_partition()?

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1237,7 +1237,7 @@ impl ExecutionPlan for SortExec {
12371237
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
12381238
}
12391239

1240-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
1240+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
12411241
if !self.preserve_partitioning() {
12421242
return Ok(vec![self.statistics()?]);
12431243
}

datafusion/physical-plan/src/sorts/sort_preserving_merge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
346346
self.input.statistics()
347347
}
348348

349-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
349+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
350350
Ok(vec![self.statistics()?])
351351
}
352352

datafusion/physical-plan/src/union.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ impl ExecutionPlan for UnionExec {
270270
.unwrap_or_else(|| Statistics::new_unknown(&self.schema())))
271271
}
272272

273-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
273+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
274274
Ok(self
275275
.inputs
276276
.iter()

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
364364
self.statistics_helper(input_stat)
365365
}
366366

367-
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
367+
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
368368
self.input
369369
.statistics_by_partition()?
370370
.into_iter()

0 commit comments

Comments
 (0)