Skip to content

Add statistics_by_partition API to ExecutionPlan #15503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ impl ListingOptions {
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ListingTable {
table_paths: Vec<ListingTableUrl>,
/// `file_schema` contains only the columns physically stored in the data files themselves.
Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion_physical_plan::{
};

use datafusion_datasource::file_groups::FileGroup;
use datafusion_physical_plan::statistics::PartitionedStatistics;
use futures::StreamExt;
use itertools::Itertools;
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
Expand Down Expand Up @@ -190,6 +191,11 @@ impl ExecutionPlan for ArrowExec {
fn statistics(&self) -> Result<Statistics> {
self.inner.statistics()
}

/// Per-partition statistics, delegated to the wrapped `DataSourceExec`.
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
    self.inner.statistics_by_partition()
}

fn fetch(&self) -> Option<usize> {
self.inner.fetch()
}
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions datafusion/core/tests/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod enforce_sorting;
mod join_selection;
mod limit_pushdown;
mod limited_distinct_aggregation;
mod partition_statistics;
mod projection_pushdown;
mod push_down_filter;
mod replace_with_order_preserving_variants;
Expand Down
460 changes: 460 additions & 0 deletions datafusion/core/tests/physical_optimizer/partition_statistics.rs

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions datafusion/datasource-avro/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};

use datafusion_physical_plan::statistics::PartitionedStatistics;
use object_store::ObjectStore;

/// Execution plan for scanning Avro data source
Expand Down Expand Up @@ -141,6 +142,10 @@ impl ExecutionPlan for AvroExec {
self.inner.statistics()
}

/// Per-partition statistics, delegated to the wrapped `DataSourceExec`.
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
    self.inner.statistics_by_partition()
}

