Skip to content

Commit d94c149

Browse files
committed
Check the statistics_by_partition with real results
1 parent 6ee5efe commit d94c149

File tree

1 file changed

+177
-14
lines changed

1 file changed

+177
-14
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 177 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,30 @@
1717

1818
#[cfg(test)]
1919
mod test {
20+
use arrow::array::{Int32Array, RecordBatch};
2021
use arrow_schema::{DataType, Field, Schema, SortOptions};
2122
use datafusion::datasource::listing::ListingTable;
2223
use datafusion::prelude::SessionContext;
2324
use datafusion_catalog::TableProvider;
2425
use datafusion_common::stats::Precision;
26+
use datafusion_common::Result;
2527
use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
2628
use datafusion_execution::config::SessionConfig;
29+
use datafusion_execution::TaskContext;
2730
use datafusion_expr_common::operator::Operator;
2831
use datafusion_physical_expr::expressions::{binary, lit, Column};
2932
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3033
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
34+
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
35+
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
3136
use datafusion_physical_plan::filter::FilterExec;
3237
use datafusion_physical_plan::joins::CrossJoinExec;
38+
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
3339
use datafusion_physical_plan::projection::ProjectionExec;
3440
use datafusion_physical_plan::sorts::sort::SortExec;
3541
use datafusion_physical_plan::union::UnionExec;
36-
use datafusion_physical_plan::ExecutionPlan;
42+
use datafusion_physical_plan::{execute_stream_partitioned, ExecutionPlan};
43+
use futures::TryStreamExt;
3744
use std::sync::Arc;
3845

3946
/// Creates a test table with statistics from the test data directory.
@@ -121,9 +128,63 @@ mod test {
121128
}
122129
}
123130

