Add statistics_by_partition API to ExecutionPlan #15503


Closed
wants to merge 21 commits

Conversation

xudong963
Member

Which issue does this PR close?

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@xudong963 xudong963 marked this pull request as draft March 31, 2025 11:22
@github-actions github-actions bot added the datasource Changes to the datasource crate label Mar 31, 2025
@berkaysynnada
Contributor

I suggest modifying the existing API, not a new one.

@xudong963
Member Author

> I suggest modifying the existing API, not a new one.

Which one do you think is suitable to modify, statistics()?

@berkaysynnada
Contributor

berkaysynnada commented Apr 1, 2025

> I suggest modifying the existing API, not a new one.
>
> Which one do you think is suitable to modify, statistics()?

yes, is there any blocker?

@xudong963
Member Author

> I suggest modifying the existing API, not a new one.
>
> Which one do you think is suitable to modify, statistics()?
>
> yes, is there any blocker?

I think keeping statistics_by_partition as a separate API from statistics is the better approach for several reasons:

  1. Clear separation of concerns: The two methods serve different purposes - one provides global statistics for the entire execution plan, while the other provides partition-level details.

  2. Backward compatibility: Modifying statistics() to handle both cases would likely be a breaking change.

  3. API clarity: Having separate methods makes the intent clearer when calling the API.

@xudong963 xudong963 force-pushed the statistic_per_partition_api branch from e7aa6fa to 4d18715 Compare April 1, 2025 11:15
@github-actions github-actions bot added the core Core DataFusion crate label Apr 1, 2025
@xudong963 xudong963 force-pushed the statistic_per_partition_api branch from 6531341 to f5f9f2c Compare April 2, 2025 04:33
@xudong963 xudong963 force-pushed the statistic_per_partition_api branch from ac5ae71 to d2112b2 Compare April 3, 2025 07:58
@xudong963 xudong963 changed the title WIP: Add statistics_by_partition API to ExecutionPlan Add statistics_by_partition API to ExecutionPlan Apr 3, 2025
@xudong963 xudong963 marked this pull request as ready for review April 3, 2025 09:07
@xudong963 xudong963 requested a review from alamb April 3, 2025 09:08
@alamb
Contributor

alamb commented Apr 3, 2025

Eventually I agree with @berkaysynnada that it would be nice to have a single API that returns information about both the overall and per-partition statistics. However, I also agree with @xudong963 that there is no clear way to do that at the moment that isn't a major API change.

How about we proceed with a new API and maybe plan some future work to unify them 🤔

Contributor

@alamb alamb left a comment


Thanks @xudong963 -- I think this is a good start. I left some comments on the basic API setup but I think overall this is looking good.

One thing that would be really nice is to find some way to avoid cloning statistics. We already clone a lot of information on each call to ExecutionPlan::statistics() and I worry the new statistics_by_partition API will only make this worse

/// Returns statistics for each partition of this `ExecutionPlan` node.
/// If statistics are not available, returns an array of
/// [`Statistics::new_unknown`] for each partition.
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
Contributor


Can we please make a structure rather than directly using Vec<Statistics> here?

I think doing so will make it easier / less breaking if we want to evolve how these statistics are handled. This was a lesson learned from our work with LexOrdering / EquivalenceProperties.

Something like the following

/// Statistics for each partition
struct PartitionedStatistics {
  inner: Vec<Statistics>
}

impl PartitionedStatistics {
  fn len(&self) -> usize {
   self.inner.len()
  }

  /// return the statistics for the specified partition
  fn statistics(&self, partition: usize) -> &Statistics {
    &self.inner[partition]
  }
}

Member Author


Love it!

@@ -344,6 +345,26 @@ impl ExecutionPlan for CrossJoinExec {
))
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
Contributor


I don't think I saw a test for this code. Maybe I missed it

Member Author


I missed it lol

Member Author


Added in 910454a

Comment on lines 142 to 271
assert_eq!(statistics[0].num_rows, Precision::Exact(4));
assert_eq!(statistics[0].column_statistics.len(), 2);
assert_eq!(statistics[0].total_byte_size, Precision::Exact(220));
assert_eq!(
statistics[0].column_statistics[0].null_count,
Precision::Exact(0)
);
assert_eq!(
statistics[0].column_statistics[0].max_value,
Precision::Exact(ScalarValue::Int32(Some(4)))
);
assert_eq!(
statistics[0].column_statistics[0].min_value,
Precision::Exact(ScalarValue::Int32(Some(1)))
);
Contributor


I found it hard to read here what the expected statistics were

