Skip to content

Commit 324be53

Browse files
authored
Feat: introduce ExecutionPlan::partition_statistics API (#15852)
* save * save * save * functional way * fix sort * adding test * add tests * save * update * add PartitionedStatistics structure * use Arc * refine tests * save * resolve conflicts * use PartitionedStatistics * impl index and len for PartitionedStatistics * add test for cross join * fix clippy * Check the statistics_by_partition with real results * rebase main and fix cross join test * resolve conflicts * Feat: introduce partition statistics API * address comments * deprecated statistics API * rebase main and fix tests * fix
1 parent e44ae0a commit 324be53

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1103
-199
lines changed

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,11 @@ mod tests {
217217
assert_eq!(tt_batches, 50 /* 100/2 */);
218218

219219
// test metadata
220-
assert_eq!(exec.statistics()?.num_rows, Precision::Absent);
221-
assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent);
220+
assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
221+
assert_eq!(
222+
exec.partition_statistics(None)?.total_byte_size,
223+
Precision::Absent
224+
);
222225

223226
Ok(())
224227
}

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,11 @@ mod tests {
7575
assert_eq!(tt_batches, 6 /* 12/2 */);
7676

7777
// test metadata
78-
assert_eq!(exec.statistics()?.num_rows, Precision::Absent);
79-
assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent);
78+
assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
79+
assert_eq!(
80+
exec.partition_statistics(None)?.total_byte_size,
81+
Precision::Absent
82+
);
8083