131+
/// Helper function to validate that statistics from statistics_by_partition match the actual data
132+
async fn validate_statistics_with_data(
133+
plan: Arc<dyn ExecutionPlan>,
134+
expected_stats: Vec<(i32, i32, usize)>, // (min_id, max_id, row_count)
135+
id_column_index: usize,
136+
) -> Result<()> {
137+
let ctx = TaskContext::default();
138+
let partitions = execute_stream_partitioned(plan, Arc::new(ctx))?;
139+
140+
let mut actual_stats = Vec::new();
141+
for partition_stream in partitions.into_iter() {
142+
let result: Vec<RecordBatch> = partition_stream.try_collect().await?;
143+
144+
let mut min_id = i32::MAX;
145+
let mut max_id = i32::MIN;
146+
let mut row_count = 0;
147+
148+
for batch in result {
149+
if batch.num_columns() > id_column_index {
150+
let id_array = batch
151+
.column(id_column_index)
152+
.as_any()
153+
.downcast_ref::<Int32Array>()
154+
.unwrap();
155+
for i in 0..batch.num_rows() {
156+
let id_value = id_array.value(i);
157+
min_id = min_id.min(id_value);
158+
max_id = max_id.max(id_value);
159+
row_count += 1;
160+
}
161+
}
162+
}
163+
164+
if row_count > 0 {
165+
actual_stats.push((min_id, max_id, row_count));
166+
}
167+
}
168+
169+
// Compare actual data with expected statistics
170+
assert_eq!(
171+
actual_stats.len(),
172+
expected_stats.len(),
173+
"Number of partitions with data doesn't match expected"
174+
);
175+
for i in 0..actual_stats.len() {
176+
assert_eq!(
177+
actual_stats[i], expected_stats[i],
178+
"Partition {} data doesn't match statistics",
179+
i
180+
);
181+
}
182+
183+
Ok(())
184+
}
185+
124186
#[tokio::test]
125-
async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()>
126-
{
187+
async fn test_statistics_by_partition_of_data_source() -> Result<()> {
127188
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
128189
let statistics = scan.statistics_by_partition()?;
129190
let expected_statistic_partition_1 =
@@ -134,12 +195,19 @@ mod test {
134195
assert_eq!(statistics.len(), 2);
135196
assert_eq!(statistics[0], expected_statistic_partition_1);
136197
assert_eq!(statistics[1], expected_statistic_partition_2);
198+
199+
// Check the statistics_by_partition with real results
200+
let expected_stats = vec![
201+
(3, 4, 2), // (min_id, max_id, row_count) for first partition
202+
(1, 2, 2), // (min_id, max_id, row_count) for second partition
203+
];
204+
validate_statistics_with_data(scan, expected_stats, 0).await?;
205+
137206
Ok(())
138207
}
139208

140209
#[tokio::test]
141-
async fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()>
142-
{
210+
async fn test_statistics_by_partition_of_projection() -> Result<()> {
143211
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
144212
// Add projection execution plan
145213
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
@@ -154,12 +222,16 @@ mod test {
154222
assert_eq!(statistics.len(), 2);
155223
assert_eq!(statistics[0], expected_statistic_partition_1);
156224
assert_eq!(statistics[1], expected_statistic_partition_2);
225+
226+
// Check the statistics_by_partition with real results
227+
let expected_stats = vec![(3, 4, 2), (1, 2, 2)];
228+
validate_statistics_with_data(Arc::new(projection), expected_stats, 0).await?;
157229
Ok(())
158230
}
159231

160232
#[tokio::test]
161-
async fn test_statistics_by_partition_of_sort() -> datafusion_common::Result<()> {
162-
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
233+
async fn test_statistics_by_partition_of_sort() -> Result<()> {
234+
let scan_1 = create_scan_exec_with_statistics(None, Some(1)).await;
163235
// Add sort execution plan
164236
let sort = SortExec::new(
165237
LexOrdering::new(vec![PhysicalSortExpr {
@@ -169,16 +241,34 @@ mod test {
169241
nulls_first: false,
170242
},
171243
}]),
172-
scan,
244+
scan_1,
173245
);
174-
let mut sort_exec = Arc::new(sort.clone());
246+
let sort_exec = Arc::new(sort.clone());
175247
let statistics = sort_exec.statistics_by_partition()?;
176248
let expected_statistic_partition =
177249
create_partition_statistics(4, 220, 1, 4, true);
178250
assert_eq!(statistics.len(), 1);
179251
assert_eq!(statistics[0], expected_statistic_partition);
252+
// Check the statistics_by_partition with real results
253+
let expected_stats = vec![(1, 4, 4)];
254+
validate_statistics_with_data(sort_exec.clone(), expected_stats, 0).await?;
180255

181-
sort_exec = Arc::new(sort.with_preserve_partitioning(true));
256+
// Sort with preserve_partitioning
257+
let scan_2 = create_scan_exec_with_statistics(None, Some(2)).await;
258+
// Add sort execution plan
259+
let sort_exec = Arc::new(
260+
SortExec::new(
261+
LexOrdering::new(vec![PhysicalSortExpr {
262+
expr: Arc::new(Column::new("id", 0)),
263+
options: SortOptions {
264+
descending: false,
265+
nulls_first: false,
266+
},
267+
}]),
268+
scan_2,
269+
)
270+
.with_preserve_partitioning(true),
271+
);
182272
let expected_statistic_partition_1 =
183273
create_partition_statistics(2, 110, 3, 4, true);
184274
let expected_statistic_partition_2 =
@@ -187,11 +277,15 @@ mod test {
187277
assert_eq!(statistics.len(), 2);
188278
assert_eq!(statistics[0], expected_statistic_partition_1);
189279
assert_eq!(statistics[1], expected_statistic_partition_2);
280+
281+
// Check the statistics_by_partition with real results
282+
let expected_stats = vec![(3, 4, 2), (1, 2, 2)];
283+
validate_statistics_with_data(sort_exec, expected_stats, 0).await?;
190284
Ok(())
191285
}
192286

193287
#[tokio::test]
194-
async fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()> {
288+
async fn test_statistics_by_partition_of_filter() -> Result<()> {
195289
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
196290
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
197291
let predicate = binary(
@@ -233,7 +327,7 @@ mod test {
233327
}
234328

235329
#[tokio::test]
236-
async fn test_statistic_by_partition_of_union() -> datafusion_common::Result<()> {
330+
async fn test_statistic_by_partition_of_union() -> Result<()> {
237331
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
238332
let union_exec = Arc::new(UnionExec::new(vec![scan.clone(), scan]));
239333
let statistics = union_exec.statistics_by_partition()?;
@@ -252,12 +346,14 @@ mod test {
252346
// Verify fourth partition (from second scan - same as second partition)
253347
assert_eq!(statistics[3], expected_statistic_partition_2);
254348

349+
// Check the statistics_by_partition with real results
350+
let expected_stats = vec![(3, 4, 2), (1, 2, 2), (3, 4, 2), (1, 2, 2)];
351+
validate_statistics_with_data(union_exec, expected_stats, 0).await?;
255352
Ok(())
256353
}
257354

258355
#[tokio::test]
259-
async fn test_statistic_by_partition_of_cross_join() -> datafusion_common::Result<()>
260-
{
356+
async fn test_statistic_by_partition_of_cross_join() -> Result<()> {
261357
let left_scan = create_scan_exec_with_statistics(None, Some(2)).await;
262358
let right_create_table_sql = "CREATE EXTERNAL TABLE t2 (id INT NOT NULL) \
263359
STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition'\
@@ -292,6 +388,73 @@ mod test {
292388
});
293389
assert_eq!(statistics[0], expected_statistic_partition_1);
294390
assert_eq!(statistics[1], expected_statistic_partition_2);
391+
392+
// Check the statistics_by_partition with real results
393+
let expected_stats = vec![(1, 4, 8), (1, 4, 8)];
394+
validate_statistics_with_data(Arc::new(cross_join), expected_stats, 0).await?;
395+
Ok(())
396+
}
397+
398+
#[tokio::test]
399+
async fn test_statistic_by_partition_of_coalesce_batches() -> Result<()> {
400+
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
401+
let coalesce_batches = CoalesceBatchesExec::new(scan, 2);
402+
let expected_statistic_partition_1 =
403+
create_partition_statistics(2, 110, 3, 4, true);
404+
let expected_statistic_partition_2 =
405+
create_partition_statistics(2, 110, 1, 2, true);
406+
let statistics = coalesce_batches.statistics_by_partition()?;
407+
assert_eq!(statistics.len(), 2);
408+
assert_eq!(statistics[0], expected_statistic_partition_1);
409+
assert_eq!(statistics[1], expected_statistic_partition_2);
410+
411+
// Check the statistics_by_partition with real results
412+
let expected_stats = vec![(3, 4, 2), (1, 2, 2)];
413+
validate_statistics_with_data(Arc::new(coalesce_batches), expected_stats, 0)
414+
.await?;
415+
Ok(())
416+
}
417+
418+
#[tokio::test]
419+
async fn test_statistic_by_partition_of_coalesce_partitions() -> Result<()> {
420+
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
421+
let coalesce_partitions = CoalescePartitionsExec::new(scan);
422+
let expected_statistic_partition =
423+
create_partition_statistics(4, 220, 1, 4, true);
424+
let statistics = coalesce_partitions.statistics_by_partition()?;
425+
assert_eq!(statistics.len(), 1);
426+
assert_eq!(statistics[0], expected_statistic_partition);
427+
428+
// Check the statistics_by_partition with real results
429+
let expected_stats = vec![(1, 4, 4)];
430+
validate_statistics_with_data(Arc::new(coalesce_partitions), expected_stats, 0)
431+
.await?;
432+
Ok(())
433+
}
434+
435+
#[tokio::test]
436+
async fn test_statistic_by_partition_of_local_limit() -> Result<()> {
437+
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
438+
let local_limit = LocalLimitExec::new(scan.clone(), 1);
439+
let statistics = local_limit.statistics_by_partition()?;
440+
assert_eq!(statistics.len(), 2);
441+
let schema = scan.schema();
442+
let mut expected_statistic_partition = Statistics::new_unknown(&schema);
443+
expected_statistic_partition.num_rows = Precision::Exact(1);
444+
assert_eq!(statistics[0], expected_statistic_partition);
445+
assert_eq!(statistics[1], expected_statistic_partition);
446+
Ok(())
447+
}
448+
449+
#[tokio::test]
450+
async fn test_statistic_by_partition_of_global_limit_partitions() -> Result<()> {
451+
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
452+
let global_limit = GlobalLimitExec::new(scan.clone(), 0, Some(2));
453+
let statistics = global_limit.statistics_by_partition()?;
454+
assert_eq!(statistics.len(), 1);
455+
let mut expected_statistic_partition = Statistics::new_unknown(&scan.schema());
456+
expected_statistic_partition.num_rows = Precision::Exact(2);
457+
assert_eq!(statistics[0], expected_statistic_partition);
295458
Ok(())
296459
}
297460
}

0 commit comments

Comments
 (0)