Skip to content

Fix: after repartitioning, the PartitionedFile and FileGroup statistics should be inexact/recomputed #15539

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,12 +1127,12 @@ impl ListingTable {
get_files_with_limit(files, limit, self.options.collect_stat).await?;

let file_groups = file_group.split_files(self.options.target_partitions);
compute_all_files_statistics(
Ok(compute_all_files_statistics(
file_groups,
self.schema(),
self.options.collect_stat,
inexact_stats,
)
))
}

/// Collects statistics for a given partitioned file.
Expand Down
161 changes: 158 additions & 3 deletions datafusion/datasource/src/file_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Logic for managing groups of [`PartitionedFile`]s in DataFusion

use crate::statistics::compute_file_group_statistics;
use crate::{FileRange, PartitionedFile};
use datafusion_common::Statistics;
use itertools::Itertools;
Expand Down Expand Up @@ -199,11 +200,20 @@ impl FileGroupPartitioner {
}

// special case when order must be preserved
if self.preserve_order_within_groups {
let repartitioned_groups = if self.preserve_order_within_groups {
self.repartition_preserving_order(file_groups)
} else {
self.repartition_evenly_by_size(file_groups)
};

let repartitioned_groups = repartitioned_groups?;

// Recompute statistics for each file group
let mut groups = Vec::with_capacity(repartitioned_groups.len());
for file_group in repartitioned_groups {
groups.push(compute_file_group_statistics(file_group, true));
}
Some(groups)
}

/// Evenly repartition files across partitions by size, ignoring any
Expand Down Expand Up @@ -351,8 +361,18 @@ impl FileGroupPartitioner {
if i == last_group {
range_end = file_size as i64;
}
target_group
.push(original_file.clone().with_range(range_start, range_end));
let updated_file =
original_file.clone().with_range(range_start, range_end);
let statistics_option = updated_file
.statistics
.as_ref()
.map(|stat| Arc::new(stat.as_ref().clone().to_inexact()));

if let Some(statistics) = statistics_option {
target_group.push(updated_file.with_statistics(statistics));
} else {
target_group.push(updated_file);
}
range_start = range_end;
range_end += range_size;
}
Expand Down Expand Up @@ -525,6 +545,9 @@ impl Ord for ToRepartition {
#[cfg(test)]
mod test {
use super::*;
use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;
use std::sync::Arc;

/// Empty file won't get partitioned
#[test]
Expand Down Expand Up @@ -941,6 +964,138 @@ mod test {
assert_partitioned_files(expected, actual);
}

#[test]
fn repartition_file_groups_with_statistics() -> datafusion_common::Result<()> {
// Create test files
let mut file1 = pfile("a", 100);
let mut file2 = pfile("b", 50);

// Create statistics for file groups
let stats1 = Statistics {
num_rows: Precision::Exact(1000),
total_byte_size: Precision::Exact(100),
column_statistics: vec![
// Just add column statistics for a couple columns
datafusion_common::ColumnStatistics {
null_count: Precision::Exact(10),
max_value: Precision::Exact(ScalarValue::UInt32(Some(100))),
min_value: Precision::Exact(ScalarValue::UInt32(Some(1))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
],
};

file1 = file1.with_statistics(Arc::new(stats1.clone()));

let stats2 = Statistics {
num_rows: Precision::Exact(500),
total_byte_size: Precision::Exact(50),
column_statistics: vec![
// Just add column statistics for a couple columns
datafusion_common::ColumnStatistics {
null_count: Precision::Exact(5),
max_value: Precision::Exact(ScalarValue::UInt32(Some(200))),
min_value: Precision::Exact(ScalarValue::UInt32(Some(101))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
],
};

file2 = file2.with_statistics(Arc::new(stats2.clone()));

let file_groups = vec![
FileGroup::new(vec![file1]).with_statistics(Arc::new(stats1)),
FileGroup::new(vec![file2]).with_statistics(Arc::new(stats2)),
];

// Verify initial state
assert!(file_groups[0].statistics().is_some());
assert!(file_groups[1].statistics().is_some());

// Repartition files
let repartitioned = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(3)
.with_repartition_file_min_size(10)
.repartition_file_groups(&file_groups)
.unwrap();

// Verify statistics are present and valid
assert_eq!(repartitioned.len(), 3, "Should have 3 partitions");

// Helper function to check statistics are inexact
fn assert_stats_are_inexact(stats: &Statistics) {
assert!(!stats.num_rows.is_exact().unwrap());
assert!(!stats.total_byte_size.is_exact().unwrap());
assert!(!stats.column_statistics[0].max_value.is_exact().unwrap());
}

for group in repartitioned.iter() {
// Check all files have inexact statistics regardless of group
for file in group.files.iter() {
let stats = file.statistics.as_ref().unwrap();
assert_stats_are_inexact(stats);
}

let stats = group.statistics.as_ref().unwrap();
assert_stats_are_inexact(stats);
}

// Check the specific statistics for each group (after repartition, each group only has one file, so we don't need to check the partitioned file statistics)
let expected_group_1_statistics = Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(100),
column_statistics: vec![datafusion_common::ColumnStatistics {
null_count: Precision::Inexact(10),
max_value: Precision::Inexact(ScalarValue::UInt32(Some(100))),
min_value: Precision::Inexact(ScalarValue::UInt32(Some(1))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
}],
};

let expected_group_2_statistics = Statistics {
num_rows: Precision::Inexact(500),
total_byte_size: Precision::Inexact(50),
column_statistics: vec![datafusion_common::ColumnStatistics {
null_count: Precision::Inexact(5),
max_value: Precision::Inexact(ScalarValue::UInt32(Some(200))),
min_value: Precision::Inexact(ScalarValue::UInt32(Some(101))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
}],
};

let expected_group_3_statistics = Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(100),
column_statistics: vec![datafusion_common::ColumnStatistics {
null_count: Precision::Inexact(10),
max_value: Precision::Inexact(ScalarValue::UInt32(Some(100))),
min_value: Precision::Inexact(ScalarValue::UInt32(Some(1))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
}],
};

assert_eq!(
repartitioned[0].statistics.as_ref().unwrap(),
&Arc::new(expected_group_1_statistics)
);
assert_eq!(
repartitioned[1].statistics.as_ref().unwrap(),
&Arc::new(expected_group_2_statistics)
);
assert_eq!(
repartitioned[2].statistics.as_ref().unwrap(),
&Arc::new(expected_group_3_statistics)
);

Ok(())
}

/// Asserts that the two groups of [`PartitionedFile`] are the same
/// (PartitionedFile doesn't implement PartialEq)
fn assert_partitioned_files(
Expand Down
Loading