What do you think about a pattern like this (to create the expected statistics)?

```suggestion
        let expected_statistics = Statistics {
          num_rows: Precision::Exact(4),
          total_byte_size: Precision::Exact(220),
          column_statistics: vec![
            ColumnStatistics {...
            }]
          };
        assert_eq!(statistics, expected_statistics);
```

Member Author


Thanks for the suggestion, I believe after 667337b, it'll be clear.

let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, scan)?);
let _full_statistics = filter.statistics()?;
// The full statistics are invalid; at a minimum, we can improve the selectivity estimation of the filter
Contributor


I don't understand this comment. Should we file a ticket to track whatever the expected result is?

Member Author

@xudong963 xudong963 Apr 3, 2025


I think the result of full_statistics can be improved; I'll open an issue to track the further improvement.

}
}

/// If the given value is numerically greater than the original maximum value,
Contributor


This seems somewhat duplicated with Precision::max 🤔

Member Author


Good find. I think we can only keep one. Will open a separate PR to do it.

Member Author


Recorded an issue: #15615

{
for (idx, file_group) in file_config.file_groups.iter().enumerate() {
if let Some(stat) = file_group.statistics() {
statistics[idx] = stat.clone();
Contributor


I am also growing worried about the amount of cloning happening for each Statistics object... they are deep clones at the moment

Member Author


Yes, I have some thoughts.

Contributor


I believe we should fix this as soon as possible, since the Statistics part of the codebase is currently under heavy focus by many people, and in the near future I expect we will have many Statistics-related PRs and developments. So, to avoid introducing an inherent regression, we should bring the infra to a safely extensible state.

Contributor


Could wrapping the Statistics in Arcs be a solution?

Some structs, like FileGroup and PartitionedData, cache the Statistics, so source operators that can access those should return them directly. However, for other intermediate operators, perhaps we can utilize PlanProperties? The Statistics would be initialized once and cached like other planning properties

Member Author


Yes, at least we can use Arc now. I've opened a preparatory PR to wrap the statistics in FileGroup with Arc: #15564

Member Author


> we can utilize PlanProperties? The Statistics will be initiated once and cached like other planning properties

I've had a general look at it and it should work.

Member Author


Recorded an issue: #15614

@xudong963
Member Author

@alamb Sorry for the confusion about the tests, I'll refactor and document them

@xudong963
Member Author

@alamb @berkaysynnada My thoughts on unifying the two methods:

/// Specifies what statistics to compute
pub enum StatisticsType {
    /// Only compute global statistics
    Global,
    /// Only compute per-partition statistics
    Partition,
    /// Compute both global and per-partition statistics
    Both,
}

/// Holds both global and per-partition statistics
pub struct ExecutionPlanStatistics {
    /// Global statistics for the entire plan
    pub global: Option<Statistics>,
    /// Statistics broken down by partition
    pub partition: Option<Vec<Statistics>>,
}

/// Returns statistics for this `ExecutionPlan` node based on the requested type.
/// Only computes what is requested to avoid unnecessary calculations.
fn statistics(&self, stat_type: StatisticsType) -> Result<ExecutionPlanStatistics> {
    match stat_type {
        StatisticsType::Global => Ok(ExecutionPlanStatistics {
            global: Some(Statistics::new_unknown(&self.schema())),
            partition: None,
        }),
        StatisticsType::Partition => {
            let partition_stats = vec![
                Statistics::new_unknown(&self.schema());
                self.properties().partitioning.partition_count()
            ];
            
            Ok(ExecutionPlanStatistics {
                global: None,
                partition: Some(partition_stats),
            })
        },
        StatisticsType::Both => {
            let partition_stats = vec![
                Statistics::new_unknown(&self.schema());
                self.properties().partitioning.partition_count()
            ];
            
            // Could merge partition stats here for global stats if needed
            let global_stats = Statistics::new_unknown(&self.schema());
            
            Ok(ExecutionPlanStatistics {
                global: Some(global_stats),
                partition: Some(partition_stats),
            })
        }
    }
}

@xudong963
Member Author

Also, cc @suremarc to join the discussion

Contributor

@suremarc suremarc left a comment


The statistics_by_partition impls look mostly correct to me with a couple of exceptions, and the tests look good 👍 I added a comment about a potential improvement there.

On the topic of the proposed API for a new ExecutionPlan::statistics() signature & method:

fn statistics(&self, stat_type: StatisticsType) -> Result<ExecutionPlanStatistics>

I'm going to take it for granted that computing and cloning lots of statistics is expensive, as seems to be implied, though if anyone could point me to prior discussion on that issue that would be appreciated.

I see that this avoids computing partition-level statistics if it doesn't need to, which is nice, but I don't love it, mainly because someone could specify StatisticsType::Global or StatisticsType::Partition, and then send the ExecutionPlanStatistics to downstream code that is expecting both types, or the wrong type. Then that code will just fail. So you might just end up always specifying StatisticsType::Both, which is inefficient.

IMO the nicest API for the user that still avoids unnecessary computation would be something like this:

// Lazily compute statistics on-demand
// Could potentially cache results
// Each `ExecutionPlan` puts the logic for computing its statistics inside an implementation of `ExecutionPlanStatistics`.
pub trait ExecutionPlanStatistics {
    fn global(&self) -> Result<Statistics>;
    fn by_partition(&self) -> Result<PartitionStatistics>;
}

pub trait ExecutionPlan {
    // ...
    fn statistics(&self) -> Result<&dyn ExecutionPlanStatistics>;
}

Though I will admit it puts more burden on the implementor, and it might be a new pattern in the codebase.

I feel like I am discussing hypotheticals so honestly I am not married to either approach, but just thought I'd offer my 2 cents since @xudong963 asked 😅 Anyway this probably warrants further discussion on an issue.

Comment on lines 198 to 221
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
Ok(vec![self.statistics()?])
}
Contributor


Seeing as CoalesceBatchesExec doesn't change the partitioning of its input, I think this is the incorrect number of partitions? It should be repeated N times, once for each partition.

Member Author


Good find. I was misled by the coalesce : )

Comment on lines 434 to 437
Ok(vec![
Statistics::new_unknown(&self.schema());
self.properties().partitioning.partition_count()
])
Contributor


Previously I had considered repeating the global statistics for each partition, but I am not sure if this is correct. At a minimum we would need to relax any exact statistics, and I guess we would also need to consider how the node's partitioning could affect how data is distributed.

I guess it is safe no matter what to return unknown statistics and there aren't any real benefits yet to trying to implement a better default. Curious if you have any thoughts to add.

Member Author


I noticed the original default implementation from you; it's a bit risky if a node doesn't have a specific implementation for the API, and I also think most nodes' partition statistics don't follow "repeating the global statistics for each partition". So for safety, I changed it to unknown statistics.

Comment on lines 112 to 207
async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()>
{
let scan = generate_listing_table_with_statistics(Some(2)).await;
let statistics = scan.statistics_by_partition()?;
let expected_statistic_partition_1 =
create_partition_statistics(2, 110, 3, 4, true);
let expected_statistic_partition_2 =
create_partition_statistics(2, 110, 1, 2, true);
// Check the statistics of each partition
assert_eq!(statistics.len(), 2);
assert_eq!(statistics[0], expected_statistic_partition_1);
assert_eq!(statistics[1], expected_statistic_partition_2);
Ok(())
}
Contributor


What do you think about adding a function that runs all partitions of an ExecutionPlan (using execute_stream_partitioned) and checks if the min/max/etc. of each partition actually is consistent with the statistics_by_partition? It would be a useful way to check that the implementation matches what statistics_by_partition predicts, and I think we should be able to write a single function to check this in all cases here.

Member Author


The idea makes a lot of sense, thank you!

Member Author


Done in d94c149, now the tests look very promising!

Contributor

@berkaysynnada berkaysynnada left a comment


Thank you @xudong963 for this big effort. I have some suggestions, and I'm sure we will find the best design after a few iterations

/// Returns statistics for each partition of this `ExecutionPlan` node.
/// If statistics are not available, returns an array of
/// [`Statistics::new_unknown`] for each partition.
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
Contributor


I'm still on the side of unifying these two APIs. Maybe you have a better proposal, but this is what I have in mind now:

If we don't prefer dealing with new enums or structs for Statistics, we can modify the API like so:

fn statistics(&self, partition: Option<usize>) -> Result<Statistics>

This would give us the option of not implementing it for all operators, express clearly what we have, and bring everything closer together.

If we prefer using the same API but enriching the Statistics definition, then:

  1. Rename Statistics to TableStatistics in the statistics() API.
struct TableStatistics {
    first_partition: Statistics,
    others: Vec<Statistics>,
}

This option requires some convenience methods for TableStatistics, as you can imagine

Contributor


Another alternative is completely separating table statistics from partition statistics, if their methods are quite distinct and not often used together

@xudong963
Member Author

Thanks for your thoughts about unifying the two APIs @berkaysynnada @suremarc.

As @alamb said, how about we proceed with a new API and maybe plan some future work to unify them?

To be honest, I don't have a very strong preference among these thoughts. I want to open a new issue to discuss how to unify them. Currently, we have three different proposals, and I hope we can attract more users to participate in the discussion. Then we can make the final decision from the combined perspective of developers and users.

I believe once we decide on the unified way, the actual unification work will be easy: just move the current separate API code into the new one, and update the API calls in the tests.

What do you think?

@berkaysynnada
Contributor

berkaysynnada commented Apr 8, 2025

> What do you think?

I still think we should unify the APIs. We've discussed the issue with our team and reached a common opinion:

If I give an example of my concerns, there will be many duplications like this:

    fn statistics(&self) -> Result<Statistics> {
        let stats = Self::statistics_helper(
            self.schema(),
            self.input().statistics()?,
            self.predicate(),
            self.default_selectivity,
        )?;
        Ok(stats.project(self.projection.as_ref()))
    }

    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
        let input_stats = self.input.statistics_by_partition()?;

        let stats: Result<Vec<Arc<Statistics>>> = input_stats
            .iter()
            .map(|stat| {
                let stat = Self::statistics_helper(
                    self.schema(),
                    stat.clone(),
                    self.predicate(),
                    self.default_selectivity,
                )
                .map(|stat| stat.project(self.projection.as_ref()))?;
                Ok(Arc::new(stat))
            })
            .collect();

        Ok(PartitionedStatistics::new(stats?))
    }