fn metrics(&self) -> Option<MetricsSet> {
self.inner.metrics()
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/datasource-csv/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use datafusion_physical_plan::{

use crate::file_format::CsvDecoder;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_physical_plan::statistics::PartitionedStatistics;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
Expand Down Expand Up @@ -381,6 +382,10 @@ impl ExecutionPlan for CsvExec {
self.inner.statistics()
}

/// Per-partition statistics, delegated to the wrapped `DataSourceExec`.
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
    self.inner.statistics_by_partition()
}

fn metrics(&self) -> Option<MetricsSet> {
self.inner.metrics()
}
Expand Down
7 changes: 6 additions & 1 deletion datafusion/datasource/src/file_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,12 @@ impl FileGroup {
}

/// Get the statistics for this group
pub fn statistics(&self) -> Option<&Statistics> {
pub fn statistics(&self) -> &Option<Arc<Statistics>> {
&self.statistics
}

/// Get the statistics for this group
pub fn statistics_ref(&self) -> Option<&Statistics> {
self.statistics.as_deref()
}

Expand Down
21 changes: 21 additions & 0 deletions datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::statistics::PartitionedStatistics;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
Expand Down Expand Up @@ -188,6 +189,26 @@ impl ExecutionPlan for DataSourceExec {
self.data_source.statistics()
}

fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
let mut statistics = {
let mut v =
Vec::with_capacity(self.properties().partitioning.partition_count());
(0..self.properties().partitioning.partition_count())
.for_each(|_| v.push(Arc::new(Statistics::new_unknown(&self.schema()))));
v
};
if let Some(file_config) =
self.data_source.as_any().downcast_ref::<FileScanConfig>()
{
for (idx, file_group) in file_config.file_groups.iter().enumerate() {
if let Some(stat) = file_group.statistics() {
statistics[idx] = Arc::clone(stat);
}
}
}
Ok(PartitionedStatistics::new(statistics))
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let data_source = self.data_source.with_fetch(limit)?;
let cache = self.cache.clone();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ pub fn compute_all_files_statistics(
// Then summary statistics across all file groups
let file_groups_statistics = file_groups_with_stats
.iter()
.filter_map(|file_group| file_group.statistics());
.filter_map(|file_group| file_group.statistics().as_deref());

let mut statistics =
Statistics::try_merge_iter(file_groups_statistics, &table_schema)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use datafusion_physical_plan::ExecutionPlanProperties;

use itertools::izip;

Expand Down Expand Up @@ -205,9 +205,8 @@ pub fn plan_with_order_breaking_variants(
// Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`
// SPM may have `fetch`, so pass it to the `CoalescePartitionsExec`
let child = Arc::clone(&sort_input.children[0].plan);
let coalesce = CoalescePartitionsExec::new(child)
.with_fetch(plan.fetch())
.unwrap();
let coalesce =
Arc::new(CoalescePartitionsExec::new(child).with_fetch(plan.fetch()));
sort_input.plan = coalesce;
} else {
return sort_input.update_plan_from_children();
Expand Down
21 changes: 21 additions & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::task::{Context, Poll};

use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
use crate::statistics::PartitionedStatistics;
use crate::{
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};
Expand Down Expand Up @@ -199,6 +200,26 @@ impl ExecutionPlan for CoalesceBatchesExec {
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
}

/// Returns statistics for each output partition.
///
/// `CoalesceBatchesExec` does not change partitioning, so each input
/// partition's statistics are carried through with the (optional) `fetch`
/// limit applied.
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
    // Hoisted: `schema()` is loop-invariant; clone the Arc per partition
    // instead of re-fetching the schema each time.
    let schema = self.schema();
    let stats = self
        .input
        .statistics_by_partition()?
        .iter()
        .map(|stat| {
            let fetched = Statistics::with_fetch(
                stat.clone(),
                Arc::clone(&schema),
                self.fetch,
                0,
                1,
            )?;
            Ok(Arc::new(fetched))
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(PartitionedStatistics::new(stats))
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Some(Arc::new(CoalesceBatchesExec {
input: Arc::clone(&self.input),
Expand Down
7 changes: 7 additions & 0 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::execution_plan::CardinalityEffect;
use crate::projection::{make_with_child, ProjectionExec};
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use crate::statistics::PartitionedStatistics;
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;

Expand Down Expand Up @@ -199,6 +200,12 @@ impl ExecutionPlan for CoalescePartitionsExec {
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
}

/// Returns statistics for each output partition.
///
/// Coalescing merges all input partitions into a single output partition,
/// so the one entry is simply this node's plan-wide `statistics()`
/// (which already accounts for any `fetch` limit).
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
    Ok(PartitionedStatistics::new(vec![Arc::new(
        self.statistics()?,
    )]))
}

fn supports_limit_pushdown(&self) -> bool {
true
}
Expand Down
14 changes: 14 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use crate::statistics::PartitionedStatistics;
use futures::stream::{StreamExt, TryStreamExt};

/// Represent nodes in the DataFusion Physical Plan.
Expand Down Expand Up @@ -430,6 +431,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
Ok(Statistics::new_unknown(&self.schema()))
}

/// Returns statistics for each partition of this `ExecutionPlan` node.
///
/// The default implementation returns one [`Statistics::new_unknown`]
/// entry per output partition; nodes that can compute real per-partition
/// statistics should override this.
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
    // Hoisted: `schema()` is loop-invariant.
    let schema = self.schema();
    Ok(PartitionedStatistics::new(
        (0..self.properties().partitioning.partition_count())
            .map(|_| Arc::new(Statistics::new_unknown(&schema)))
            .collect(),
    ))
}

/// Returns `true` if a limit can be safely pushed down through this
/// `ExecutionPlan` node.
///
Expand Down
38 changes: 32 additions & 6 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use datafusion_physical_expr::{
ExprBoundaries, PhysicalExpr,
};

use crate::statistics::PartitionedStatistics;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use futures::stream::{Stream, StreamExt};
use log::trace;
Expand Down Expand Up @@ -174,12 +175,11 @@ impl FilterExec {

/// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
fn statistics_helper(
input: &Arc<dyn ExecutionPlan>,
schema: SchemaRef,
input_stats: Statistics,
predicate: &Arc<dyn PhysicalExpr>,
default_selectivity: u8,
) -> Result<Statistics> {
let input_stats = input.statistics()?;
let schema = input.schema();
if !check_support(predicate, &schema) {
let selectivity = default_selectivity as f64 / 100.0;
let mut stats = input_stats.to_inexact();
Expand All @@ -193,7 +193,7 @@ impl FilterExec {
let num_rows = input_stats.num_rows;
let total_byte_size = input_stats.total_byte_size;
let input_analysis_ctx = AnalysisContext::try_from_statistics(
&input.schema(),
&schema,
&input_stats.column_statistics,
)?;

Expand Down Expand Up @@ -260,7 +260,12 @@ impl FilterExec {
) -> Result<PlanProperties> {
// Combine the equal predicates with the input equivalence properties
// to construct the equivalence properties:
let stats = Self::statistics_helper(input, predicate, default_selectivity)?;
let stats = Self::statistics_helper(
input.schema(),
input.statistics()?,
predicate,
default_selectivity,
)?;
let mut eq_properties = input.equivalence_properties().clone();
let (equal_pairs, _) = collect_columns_from_predicate(predicate);
for (lhs, rhs) in equal_pairs {
Expand Down Expand Up @@ -401,13 +406,34 @@ impl ExecutionPlan for FilterExec {
/// predicate's selectivity value can be determined for the incoming data.
fn statistics(&self) -> Result<Statistics> {
    // Selectivity-adjusted estimate from the input's statistics, projected
    // through the optional output projection.
    let stats = Self::statistics_helper(
        self.schema(),
        self.input().statistics()?,
        self.predicate(),
        self.default_selectivity,
    )?;
    Ok(stats.project(self.projection.as_ref()))
}

/// Returns statistics for each output partition.
///
/// `FilterExec` preserves partitioning, so each input partition's
/// statistics are independently passed through the selectivity estimate
/// and then projected.
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
    // Hoisted: `schema()` is loop-invariant; clone the Arc per partition.
    let schema = self.schema();
    let stats = self
        .input
        .statistics_by_partition()?
        .iter()
        .map(|input_stat| {
            let filtered = Self::statistics_helper(
                Arc::clone(&schema),
                input_stat.clone(),
                self.predicate(),
                self.default_selectivity,
            )?;
            Ok(Arc::new(filtered.project(self.projection.as_ref())))
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(PartitionedStatistics::new(stats))
}

fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::LowerEqual
}
Expand Down
26 changes: 26 additions & 0 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::join_equivalence_properties;

use crate::statistics::{compute_summary_statistics, PartitionedStatistics};
use async_trait::async_trait;
use futures::{ready, Stream, StreamExt, TryStreamExt};

Expand Down Expand Up @@ -343,6 +344,31 @@ impl ExecutionPlan for CrossJoinExec {
))
}

/// Returns statistics for each output partition.
///
/// The left side's partitions are collapsed into one summary statistic,
/// which is then crossed with each right-side partition — so output
/// partitioning follows the right input.
fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
    let left_stats = self.left.statistics_by_partition()?;
    let right_stats = self.right.statistics_by_partition()?;

    // If either side reports no partitions there is nothing to describe.
    if left_stats.is_empty() || right_stats.is_empty() {
        return Ok(PartitionedStatistics::new(vec![]));
    }

    // Summarize the `left_stats` across all of its partitions.
    // NOTE(review): presumably valid because the whole left side is visible
    // to every right partition in a cross join — confirm against the
    // execution path.
    let statistics = compute_summary_statistics(
        left_stats.iter(),
        self.left.schema().fields().len(),
        |stats| Some(stats),
    );

    // One output entry per right-side partition: summarized-left × right.
    Ok(PartitionedStatistics::new(
        right_stats
            .iter()
            .map(|right| {
                Arc::new(stats_cartesian_product(statistics.clone(), right.clone()))
            })
            .collect(),
    ))
}

/// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done,
/// it returns the new swapped version having the [`CrossJoinExec`] as the top plan.
/// Otherwise, it returns None.
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ impl ExecutionPlan for HashJoinExec {
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
let stats = estimate_join_statistics(
Arc::clone(&self.left),
Arc::clone(&self.right),
self.left.statistics()?,
self.right.statistics()?,
self.on.clone(),
&self.join_type,
&self.join_schema,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,8 @@ impl ExecutionPlan for NestedLoopJoinExec {

fn statistics(&self) -> Result<Statistics> {
estimate_join_statistics(
Arc::clone(&self.left),
Arc::clone(&self.right),
self.left.statistics()?,
self.right.statistics()?,
vec![],
&self.join_type,
&self.join_schema,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ impl ExecutionPlan for SortMergeJoinExec {
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
estimate_join_statistics(
Arc::clone(&self.left),
Arc::clone(&self.right),
self.left.statistics()?,
self.right.statistics()?,
self.on.clone(),
&self.join_type,
&self.schema,
Expand Down
Loading
Loading