Skip to content

Commit ac5ae71

Browse files
committed
add tests
1 parent 979056c commit ac5ae71

File tree

7 files changed

+278
-69
lines changed

7 files changed

+278
-69
lines changed
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

datafusion/core/src/datasource/listing/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -713,7 +713,7 @@ impl ListingOptions {
713713
/// # Ok(())
714714
/// # }
715715
/// ```
716-
#[derive(Debug)]
716+
#[derive(Debug, Clone)]
717717
pub struct ListingTable {
718718
table_paths: Vec<ListingTableUrl>,
719719
/// File fields only

datafusion/core/tests/physical_optimizer/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ mod enforce_sorting;
2424
mod join_selection;
2525
mod limit_pushdown;
2626
mod limited_distinct_aggregation;
27+
mod partition_statistics;
2728
mod projection_pushdown;
2829
mod replace_with_order_preserving_variants;
2930
mod sanity_checker;
3031
mod test_utils;
31-
mod partition_statistics;

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 276 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -17,99 +17,308 @@
1717

1818
#[cfg(test)]
1919
mod test {
20-
use std::sync::Arc;
21-
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
22-
use datafusion::execution::SessionStateBuilder;
20+
use arrow_schema::{DataType, Field, Schema, SortOptions};
21+
use datafusion::datasource::listing::ListingTable;
2322
use datafusion::prelude::SessionContext;
2423
use datafusion_catalog::TableProvider;
25-
use datafusion_common::config::ConfigOptions;
26-
use datafusion_datasource::ListingTableUrl;
27-
use datafusion_datasource::source::DataSourceExec;
28-
use datafusion_datasource_parquet::ParquetFormat;
24+
use datafusion_common::stats::Precision;
25+
use datafusion_common::{ScalarValue, Statistics};
2926
use datafusion_execution::config::SessionConfig;
30-
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
27+
use datafusion_expr_common::operator::Operator;
28+
use datafusion_physical_expr::expressions::{binary, lit, Column};
29+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
30+
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
31+
use datafusion_physical_plan::filter::FilterExec;
32+
use datafusion_physical_plan::projection::ProjectionExec;
33+
use datafusion_physical_plan::sorts::sort::SortExec;
34+
use datafusion_physical_plan::union::UnionExec;
3135
use datafusion_physical_plan::ExecutionPlan;
36+
use std::sync::Arc;
3237

33-
async fn generate_listing_table_with_statistics() -> Arc<dyn ExecutionPlan> {
34-
let testdata = datafusion::test_util::parquet_test_data();
35-
let filename = format!("{}/{}", testdata, "alltypes_tiny_pages.parquet");
36-
let table_path = ListingTableUrl::parse(filename).unwrap();
37-
let opt = ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);
38-
let rt = RuntimeEnvBuilder::new()
39-
.build_arc()
40-
.expect("could not build runtime environment");
41-
42-
let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state();
43-
let schema = opt
44-
.infer_schema(
45-
&SessionStateBuilder::new().with_default_features().build(),
46-
&table_path,
47-
)
38+
async fn generate_listing_table_with_statistics(
39+
target_partition: Option<usize>,
40+
) -> Arc<dyn ExecutionPlan> {
41+
// Delete the existing data directory if it exists
42+
let data_dir = "./data/";
43+
let _ = std::fs::remove_dir_all(data_dir);
44+
let mut session_config = SessionConfig::new().with_collect_statistics(true);
45+
if let Some(partition) = target_partition {
46+
session_config = session_config.with_target_partitions(partition);
47+
}
48+
let ctx = SessionContext::new_with_config(session_config);
49+
// Create table with partition
50+
let create_table_sql = "CREATE EXTERNAL TABLE t1 (id INT not null, date DATE) STORED AS PARQUET LOCATION './data/' PARTITIONED BY (date) WITH ORDER (id ASC);";
51+
ctx.sql(create_table_sql)
52+
.await
53+
.unwrap()
54+
.collect()
4855
.await
4956
.unwrap();
50-
let config = ListingTableConfig::new(table_path.clone())
51-
.with_listing_options(opt.clone())
52-
.with_schema(schema);
53-
let table = ListingTable::try_new(config).unwrap();
54-
let res= table.scan(&state, None, &[], None).await.unwrap();
55-
dbg!(&res.statistics().unwrap());
56-
dbg!(&res.statistics_by_partition().unwrap());
57-
let mut config = ConfigOptions::new();
58-
config.set("datafusion.optimizer.repartition_file_min_size", "10").unwrap();
59-
let res = res.repartitioned(5, &config).unwrap().unwrap();
60-
dbg!(&res.statistics_by_partition().unwrap());
61-
res
57+
// Insert data into the table, will generate partition files with parquet format
58+
let insert_data = "INSERT INTO t1 VALUES (4, '2025-03-01'), (3, '2025-3-02'), (2, '2025-03-03'), (1, '2025-03-04');";
59+
ctx.sql(insert_data).await.unwrap().collect().await.unwrap();
60+
let table = ctx.table_provider("t1").await.unwrap();
61+
let listing_table = table
62+
.as_any()
63+
.downcast_ref::<ListingTable>()
64+
.unwrap()
65+
.clone();
66+
listing_table
67+
.scan(&ctx.state(), None, &[], None)
68+
.await
69+
.unwrap()
6270
}
6371

64-
#[tokio::test]
65-
async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()> {
66-
generate_listing_table_with_statistics().await;
67-
Ok(())
68-
}
72+
fn check_unchanged_statistics(statistics: Vec<Statistics>) {
73+
// Check the statistics of each partition
74+
for stat in &statistics {
75+
assert_eq!(stat.num_rows, Precision::Exact(1));
76+
// First column (id) should have non-null values
77+
assert_eq!(stat.column_statistics[0].null_count, Precision::Exact(0));
78+
}
6979

70-
#[test]
71-
fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()> {
72-
Ok(())
80+
// Verify specific id values for each partition
81+
assert_eq!(
82+
statistics[0].column_statistics[0].max_value,
83+
Precision::Exact(ScalarValue::Int32(Some(4)))
84+
);
85+
assert_eq!(
86+
statistics[1].column_statistics[0].max_value,
87+
Precision::Exact(ScalarValue::Int32(Some(3)))
88+
);
89+
assert_eq!(
90+
statistics[2].column_statistics[0].max_value,
91+
Precision::Exact(ScalarValue::Int32(Some(2)))
92+
);
93+
assert_eq!(
94+
statistics[3].column_statistics[0].max_value,
95+
Precision::Exact(ScalarValue::Int32(Some(1)))
96+
);
7397
}
7498

75-
#[test]
76-
fn test_statistics_by_partition_of_sort() -> datafusion_common::Result<()> {
99+
#[tokio::test]
100+
async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()>
101+
{
102+
let scan = generate_listing_table_with_statistics(None).await;
103+
let statistics = scan.statistics_by_partition()?;
104+
// Check the statistics of each partition
105+
assert_eq!(statistics.len(), 4);
106+
for stat in &statistics {
107+
assert_eq!(stat.column_statistics.len(), 2);
108+
assert_eq!(stat.total_byte_size, Precision::Exact(55));
109+
}
110+
check_unchanged_statistics(statistics);
77111
Ok(())
78112
}
79113

80-
#[test]
81-
fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()> {
114+
#[tokio::test]
115+
async fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()>
116+
{
117+
let scan = generate_listing_table_with_statistics(None).await;
118+
// Add projection execution plan
119+
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
120+
vec![(Arc::new(Column::new("id", 0)), "id".to_string())];
121+
let projection = ProjectionExec::try_new(exprs, scan)?;
122+
let statistics = projection.statistics_by_partition()?;
123+
for stat in &statistics {
124+
assert_eq!(stat.column_statistics.len(), 1);
125+
assert_eq!(stat.total_byte_size, Precision::Exact(4));
126+
}
127+
check_unchanged_statistics(statistics);
82128
Ok(())
83129
}
84130

85-
#[test]
86-
fn test_statistics_by_partition_of_aggregate() -> datafusion_common::Result<()> {
131+
#[tokio::test]
132+
async fn test_statistics_by_partition_of_sort() -> datafusion_common::Result<()> {
133+
let scan = generate_listing_table_with_statistics(Some(2)).await;
134+
// Add sort execution plan
135+
let sort = SortExec::new(
136+
LexOrdering::new(vec![PhysicalSortExpr {
137+
expr: Arc::new(Column::new("id", 0)),
138+
options: SortOptions {
139+
descending: false,
140+
nulls_first: false,
141+
},
142+
}]),
143+
scan,
144+
);
145+
let mut sort_exec = Arc::new(sort.clone());
146+
let statistics = sort_exec.statistics_by_partition()?;
147+
assert_eq!(statistics.len(), 1);
148+
assert_eq!(statistics[0].num_rows, Precision::Exact(4));
149+
assert_eq!(statistics[0].column_statistics.len(), 2);
150+
assert_eq!(statistics[0].total_byte_size, Precision::Exact(220));
151+
assert_eq!(
152+
statistics[0].column_statistics[0].null_count,
153+
Precision::Exact(0)
154+
);
155+
assert_eq!(
156+
statistics[0].column_statistics[0].max_value,
157+
Precision::Exact(ScalarValue::Int32(Some(4)))
158+
);
159+
assert_eq!(
160+
statistics[0].column_statistics[0].min_value,
161+
Precision::Exact(ScalarValue::Int32(Some(1)))
162+
);
163+
sort_exec = Arc::new(sort.with_preserve_partitioning(true));
164+
let statistics = sort_exec.statistics_by_partition()?;
165+
dbg!(&statistics);
166+
assert_eq!(statistics.len(), 2);
167+
assert_eq!(statistics[0].num_rows, Precision::Exact(2));
168+
assert_eq!(statistics[1].num_rows, Precision::Exact(2));
169+
assert_eq!(statistics[0].column_statistics.len(), 2);
170+
assert_eq!(statistics[1].column_statistics.len(), 2);
171+
assert_eq!(statistics[0].total_byte_size, Precision::Exact(110));
172+
assert_eq!(statistics[1].total_byte_size, Precision::Exact(110));
173+
assert_eq!(
174+
statistics[0].column_statistics[0].null_count,
175+
Precision::Exact(0)
176+
);
177+
assert_eq!(
178+
statistics[0].column_statistics[0].max_value,
179+
Precision::Exact(ScalarValue::Int32(Some(4)))
180+
);
181+
assert_eq!(
182+
statistics[0].column_statistics[0].min_value,
183+
Precision::Exact(ScalarValue::Int32(Some(3)))
184+
);
185+
assert_eq!(
186+
statistics[1].column_statistics[0].null_count,
187+
Precision::Exact(0)
188+
);
189+
assert_eq!(
190+
statistics[1].column_statistics[0].max_value,
191+
Precision::Exact(ScalarValue::Int32(Some(2)))
192+
);
193+
assert_eq!(
194+
statistics[1].column_statistics[0].min_value,
195+
Precision::Exact(ScalarValue::Int32(Some(1)))
196+
);
87197
Ok(())
88198
}
89199

90-
#[test]
91-
fn test_statistic_by_partition_of_cross_join() -> datafusion_common::Result<()> {
200+
#[tokio::test]
201+
async fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()> {
202+
let scan = generate_listing_table_with_statistics(None).await;
203+
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
204+
let predicate = binary(
205+
Arc::new(Column::new("id", 0)),
206+
Operator::Lt,
207+
lit(1i32),
208+
&schema,
209+
)?;
210+
let filter: Arc<dyn ExecutionPlan> =
211+
Arc::new(FilterExec::try_new(predicate, scan)?);
212+
let _full_statistics = filter.statistics()?;
213+
// The full statistics is invalid, at least, we can improve the selectivity estimation of the filter
214+
/*
215+
Statistics {
216+
num_rows: Inexact(0),
217+
total_byte_size: Inexact(0),
218+
column_statistics: [
219+
ColumnStatistics {
220+
null_count: Exact(0),
221+
max_value: Exact(NULL),
222+
min_value: Exact(NULL),
223+
sum_value: Exact(NULL),
224+
distinct_count: Exact(0),
225+
},
226+
ColumnStatistics {
227+
null_count: Exact(0),
228+
max_value: Exact(NULL),
229+
min_value: Exact(NULL),
230+
sum_value: Exact(NULL),
231+
distinct_count: Exact(0),
232+
},
233+
],
234+
}
235+
*/
236+
let statistics = filter.statistics_by_partition()?;
237+
// Also the statistics of each partition is also invalid due to above
238+
// But we can ensure the current behavior by tests
239+
assert_eq!(statistics.len(), 4);
240+
for stat in &statistics {
241+
assert_eq!(stat.column_statistics.len(), 2);
242+
assert_eq!(stat.total_byte_size, Precision::Inexact(0));
243+
assert_eq!(stat.num_rows, Precision::Inexact(0));
244+
assert_eq!(stat.column_statistics[0].null_count, Precision::Exact(0));
245+
assert_eq!(
246+
stat.column_statistics[0].max_value,
247+
Precision::Exact(ScalarValue::Null)
248+
);
249+
assert_eq!(
250+
stat.column_statistics[0].min_value,
251+
Precision::Exact(ScalarValue::Null)
252+
);
253+
}
92254
Ok(())
93255
}
94256

95-
#[test]
96-
fn test_statistic_by_partition_of_union() -> datafusion_common::Result<()> {
97-
Ok(())
98-
}
257+
#[tokio::test]
258+
async fn test_statistic_by_partition_of_union() -> datafusion_common::Result<()> {
259+
let scan = generate_listing_table_with_statistics(Some(2)).await;
260+
let union_exec = Arc::new(UnionExec::new(vec![scan.clone(), scan]));
261+
let statistics = union_exec.statistics_by_partition()?;
262+
// Check that we have 4 partitions (2 from each scan)
263+
assert_eq!(statistics.len(), 4);
99264

100-
#[test]
101-
fn test_statistic_by_partition_of_smp() -> datafusion_common::Result<()> {
102-
Ok(())
103-
}
265+
// Verify first partition (from first scan)
266+
assert_eq!(statistics[0].num_rows, Precision::Exact(2));
267+
assert_eq!(statistics[0].total_byte_size, Precision::Exact(110));
268+
assert_eq!(statistics[0].column_statistics.len(), 2);
269+
assert_eq!(
270+
statistics[0].column_statistics[0].null_count,
271+
Precision::Exact(0)
272+
);
273+
assert_eq!(
274+
statistics[0].column_statistics[0].max_value,
275+
Precision::Exact(ScalarValue::Int32(Some(4)))
276+
);
277+
assert_eq!(
278+
statistics[0].column_statistics[0].min_value,
279+
Precision::Exact(ScalarValue::Int32(Some(3)))
280+
);
104281

105-
#[test]
106-
fn test_statistic_by_partition_of_limit() -> datafusion_common::Result<()> {
107-
Ok(())
108-
}
282+
// Verify second partition (from first scan)
283+
assert_eq!(statistics[1].num_rows, Precision::Exact(2));
284+
assert_eq!(statistics[1].total_byte_size, Precision::Exact(110));
285+
assert_eq!(
286+
statistics[1].column_statistics[0].null_count,
287+
Precision::Exact(0)
288+
);
289+
assert_eq!(
290+
statistics[1].column_statistics[0].max_value,
291+
Precision::Exact(ScalarValue::Int32(Some(2)))
292+
);
293+
assert_eq!(
294+
statistics[1].column_statistics[0].min_value,
295+
Precision::Exact(ScalarValue::Int32(Some(1)))
296+
);
297+
298+
// Verify third partition (from second scan - same as first partition)
299+
assert_eq!(statistics[2].num_rows, Precision::Exact(2));
300+
assert_eq!(statistics[2].total_byte_size, Precision::Exact(110));
301+
assert_eq!(
302+
statistics[2].column_statistics[0].max_value,
303+
Precision::Exact(ScalarValue::Int32(Some(4)))
304+
);
305+
assert_eq!(
306+
statistics[2].column_statistics[0].min_value,
307+
Precision::Exact(ScalarValue::Int32(Some(3)))
308+
);
309+
310+
// Verify fourth partition (from second scan - same as second partition)
311+
assert_eq!(statistics[3].num_rows, Precision::Exact(2));
312+
assert_eq!(statistics[3].total_byte_size, Precision::Exact(110));
313+
assert_eq!(
314+
statistics[3].column_statistics[0].max_value,
315+
Precision::Exact(ScalarValue::Int32(Some(2)))
316+
);
317+
assert_eq!(
318+
statistics[3].column_statistics[0].min_value,
319+
Precision::Exact(ScalarValue::Int32(Some(1)))
320+
);
109321

110-
#[test]
111-
fn test_statistic_by_partition_of_coalesce() -> datafusion_common::Result<()> {
112322
Ok(())
113323
}
114-
115-
}
324+
}

0 commit comments

Comments
 (0)