Fix: after repartitioning, the PartitionedFile and FileGroup statistics should be inexact/recomputed #15539


Closed
wants to merge 7 commits into from

Conversation

xudong963
Member

Which issue does this PR close?

When I wrote tests for #15503, I found the bug.

Rationale for this change

After repartition, the PartitionedFile and FileGroup statistics should be inexact.

Otherwise users may rely on statistics that are still marked exact and use them for optimizations that require exactness. It's dangerous!
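The danger can be sketched with a toy model of statistics precision. This mirrors the idea behind DataFusion's `Precision` type, but it is a standalone illustration, not the real API: once a value describes only part of a split file, it must be demoted to inexact so downstream code cannot treat it as a guarantee.

```rust
// Toy model of exact vs. inexact statistics; the names echo the concept
// in DataFusion's `Precision` type, but this is a standalone sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Precision {
    Exact(u64),
    Inexact(u64),
}

impl Precision {
    // After repartitioning, a file-level row count is no longer a
    // guarantee for any single partition, so demote it.
    fn to_inexact(self) -> Precision {
        match self {
            Precision::Exact(v) => Precision::Inexact(v),
            other => other,
        }
    }

    fn is_exact(&self) -> bool {
        matches!(self, Precision::Exact(_))
    }
}

fn main() {
    let file_rows = Precision::Exact(1_000);
    // Splitting the file into range chunks invalidates exactness.
    let partition_rows = file_rows.to_inexact();
    assert!(!partition_rows.is_exact());
    println!("{:?}", partition_rows); // Inexact(1000)
}
```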

What changes are included in this PR?

Are these changes tested?

Yes, by unit test; the added UT fails on the main branch.

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate datasource Changes to the datasource crate labels Apr 2, 2025
Contributor

@Copilot Copilot AI left a comment


Pull Request Overview

This PR fixes a bug where file statistics remain exact after repartitioning by converting them to inexact statistics. Key changes include:

  • Exposing and refactoring compute_summary_statistics to remove the file_schema parameter and infer columns from the first item.
  • Updating compute_file_group_statistics and compute_all_files_statistics to work with the new compute_summary_statistics signature.
  • Adjusting repartition logic in file_groups.rs to convert individual file and group statistics to inexact.

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

File Description
datafusion/datasource/src/statistics.rs Refactored compute_summary_statistics and updated related UTs to remove the file_schema parameter.
datafusion/datasource/src/file_groups.rs Modified repartitioning logic to convert statistics to inexact when updating files and file groups.
datafusion/core/src/datasource/listing/table.rs Adjusted the call to compute_all_files_statistics to remove the schema argument.
Comments suppressed due to low confidence (3)

datafusion/datasource/src/statistics.rs:413

  • Since compute_summary_statistics is now public, please consider updating its documentation to clarify its intended usage and the assumption of inferring the number of columns from the first item.
pub fn compute_summary_statistics<T, I>(

datafusion/core/src/datasource/listing/table.rs:1134

  • Confirm that removing self.schema() from compute_all_files_statistics is intentional, as this affects the way summary statistics are computed by relying solely on data from the file groups.
compute_all_files_statistics(file_groups, self.options.collect_stat, inexact_stats,)

datafusion/datasource/src/file_groups.rs:379

  • [nitpick] Ensure that updating the group statistics to inexact here properly reflects the aggregation logic; if not, revisiting the computation logic may be necessary.
if let Some(statistics) = target_group.statistics.as_mut() {

@github-actions github-actions bot removed the core Core DataFusion crate label Apr 2, 2025
@xudong963 xudong963 requested a review from alamb April 2, 2025 14:38
@alamb
Contributor

alamb commented Apr 3, 2025

I don't understand why repartitioning would change the statistics of the (overall) output of an ExecutionPlan

Can you provide an example plan / input where repartitioning the output would change the statistics?

I do understand why the per partition statistics would change, but not the overall plan's statistics

@xudong963
Member Author

xudong963 commented Apr 3, 2025

I don't understand why repartitioning would change the statistics of the (overall) output of an ExecutionPlan

Sorry for the confusion. The PR title is inaccurate, as the PR summary says, "after repartition, the PartitionedFile and FileGroup's statistics should be inexact".

the statistics of the (overall) output of an ExecutionPlan won't be changed.

@xudong963 xudong963 changed the title Fix: after repartitioning, statistics should be inexact Fix: after repartitioning, the PartitionedFile and FileGroup statistics should be inexact Apr 3, 2025
@@ -263,7 +264,21 @@ impl FileGroupPartitioner {
.flatten()
.chunk_by(|(partition_idx, _)| *partition_idx)
.into_iter()
.map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec()))
.map(|(_, group)| {
FileGroup::new(
Contributor


Is it possible to recompute the overall FileGroup's statistics rather than just marking them as inexact? If we have per-file statistics, we should be able to recompute each group's statistics.

Contributor


I agree. Calling repartitioned() from DataSource once would make us lose all the information, but you're right, it's also very dangerous right now.

Member Author


Yes, we can. But we need to refactor the compute_file_group_statistics method first.

It's difficult to get file_schema here, and luckily the compute_file_group_statistics method doesn't actually need that argument.

736a029

Contributor


I don't understand the challenging part. We should just track where the file chunks go and write logic like that in compute_summary_statistics() over the same groups.

@github-actions github-actions bot added the core Core DataFusion crate label Apr 7, 2025
@xudong963 xudong963 changed the title Fix: after repartitioning, the PartitionedFile and FileGroup statistics should be inexact Fix: after repartitioning, the PartitionedFile and FileGroup statistics should be inexact/recomputed Apr 8, 2025
@@ -410,23 +410,24 @@ pub async fn get_statistics_with_limit(
}

/// Generic function to compute statistics across multiple items that have statistics
fn compute_summary_statistics<T, I>(
/// If `items` is empty or all items don't have statistics, it returns `None`.
Contributor


I don't understand the reason for the changes here. We already have a Statistics representation for unknown things. Why Option<>?

Member Author


Statistics::unknown() requires the schema (more specifically, the number of fields) to initialize the column statistics, but sometimes we don't have the schema info, so to make the method more general I changed the return value to None for the cases where we can't get the schema.

Fyi, I did the change in 1922c7e#diff-d91fad8ab007c2f14c53e4730c70db0b29818d94c98425e4f44aff01f6950957R414

Contributor


I couldn't see a case where we don't have the schema. If you don't have a schema, how can you even try to compute statistics? I can't imagine that.


Contributor


If we compute the repartitioned stats properly in repartition_preserving_order and repartition_evenly_by_size, don't we avoid needing this recompute at all?

Member Author


I don't understand how we can avoid the recomputation for FileGroup statistics except by making it inexact directly. 🤔

Member Author


If you don't have a schema, how can you even try to compute a statistics, I couldn't imagine also that

The compute_summary_statistics method only summarizes; it doesn't strictly need the schema.

For a caller of compute_summary_statistics, such as compute_file_group_statistics, if compute_summary_statistics returns None, it doesn't need to do anything, because the default value of FileGroup's statistics is None:

pub struct FileGroup {
    /// The files in this group
    files: Vec<PartitionedFile>,
    /// Optional statistics for the data across all files in the group
    statistics: Option<Arc<Statistics>>,
}

It is used as a base method, and its callers have the flexibility to handle its return value as they see fit, without restricting it too much (e.g., requiring a schema).
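The design being argued for here can be sketched as follows. This is a simplified stand-in for compute_summary_statistics with a hypothetical ItemStats type (the real function folds DataFusion Statistics, which track far more than a row count); the point is only the Option-returning contract:

```rust
// Simplified stand-in for the summarizing logic: fold statistics across
// items, returning None when no item carries statistics at all.
// `ItemStats` is hypothetical; the real code works over `Statistics`.
#[derive(Debug, Clone, PartialEq)]
struct ItemStats {
    num_rows: u64,
}

fn summarize<I>(items: I) -> Option<ItemStats>
where
    I: IntoIterator<Item = Option<ItemStats>>,
{
    items
        .into_iter()
        .flatten() // skip items that have no statistics
        .reduce(|acc, s| ItemStats {
            num_rows: acc.num_rows + s.num_rows,
        })
}

fn main() {
    // All items lack statistics -> None, matching FileGroup's default.
    assert_eq!(summarize(vec![None, None]), None);

    let merged = summarize(vec![
        Some(ItemStats { num_rows: 3 }),
        None,
        Some(ItemStats { num_rows: 4 }),
    ]);
    assert_eq!(merged, Some(ItemStats { num_rows: 7 }));
}
```

The caller then decides what None means in its own context, e.g. leaving FileGroup::statistics unset.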

Contributor


My intention was this: when we call repartition_file_groups(), the file groups have exact statistics if the sources provide them. While repartitioning them by preserving order or according to size, we can know which PartitionedFile ends up in which group. For example, let's say FileA has 2 chunks (min1: 0, max1: 10; min2: 15, max2: 20) and FileB has one (min1: 7, max1: 17). During the repartitioning, we can observe how these files are redistributed; say the final state is Partition1: [FileA/chunk1 + FileB] and Partition2: [FileA/chunk2]. Then we know these partitions have these exact statistics:
Partition1: [min: 0, max: 17] and Partition2: [min: 15, max: 20]. We don't need to recompute these.

Am I missing some points?
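The example above can be worked through in code. This is a minimal sketch of the proposed merge, assuming per-chunk (min, max) ranges are available; it is not the DataFusion API, just the interval arithmetic being described:

```rust
// Worked version of the example above: per-chunk (min, max) ranges are
// merged per partition, so the repartitioned statistics can stay exact.
fn merge_ranges(chunks: &[(i64, i64)]) -> Option<(i64, i64)> {
    chunks
        .iter()
        .copied()
        .reduce(|(lo, hi), (min, max)| (lo.min(min), hi.max(max)))
}

fn main() {
    // Partition1: FileA/chunk1 (0..10) + FileB (7..17)
    assert_eq!(merge_ranges(&[(0, 10), (7, 17)]), Some((0, 17)));
    // Partition2: FileA/chunk2 (15..20)
    assert_eq!(merge_ranges(&[(15, 20)]), Some((15, 20)));
}
```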

pub fn compute_file_group_statistics(
file_group: FileGroup,
file_schema: SchemaRef,
mut file_group: FileGroup,
collect_stats: bool,
Contributor


Having this collect_stats argument is strange, isn't it? We could check it on the caller side and simply not call the function when stats aren't wanted.

@@ -519,29 +526,30 @@ pub fn compute_all_files_statistics(
file_schema: SchemaRef,
collect_stats: bool,
inexact_stats: bool,
) -> Result<(Vec<FileGroup>, Statistics)> {
) -> (Vec<FileGroup>, Statistics) {
if !collect_stats {
Contributor


This is the other strange part. compute_summary_statistics() is called from two places: one of them passes true, and the other place is here, but here we also know it cannot be false.

items: I,
file_schema: &SchemaRef,
Contributor


Do the files in compute_summary_statistics() all have the same schema? Why do you say that sometimes we don't have the schema?


@xudong963
Member Author

@berkaysynnada Thanks for your review, I'll continue it this week.

@alamb
Contributor

alamb commented May 7, 2025

Marking as draft as I think this PR is no longer waiting on feedback and I am trying to make it easier to find PRs in need of review. Please mark it as ready for review when it is ready for another look

@alamb alamb marked this pull request as draft May 7, 2025 20:51
@xudong963
Member Author

Let's close this one. I'll open a new PR to solve this, because we have made many changes to how stats are merged.

@xudong963 xudong963 closed this May 8, 2025
Labels
core Core DataFusion crate datasource Changes to the datasource crate