
Commit c138b24 ("initial attempt at implementation")
1 parent: b0b6e44

6 files changed: +149 additions, -90 deletions


datafusion/core/src/datasource/physical_plan/file_scan_config.rs

Lines changed: 3 additions & 30 deletions

@@ -23,7 +23,7 @@ use std::{
     sync::Arc, vec,
 };
 
-use super::{get_projected_output_ordering, statistics::MinMaxStatistics};
+use super::{get_projected_output_ordering, min_max_statistics_from_files};
 use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
 use crate::{error::Result, scalar::ScalarValue};
 
@@ -310,22 +310,12 @@ impl FileScanConfig {
         sort_order: &LexOrdering,
     ) -> Result<Vec<Vec<PartitionedFile>>> {
         let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
-        // First Fit:
-        // * Choose the first file group that a file can be placed into.
-        // * If it fits into no existing file groups, create a new one.
-        //
-        // By sorting files by min values and then applying first-fit bin packing,
-        // we can produce the smallest number of file groups such that
-        // files within a group are in order and non-overlapping.
-        //
-        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
-        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
 
         if flattened_files.is_empty() {
             return Ok(vec![]);
         }
 
-        let statistics = MinMaxStatistics::new_from_files(
+        let statistics = min_max_statistics_from_files(
             sort_order,
             table_schema,
             None,
@@ -335,24 +325,7 @@ impl FileScanConfig {
             e.context("construct min/max statistics for split_groups_by_statistics")
         })?;
 
-        let indices_sorted_by_min = statistics.min_values_sorted();
-        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
-
-        for (idx, min) in indices_sorted_by_min {
-            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
-                // If our file is non-overlapping and comes _after_ the last file,
-                // it fits in this file group.
-                min > statistics.max(
-                    *group
-                        .last()
-                        .expect("groups should be nonempty at construction"),
-                )
-            });
-            match file_group_to_insert {
-                Some(group) => group.push(idx),
-                None => file_groups_indices.push(vec![idx]),
-            }
-        }
+        let file_groups_indices = statistics.first_fit();
 
         // Assemble indices back into groups of PartitionedFiles
         Ok(file_groups_indices
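
Note: the first-fit comments and loop removed above are not gone from the project; this commit moves them behind a new `first_fit` method on `MinMaxStatistics`, which presumably now lives in `datafusion/physical-plan/src/statistics.rs` (the sixth changed file, whose diff is not shown on this page). Reconstructed from the removed lines, the method plausibly looks like the following; a sketch only, assuming `min_values_sorted` and `max` keep their prior signatures:

```rust
// Sketch: plausible body of MinMaxStatistics::first_fit, reconstructed
// from the inline loop removed above. The real method is in the relocated
// statistics module, not shown in this diff.
impl MinMaxStatistics {
    /// Greedy first-fit chain decomposition: visit items in order of min
    /// value and place each into the first chain whose last element it does
    /// not overlap, opening a new chain otherwise. This yields the minimum
    /// number of ordered, non-overlapping chains.
    /// Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8.
    pub fn first_fit(&self) -> Vec<Vec<usize>> {
        let mut chains: Vec<Vec<usize>> = vec![];
        for (idx, min) in self.min_values_sorted() {
            // An item fits a chain if it starts strictly after the chain's
            // last item ends.
            match chains.iter_mut().find(|chain| {
                min > self.max(*chain.last().expect("chains are nonempty"))
            }) {
                Some(chain) => chain.push(idx),
                None => chains.push(vec![idx]),
            }
        }
        chains
    }
}
```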

datafusion/core/src/datasource/physical_plan/mod.rs

Lines changed: 38 additions & 3 deletions

@@ -26,7 +26,6 @@ mod file_stream;
 mod json;
 #[cfg(feature = "parquet")]
 pub mod parquet;
-mod statistics;
 
 pub(crate) use self::csv::plan_to_csv;
 pub(crate) use self::json::plan_to_json;
@@ -36,7 +35,9 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
 pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener};
+use datafusion_common::{stats::Precision, ColumnStatistics, DataFusionError};
 use datafusion_expr::dml::InsertOp;
+use datafusion_physical_plan::statistics::MinMaxStatistics;
 pub use file_groups::FileGroupPartitioner;
 pub use file_scan_config::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
@@ -366,10 +367,10 @@ fn get_projected_output_ordering(
             return false;
         }
 
-        let statistics = match statistics::MinMaxStatistics::new_from_files(
+        let statistics = match min_max_statistics_from_files(
            &new_ordering,
            projected_schema,
-           base_config.projection.as_deref(),
+           base_config.projection.as_ref(),
            group,
         ) {
            Ok(statistics) => statistics,
@@ -395,6 +396,40 @@ fn get_projected_output_ordering(
     all_orderings
 }
 
+/// Construct MinMaxStatistics from a list of files
+fn min_max_statistics_from_files<'a>(
+    projected_sort_order: &LexOrdering, // Sort order with respect to projected schema
+    projected_schema: &SchemaRef, // Projected schema
+    projection: Option<&Vec<usize>>, // Indices of projection in full table schema (None = all columns)
+    files: impl IntoIterator<Item = &'a PartitionedFile>,
+) -> Result<MinMaxStatistics> {
+    let projected_statistics = files
+        .into_iter()
+        .map(|file| {
+            let mut statistics = file.statistics.clone()?;
+            for partition in &file.partition_values {
+                statistics.column_statistics.push(ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(partition.clone()),
+                    min_value: Precision::Exact(partition.clone()),
+                    distinct_count: Precision::Exact(1),
+                });
+            }
+
+            Some(statistics.project(projection))
+        })
+        .collect::<Option<Vec<_>>>()
+        .ok_or_else(|| {
+            DataFusionError::Plan("Parquet file missing statistics".to_string())
+        })?;
+
+    MinMaxStatistics::new_from_statistics(
+        projected_sort_order,
+        projected_schema,
+        &projected_statistics,
+    )
+}
+
 /// Represents the possible outcomes of a range calculation.
 ///
 /// This enum is used to encapsulate the result of calculating the range of
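
Beyond relocating the constructor, `min_max_statistics_from_files` folds each file's partition values into its column statistics as exact single-value columns (min = max = the partition value, null count 0, distinct count 1), so Hive-style partition columns can participate in ordering analysis. A hedged usage sketch; the helper name `chains_for_group` is illustrative, not from this commit, and it assumes the surrounding module's imports:

```rust
// Hypothetical helper showing how the two new pieces compose: build
// min/max statistics for one group of files, then split the group into
// ordered, non-overlapping chains of file indices.
fn chains_for_group(
    sort_order: &LexOrdering,
    projected_schema: &SchemaRef,
    projection: Option<&Vec<usize>>, // None = all columns
    file_group: &[PartitionedFile],
) -> Result<Vec<Vec<usize>>> {
    let statistics = min_max_statistics_from_files(
        sort_order,
        projected_schema,
        projection,
        file_group.iter(),
    )?;
    Ok(statistics.first_fit())
}
```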

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 7 additions & 0 deletions

@@ -397,6 +397,13 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         Ok(Statistics::new_unknown(&self.schema()))
     }
 
+    fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
+        Ok(vec![
+            self.statistics()?;
+            self.properties().partitioning.partition_count()
+        ])
+    }
+
     /// Returns `true` if a limit can be safely pushed down through this
     /// `ExecutionPlan` node.
     ///
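
The new `statistics_by_partition` default repeats the plan-wide `statistics()` once per output partition; the semicolon inside `vec![..]` is Rust's repeat-element syntax (`vec![elem; n]`, which requires `elem: Clone`), not a statement separator. An equivalent expansion as a free function, written here only to make the default explicit (not part of the commit):

```rust
use std::iter;

use datafusion_common::Result;
use datafusion_physical_plan::{ExecutionPlan, Statistics};

// Equivalent to the new trait default: clone one plan-wide Statistics
// per output partition.
fn default_statistics_by_partition(
    plan: &dyn ExecutionPlan,
) -> Result<Vec<Statistics>> {
    let n = plan.properties().partitioning.partition_count();
    let stats = plan.statistics()?;
    Ok(iter::repeat(stats).take(n).collect())
}
```

Nodes that actually track per-partition statistics can override this to return tighter per-partition bounds, which is what makes the `SortPreservingMergeExec` change below useful.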

datafusion/physical-plan/src/lib.rs

Lines changed: 1 addition & 0 deletions

@@ -72,6 +72,7 @@ pub mod recursive_query;
 pub mod repartition;
 pub mod sorts;
 pub mod spill;
+pub mod statistics;
 pub mod stream;
 pub mod streaming;
 pub mod tree_node;

datafusion/physical-plan/src/sorts/sort_preserving_merge.rs

Lines changed: 35 additions & 9 deletions

@@ -24,6 +24,8 @@ use crate::common::spawn_buffered;
 use crate::limit::LimitStream;
 use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use crate::sorts::streaming_merge::StreamingMergeBuilder;
+use crate::statistics::MinMaxStatistics;
+use crate::stream::RecordBatchStreamAdapter;
 use crate::{
     DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
     Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
@@ -34,6 +36,7 @@ use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
 
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
+use futures::StreamExt;
 use log::{debug, trace};
 
 /// Sort preserving merge execution plan
@@ -249,16 +252,42 @@ impl ExecutionPlan for SortPreservingMergeExec {
         MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]"))
             .register(&context.runtime_env().memory_pool);
 
-        match input_partitions {
+        let statistics = MinMaxStatistics::new_from_statistics(
+            &self.expr,
+            &self.schema(),
+            &self.input.statistics_by_partition()?,
+        )?;
+
+        // Organize the input partitions into chains,
+        // where elements of each chain are input partitions that are
+        // non-overlapping, and each chain is ordered by their min/max statistics.
+        // Then concatenate each chain into a single stream.
+        let mut streams = statistics
+            .first_fit()
+            .into_iter()
+            .map(|chain| {
+                let streams = chain
+                    .into_iter()
+                    .map(|i| self.input.execute(i, Arc::clone(&context)))
+                    .collect::<Result<Vec<_>>>()?;
+
+                // Concatenate the chain into a single stream
+                Ok(Box::pin(RecordBatchStreamAdapter::new(
+                    self.input.schema(),
+                    futures::stream::iter(streams).flatten(),
+                )) as SendableRecordBatchStream)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        match streams.len() {
             0 => internal_err!(
                 "SortPreservingMergeExec requires at least one input partition"
             ),
             1 => match self.fetch {
                 Some(fetch) => {
-                    let stream = self.input.execute(0, context)?;
                     debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input with {fetch}");
                     Ok(Box::pin(LimitStream::new(
-                        stream,
+                        streams.remove(0),
                         0,
                         Some(fetch),
                         BaselineMetrics::new(&self.metrics, partition),
@@ -271,12 +300,9 @@ impl ExecutionPlan for SortPreservingMergeExec {
                 }
             },
             _ => {
-                let receivers = (0..input_partitions)
-                    .map(|partition| {
-                        let stream =
-                            self.input.execute(partition, Arc::clone(&context))?;
-                        Ok(spawn_buffered(stream, 1))
-                    })
+                let receivers = streams
+                    .into_iter()
+                    .map(|stream| Ok(spawn_buffered(stream, 1)))
                     .collect::<Result<_>>()?;
 
                 debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute");
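
The effect of this change: input partitions whose min/max ranges do not overlap are grouped into first-fit chains and simply concatenated with `futures::stream::iter(..).flatten()`, so only the chains (often fewer than the raw partitions) reach the k-way merge machinery. A standalone toy example of why concatenating a chain preserves sort order; this assumes the `tokio` and `futures` crates and is not DataFusion code:

```rust
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // Two "partitions" whose ranges do not overlap: the max of `a` (3) is
    // below the min of `b` (4), so first-fit puts them in one chain.
    let a = stream::iter(vec![1, 2, 3]);
    let b = stream::iter(vec![4, 5, 6]);

    // Concatenation keeps the chain sorted with no per-element
    // merge comparisons.
    let out: Vec<_> = stream::iter(vec![a, b]).flatten().collect().await;
    assert_eq!(out, vec![1, 2, 3, 4, 5, 6]);
}
```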