There are/will be duplications of the statistics() logic in each operator like this, because the calculations are the same whether the stats come from the whole table or from just one partition. We can avoid the duplication and write efficient, functional statistics() implementations if we adopt the

fn statistics(&self, partition: Option<usize>) -> Result<Statistics>

style. So that alternative clearly wins for me against the others. It also does not modify other existing structs/APIs, and it proposes an extensible way of enabling statistics access for any partition.

TL;DR: updating the API to fn statistics(&self, partition: Option<usize>) -> Result<Statistics> is a minimal change, doesn't force us down an immature design path, reduces duplication, and enables partition-based stats access, which is the main goal.

@xudong963
Member Author

> fn statistics(&self, partition: Option<usize>) -> Result<Statistics>

Thanks @berkaysynnada! I'm a little confused about the API; the original statistics_by_partition collects all partitions' statistics. IIUC, the new statistics API works like this:

fn statistics(&self, partition: Option<usize>) -> Result<Statistics> {
    match partition {
        Some(idx) => {
            // Validate partition index
            if idx >= self.properties().partitioning.partition_count() {
                return exec_err!("Invalid partition index: {}", idx);
            }
            // Default implementation: return unknown statistics for the specific partition
            Ok(Statistics::new_unknown(&self.schema()))
        }
        None => {
            // Return statistics for the entire plan (existing behavior)
            Ok(Statistics::new_unknown(&self.schema()))
        }
    }
}

