
Commit 26c5050

init
1 parent 4ac9b55 commit 26c5050

5 files changed: +152 -6 lines

datafusion/core/src/datasource/listing/table.rs

Lines changed: 12 additions & 1 deletion
@@ -33,6 +33,7 @@ use crate::execution::context::SessionState;
 use datafusion_catalog::TableProvider;
 use datafusion_common::{config_err, DataFusionError, Result};
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
+use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
 use datafusion_expr::{SortExpr, TableType};
@@ -1129,7 +1130,17 @@ impl ListingTable {
         let (file_group, inexact_stats) =
             get_files_with_limit(files, limit, self.options.collect_stat).await?;
 
-        let file_groups = file_group.split_files(self.options.target_partitions);
+        let mut file_groups = file_group.split_files(self.options.target_partitions);
+        let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema())
+            .map_schema(self.file_schema.as_ref())?;
+        // Use schema_mapper to map each file-level column statistics to table-level column statistics
+        file_groups.iter_mut().try_for_each(|file_group| {
+            if let Some(stat) = file_group.statistics_mut() {
+                stat.column_statistics =
+                    schema_mapper.map_column_statistics(&stat.column_statistics)?;
+            }
+            Ok::<_, DataFusionError>(())
+        })?;
         compute_all_files_statistics(
             file_groups,
             self.schema(),
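
Editor's note (not part of the commit): the mapping step added above can be exercised on its own. Below is a hypothetical sketch, using only APIs visible in this diff (DefaultSchemaAdapterFactory::from_schema, map_schema, map_column_statistics) and mirroring the tests added in schema_adapter.rs further down: for a table schema (a, b, c) and a file laid out as (b, a), the file's per-column statistics come back in table order, with the missing column c reported as unknown.

// Hypothetical standalone sketch; not code from this commit.
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{stats::Precision, ColumnStatistics};
use datafusion_datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaMapper,
};

fn map_one_file_stats() -> datafusion_common::Result<()> {
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
        Field::new("c", DataType::Float64, true),
    ]));
    let file_schema = Schema::new(vec![
        Field::new("b", DataType::Utf8, true),
        Field::new("a", DataType::Int32, true),
    ]);

    // Same pattern as the new code in ListingTable above.
    let (mapper, _projection) =
        DefaultSchemaAdapterFactory::from_schema(table_schema).map_schema(&file_schema)?;

    // Statistics in file order: [b, a].
    let mut b_stats = ColumnStatistics::default();
    b_stats.null_count = Precision::Exact(5);
    let mut a_stats = ColumnStatistics::default();
    a_stats.null_count = Precision::Exact(10);

    // Returned in table order: [a, b, c], with `c` unknown.
    let table_stats = mapper.map_column_statistics(&[b_stats, a_stats])?;
    assert_eq!(table_stats.len(), 3);
    assert_eq!(table_stats[0].null_count, Precision::Exact(10)); // a
    assert_eq!(table_stats[1].null_count, Precision::Exact(5)); // b
    Ok(())
}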

datafusion/core/src/datasource/mod.rs

Lines changed: 7 additions & 0 deletions
@@ -264,5 +264,12 @@ mod tests {
 
             Ok(RecordBatch::try_new(schema, new_columns).unwrap())
         }
+
+        fn map_column_statistics(
+            &self,
+            _file_col_statistics: &[datafusion_common::ColumnStatistics],
+        ) -> datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
+            todo!()
+        }
     }
 }
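
Editor's note (not part of the commit): the todo!() above is only a stub. map_column_statistics becomes a required method on the SchemaMapper trait (see schema_adapter.rs below), so this test-only mapper must provide a body even though the test never calls it. A hypothetical pass-through implementation, valid only when the file schema already matches the table schema, could look like this:

fn map_column_statistics(
    &self,
    file_col_statistics: &[datafusion_common::ColumnStatistics],
) -> datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
    // Hypothetical: with identical file and table schemas there is nothing to
    // reorder or fill in, so forward the per-column statistics unchanged.
    Ok(file_col_statistics.to_vec())
}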

datafusion/datasource/src/file_groups.rs

Lines changed: 5 additions & 0 deletions
@@ -425,6 +425,11 @@ impl FileGroup {
         self.statistics.as_deref()
     }
 
+    /// Get the mutable reference to the statistics for this group
+    pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
+        self.statistics.as_mut().map(|arc| Arc::make_mut(arc))
+    }
+
     /// Partition the list of files into `n` groups
     pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
         if self.is_empty() {
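
Editor's note (not part of the diff): statistics_mut is copy-on-write because it goes through Arc::make_mut — if the Statistics are still shared with another Arc holder, the caller gets a private clone to mutate and the other holders are unaffected. A minimal standalone sketch of that behavior, using plain std types rather than Statistics:

use std::sync::Arc;

fn main() {
    let mut stats = Arc::new(vec![1, 2, 3]);
    let snapshot = Arc::clone(&stats); // a second owner exists

    // Because the Arc is shared, `make_mut` clones the inner value first,
    // then hands back a mutable reference to the private copy.
    Arc::make_mut(&mut stats).push(4);

    assert_eq!(*stats, vec![1, 2, 3, 4]);
    assert_eq!(*snapshot, vec![1, 2, 3]); // the other owner is unaffected
}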

datafusion/datasource/src/schema_adapter.rs

Lines changed: 125 additions & 1 deletion
@@ -24,7 +24,7 @@
 use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
 use arrow::compute::{can_cast_types, cast};
 use arrow::datatypes::{Schema, SchemaRef};
-use datafusion_common::plan_err;
+use datafusion_common::{plan_err, ColumnStatistics};
 use std::fmt::Debug;
 use std::sync::Arc;
 
@@ -96,6 +96,12 @@ pub trait SchemaAdapter: Send + Sync {
 pub trait SchemaMapper: Debug + Send + Sync {
     /// Adapts a `RecordBatch` to match the `table_schema`
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
+
+    /// Adapts file-level column `Statistics` to match the `table_schema`
+    fn map_column_statistics(
+        &self,
+        file_col_statistics: &[ColumnStatistics],
+    ) -> datafusion_common::Result<Vec<ColumnStatistics>>;
 }
 
 /// Default [`SchemaAdapterFactory`] for mapping schemas.
@@ -334,4 +340,122 @@ impl SchemaMapper for SchemaMapping {
         let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
         Ok(record_batch)
     }
+
+    /// Adapts file-level column `Statistics` to match the `table_schema`
+    fn map_column_statistics(
+        &self,
+        file_col_statistics: &[ColumnStatistics],
+    ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
+        let mut table_col_statistics = vec![];
+
+        // Map the statistics for each field in the file schema to the corresponding field in the
+        // table schema, if a field is not present in the file schema, we need to fill it with `ColumnStatistics::new_unknown`
+        for (_, file_col_idx) in self
+            .projected_table_schema
+            .fields()
+            .iter()
+            .zip(&self.field_mappings)
+        {
+            if let Some(file_col_idx) = file_col_idx {
+                table_col_statistics.push(
+                    file_col_statistics
+                        .get(*file_col_idx)
+                        .cloned()
+                        .unwrap_or_default(),
+                );
+            } else {
+                table_col_statistics.push(ColumnStatistics::new_unknown());
+            }
+        }
+
+        Ok(table_col_statistics)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::{DataType, Field};
+    use datafusion_common::{stats::Precision, Statistics};
+
+    use super::*;
+
+    #[test]
+    fn test_schema_mapping_map_statistics_basic() {
+        // Create table schema (a, b, c)
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Float64, true),
+        ]));
+
+        // Create file schema (b, a) - different order, missing c
+        let file_schema = Schema::new(vec![
+            Field::new("b", DataType::Utf8, true),
+            Field::new("a", DataType::Int32, true),
+        ]);
+
+        // Create SchemaAdapter
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+
+        // Get mapper and projection
+        let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();
+
+        // Should project columns 0,1 from file
+        assert_eq!(projection, vec![0, 1]);
+
+        // Create file statistics
+        let mut file_stats = Statistics::default();
+
+        // Statistics for column b (index 0 in file)
+        let mut b_stats = ColumnStatistics::default();
+        b_stats.null_count = Precision::Exact(5);
+
+        // Statistics for column a (index 1 in file)
+        let mut a_stats = ColumnStatistics::default();
+        a_stats.null_count = Precision::Exact(10);
+
+        file_stats.column_statistics = vec![b_stats, a_stats];
+
+        // Map statistics
+        let table_col_stats = mapper
+            .map_column_statistics(&file_stats.column_statistics)
+            .unwrap();
+
+        // Verify stats
+        assert_eq!(table_col_stats.len(), 3);
+        assert_eq!(table_col_stats[0].null_count, Precision::Exact(10)); // a from file idx 1
+        assert_eq!(table_col_stats[1].null_count, Precision::Exact(5)); // b from file idx 0
+        assert_eq!(table_col_stats[2].null_count, Precision::Absent); // c (unknown)
+    }
+
+    #[test]
+    fn test_schema_mapping_map_statistics_empty() {
+        // Create schemas
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]));
+        let file_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]);
+
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+        let (mapper, _) = adapter.map_schema(&file_schema).unwrap();
+
+        // Empty file statistics
+        let file_stats = Statistics::default();
+        let table_col_stats = mapper
+            .map_column_statistics(&file_stats.column_statistics)
+            .unwrap();
+
+        // All stats should be unknown
+        assert_eq!(table_col_stats.len(), 2);
+        assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown(),);
+        assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown(),);
+    }
 }

datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs

Lines changed: 3 additions & 4 deletions
@@ -34,7 +34,7 @@ use datafusion_physical_plan::execution_plan::EmissionType;
 use datafusion_physical_plan::repartition::RepartitionExec;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::tree_node::PlanContext;
-use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
+use datafusion_physical_plan::ExecutionPlanProperties;
 
 use itertools::izip;
 
@@ -205,9 +205,8 @@ pub fn plan_with_order_breaking_variants(
         // Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`
         // SPM may have `fetch`, so pass it to the `CoalescePartitionsExec`
         let child = Arc::clone(&sort_input.children[0].plan);
-        let coalesce = CoalescePartitionsExec::new(child)
-            .with_fetch(plan.fetch())
-            .unwrap();
+        let coalesce =
+            Arc::new(CoalescePartitionsExec::new(child).with_fetch(plan.fetch()));
         sort_input.plan = coalesce;
     } else {
         return sort_input.update_plan_from_children();