8184
Ok(())
8285
}

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -616,9 +616,15 @@ mod tests {
616616
assert_eq!(tt_batches, 4 /* 8/2 */);
617617

618618
// test metadata
619-
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
619+
assert_eq!(
620+
exec.partition_statistics(None)?.num_rows,
621+
Precision::Exact(8)
622+
);
620623
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
621-
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
624+
assert_eq!(
625+
exec.partition_statistics(None)?.total_byte_size,
626+
Precision::Exact(671)
627+
);
622628

623629
Ok(())
624630
}
@@ -659,9 +665,15 @@ mod tests {
659665
get_exec(&state, "alltypes_plain.parquet", projection, Some(1)).await?;
660666

661667
// note: even if the limit is set, the executor rounds up to the batch size
662-
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
668+
assert_eq!(
669+
exec.partition_statistics(None)?.num_rows,
670+
Precision::Exact(8)
671+
);
663672
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
664-
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
673+
assert_eq!(
674+
exec.partition_statistics(None)?.total_byte_size,
675+
Precision::Exact(671)
676+
);
665677
let batches = collect(exec, task_ctx).await?;
666678
assert_eq!(1, batches.len());
667679
assert_eq!(11, batches[0].num_columns());

datafusion/core/src/datasource/listing/table.rs

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -739,7 +739,7 @@ impl ListingOptions {
739739
/// # Ok(())
740740
/// # }
741741
/// ```
742-
#[derive(Debug)]
742+
#[derive(Debug, Clone)]
743743
pub struct ListingTable {
744744
table_paths: Vec<ListingTableUrl>,
745745
/// `file_schema` contains only the columns physically stored in the data files themselves.
@@ -1149,23 +1149,25 @@ impl ListingTable {
11491149
let (file_group, inexact_stats) =
11501150
get_files_with_limit(files, limit, self.options.collect_stat).await?;
11511151

1152-
let mut file_groups = file_group.split_files(self.options.target_partitions);
1152+
let file_groups = file_group.split_files(self.options.target_partitions);
1153+
let (mut file_groups, mut stats) = compute_all_files_statistics(
1154+
file_groups,
1155+
self.schema(),
1156+
self.options.collect_stat,
1157+
inexact_stats,
1158+
)?;
11531159
let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema())
11541160
.map_schema(self.file_schema.as_ref())?;
1155-
// Use schema_mapper to map each file-level column statistics to table-level column statistics
1161+
stats.column_statistics =
1162+
schema_mapper.map_column_statistics(&stats.column_statistics)?;
11561163
file_groups.iter_mut().try_for_each(|file_group| {
11571164
if let Some(stat) = file_group.statistics_mut() {
11581165
stat.column_statistics =
11591166
schema_mapper.map_column_statistics(&stat.column_statistics)?;
11601167
}
11611168
Ok::<_, DataFusionError>(())
11621169
})?;
1163-
compute_all_files_statistics(
1164-
file_groups,
1165-
self.schema(),
1166-
self.options.collect_stat,
1167-
inexact_stats,
1168-
)
1170+
Ok((file_groups, stats))
11691171
}
11701172

11711173
/// Collects statistics for a given partitioned file.
@@ -1324,8 +1326,14 @@ mod tests {
13241326
assert_eq!(exec.output_partitioning().partition_count(), 1);
13251327

13261328
// test metadata
1327-
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
1328-
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
1329+
assert_eq!(
1330+
exec.partition_statistics(None)?.num_rows,
1331+
Precision::Exact(8)
1332+
);
1333+
assert_eq!(
1334+
exec.partition_statistics(None)?.total_byte_size,
1335+
Precision::Exact(671)
1336+
);
13291337

13301338
Ok(())
13311339
}
@@ -1350,9 +1358,15 @@ mod tests {
13501358
let table = ListingTable::try_new(config)?;
13511359

13521360
let exec = table.scan(&state, None, &[], None).await?;
1353-
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
1361+
assert_eq!(
1362+
exec.partition_statistics(None)?.num_rows,
1363+
Precision::Exact(8)
1364+
);
13541365
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
1355-
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
1366+
assert_eq!(
1367+
exec.partition_statistics(None)?.total_byte_size,
1368+
Precision::Exact(671)
1369+
);
13561370

13571371
Ok(())
13581372
}
@@ -1378,8 +1392,11 @@ mod tests {
13781392
let table = ListingTable::try_new(config)?;
13791393

13801394
let exec = table.scan(&state, None, &[], None).await?;
1381-
assert_eq!(exec.statistics()?.num_rows, Precision::Absent);
1382-
assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent);
1395+
assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
1396+
assert_eq!(
1397+
exec.partition_statistics(None)?.total_byte_size,
1398+
Precision::Absent
1399+
);
13831400

13841401
Ok(())
13851402
}

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,11 @@ impl ExecutionPlan for ArrowExec {
190190
fn statistics(&self) -> Result<Statistics> {
191191
self.inner.statistics()
192192
}
193+
194+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
195+
self.inner.partition_statistics(partition)
196+
}
197+
193198
fn fetch(&self) -> Option<usize> {
194199
self.inner.fetch()
195200
}

datafusion/core/tests/custom_sources_cases/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,13 @@ impl ExecutionPlan for CustomExecutionPlan {
180180
}
181181

182182
fn statistics(&self) -> Result<Statistics> {
183+
self.partition_statistics(None)
184+
}
185+
186+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
187+
if partition.is_some() {
188+
return Ok(Statistics::new_unknown(&self.schema()));
189+
}
183190
let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap();
184191
Ok(Statistics {
185192
num_rows: Precision::Exact(batch.num_rows()),

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,14 @@ impl ExecutionPlan for StatisticsValidation {
184184
fn statistics(&self) -> Result<Statistics> {
185185
Ok(self.stats.clone())
186186
}
187+
188+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
189+
if partition.is_some() {
190+
Ok(Statistics::new_unknown(&self.schema))
191+
} else {
192+
Ok(self.stats.clone())
193+
}
194+
}
187195
}
188196

189197
fn init_ctx(stats: Statistics, schema: Schema) -> Result<SessionContext> {
@@ -232,7 +240,7 @@ async fn sql_basic() -> Result<()> {
232240
let physical_plan = df.create_physical_plan().await.unwrap();
233241

234242
// the statistics should be those of the source
235-
assert_eq!(stats, physical_plan.statistics()?);
243+
assert_eq!(stats, physical_plan.partition_statistics(None)?);
236244

237245
Ok(())
238246
}
@@ -248,7 +256,7 @@ async fn sql_filter() -> Result<()> {
248256
.unwrap();
249257

250258
let physical_plan = df.create_physical_plan().await.unwrap();
251-
let stats = physical_plan.statistics()?;
259+
let stats = physical_plan.partition_statistics(None)?;
252260
assert_eq!(stats.num_rows, Precision::Inexact(1));
253261

254262
Ok(())
@@ -270,7 +278,7 @@ async fn sql_limit() -> Result<()> {
270278
column_statistics: col_stats,
271279
total_byte_size: Precision::Absent
272280
},
273-
physical_plan.statistics()?
281+
physical_plan.partition_statistics(None)?
274282
);
275283

276284
let df = ctx
@@ -279,7 +287,7 @@ async fn sql_limit() -> Result<()> {
279287
.unwrap();
280288
let physical_plan = df.create_physical_plan().await.unwrap();
281289
// when the limit is larger than the original number of lines, statistics remain unchanged
282-
assert_eq!(stats, physical_plan.statistics()?);
290+
assert_eq!(stats, physical_plan.partition_statistics(None)?);
283291

284292
Ok(())
285293
}
@@ -296,7 +304,7 @@ async fn sql_window() -> Result<()> {
296304

297305
let physical_plan = df.create_physical_plan().await.unwrap();
298306

299-
let result = physical_plan.statistics()?;
307+
let result = physical_plan.partition_statistics(None)?;
300308

301309
assert_eq!(stats.num_rows, result.num_rows);
302310
let col_stats = result.column_statistics;

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,19 @@ async fn check_stats_precision_with_filter_pushdown() {
5050
let (_, _, state) = get_cache_runtime_state();
5151
// Scan without filter, stats are exact
5252
let exec = table.scan(&state, None, &[], None).await.unwrap();
53-
assert_eq!(exec.statistics().unwrap().num_rows, Precision::Exact(8));
53+
assert_eq!(
54+
exec.partition_statistics(None).unwrap().num_rows,
55+
Precision::Exact(8)
56+
);
5457

5558
// Scan with filter pushdown, stats are inexact
5659
let filter = Expr::gt(col("id"), lit(1));
5760

5861
let exec = table.scan(&state, None, &[filter], None).await.unwrap();
59-
assert_eq!(exec.statistics().unwrap().num_rows, Precision::Inexact(8));
62+
assert_eq!(
63+
exec.partition_statistics(None).unwrap().num_rows,
64+
Precision::Inexact(8)
65+
);
6066
}
6167

6268
#[tokio::test]
@@ -79,9 +85,12 @@ async fn load_table_stats_with_session_level_cache() {
7985
assert_eq!(get_static_cache_size(&state1), 0);
8086
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
8187

82-
assert_eq!(exec1.statistics().unwrap().num_rows, Precision::Exact(8));
8388
assert_eq!(
84-
exec1.statistics().unwrap().total_byte_size,
89+
exec1.partition_statistics(None).unwrap().num_rows,
90+
Precision::Exact(8)
91+
);
92+
assert_eq!(
93+
exec1.partition_statistics(None).unwrap().total_byte_size,
8594
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
8695
Precision::Exact(671),
8796
);
@@ -91,9 +100,12 @@ async fn load_table_stats_with_session_level_cache() {
91100
//check session 1 cache result not show in session 2
92101
assert_eq!(get_static_cache_size(&state2), 0);
93102
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
94-
assert_eq!(exec2.statistics().unwrap().num_rows, Precision::Exact(8));
95103
assert_eq!(
96-
exec2.statistics().unwrap().total_byte_size,
104+
exec2.partition_statistics(None).unwrap().num_rows,
105+
Precision::Exact(8)
106+
);
107+
assert_eq!(
108+
exec2.partition_statistics(None).unwrap().total_byte_size,
97109
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
98110
Precision::Exact(671),
99111
);
@@ -103,9 +115,12 @@ async fn load_table_stats_with_session_level_cache() {
103115
// check that session 1's cache holds exactly one entry
104116
assert_eq!(get_static_cache_size(&state1), 1);
105117
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
106-
assert_eq!(exec3.statistics().unwrap().num_rows, Precision::Exact(8));
107118
assert_eq!(
108-
exec3.statistics().unwrap().total_byte_size,
119+
exec3.partition_statistics(None).unwrap().num_rows,
120+
Precision::Exact(8)
121+
);
122+
assert_eq!(
123+
exec3.partition_statistics(None).unwrap().total_byte_size,
109124
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
110125
Precision::Exact(671),
111126
);

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl ExecutionPlan for SortRequiredExec {
170170
}
171171

172172
fn statistics(&self) -> Result<Statistics> {
173-
self.input.statistics()
173+
self.input.partition_statistics(None)
174174
}
175175
}
176176

0 commit comments

Comments
 (0)