How does it return all partitions' statistics?

@alamb
Contributor

alamb commented Apr 8, 2025

> How does it return all partitions' statistics?

I think the idea is something like

// get statistics for the entire plan
let overall_statistics = plan.statistics(None);

// get only statistics for partition 3
let partition_statistics = plan.statistics(Some(3));

@xudong963
Member Author

xudong963 commented Apr 19, 2025

I may start a new branch based on this branch to experiment with @berkaysynnada's suggestion and see if there are any challenges; then we can decide the next direction. /cc @alamb @suremarc @wiedld (Hope we can get the optimized SPM across the finish line and include it in DF48 🚀)

@berkaysynnada
Contributor

> I may start a new branch based on this branch to experiment with @berkaysynnada's suggestion and see if there are any challenges; then we can decide the next direction. /cc @alamb @suremarc @wiedld (Hope we can get the optimized SPM across the finish line and include it in DF48 🚀)

Sounds good @xudong963. I'm looking forward to seeing your findings

@xudong963 xudong963 force-pushed the statistic_per_partition_api branch from e122df0 to 0ba5ef5 Compare April 25, 2025 05:58
@alamb
Contributor

alamb commented Apr 29, 2025

I believe this is superseded by #15852 so marking as a draft

@alamb alamb marked this pull request as draft April 29, 2025 01:35
@xudong963 xudong963 closed this Apr 29, 2025
Labels
core Core DataFusion crate datasource Changes to the datasource crate optimizer Optimizer rules
Successfully merging this pull request may close these issues.

Add statistics_by_partition API to ExecutionPlan
5 participants