Skip to content

Commit f5dcdf0

Browse files
Unbounded SortExec (and Top-K) Implementation When Req's Are Satisfied (#12174)
* Sort fetch updates execution mode
* Update sort.rs
* Update sort.rs
* Update sort.rs
* Update sort.rs
* Update sort.rs
* Apply suggestions from code review
* Update sort.rs
* Update datafusion/physical-plan/src/sorts/sort.rs
* Update datafusion/physical-plan/src/sorts/sort.rs
* Reuse LimitStream

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
1 parent bd50698 commit f5dcdf0

File tree

3 files changed

+265
-89
lines changed

3 files changed

+265
-89
lines changed

datafusion/physical-plan/src/metrics/baseline.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub struct BaselineMetrics {
5656
}
5757

5858
impl BaselineMetrics {
59-
/// Create a new BaselineMetric structure, and set `start_time` to now
59+
/// Create a new BaselineMetric structure, and set `start_time` to now
6060
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
6161
let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
6262
start_time.record();

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 244 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use std::sync::Arc;
2626

2727
use crate::common::spawn_buffered;
2828
use crate::expressions::PhysicalSortExpr;
29+
use crate::limit::LimitStream;
2930
use crate::metrics::{
3031
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
3132
};
@@ -51,6 +52,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
5152
use datafusion_execution::runtime_env::RuntimeEnv;
5253
use datafusion_execution::TaskContext;
5354
use datafusion_physical_expr::LexOrdering;
55+
use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement;
5456

5557
use futures::{StreamExt, TryStreamExt};
5658
use log::{debug, trace};
@@ -737,9 +739,22 @@ impl SortExec {
737739
/// This can reduce the memory pressure required by the sort
738740
/// operation since rows that are not going to be included
739741
/// can be dropped.
740-
pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
741-
self.fetch = fetch;
742-
self
742+
pub fn with_fetch(&self, fetch: Option<usize>) -> Self {
743+
let mut cache = self.cache.clone();
744+
if fetch.is_some() && self.cache.execution_mode == ExecutionMode::Unbounded {
745+
// When a theoretically unnecessary sort becomes a top-K (which
746+
// sometimes arises as an intermediate state before full removal),
747+
// its execution mode should become `Bounded`.
748+
cache.execution_mode = ExecutionMode::Bounded;
749+
}
750+
SortExec {
751+
input: Arc::clone(&self.input),
752+
expr: self.expr.clone(),
753+
metrics_set: self.metrics_set.clone(),
754+
preserve_partitioning: self.preserve_partitioning,
755+
fetch,
756+
cache,
757+
}
743758
}
744759

745760
/// Input schema
@@ -775,6 +790,16 @@ impl SortExec {
775790
sort_exprs: LexOrdering,
776791
preserve_partitioning: bool,
777792
) -> PlanProperties {
793+
// Determine execution mode:
794+
let sort_satisfied = input.equivalence_properties().ordering_satisfy_requirement(
795+
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()).as_slice(),
796+
);
797+
let mode = match input.execution_mode() {
798+
ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded,
799+
ExecutionMode::Bounded => ExecutionMode::Bounded,
800+
_ => ExecutionMode::PipelineBreaking,
801+
};
802+
778803
// Calculate equivalence properties; i.e. reset the ordering equivalence
779804
// class with the new ordering:
780805
let eq_properties = input
@@ -786,14 +811,6 @@ impl SortExec {
786811
let output_partitioning =
787812
Self::output_partitioning_helper(input, preserve_partitioning);
788813

789-
// Determine execution mode:
790-
let mode = match input.execution_mode() {
791-
ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
792-
ExecutionMode::PipelineBreaking
793-
}
794-
ExecutionMode::Bounded => ExecutionMode::Bounded,
795-
};
796-
797814
PlanProperties::new(eq_properties, output_partitioning, mode)
798815
}
799816
}
@@ -874,53 +891,68 @@ impl ExecutionPlan for SortExec {
874891

875892
trace!("End SortExec's input.execute for partition: {}", partition);
876893

877-
if let Some(fetch) = self.fetch.as_ref() {
878-
let mut topk = TopK::try_new(
879-
partition,
880-
input.schema(),
881-
self.expr.clone(),
882-
*fetch,
883-
context.session_config().batch_size(),
884-
context.runtime_env(),
885-
&self.metrics_set,
886-
partition,
887-
)?;
888-
889-
Ok(Box::pin(RecordBatchStreamAdapter::new(
890-
self.schema(),
891-
futures::stream::once(async move {
892-
while let Some(batch) = input.next().await {
893-
let batch = batch?;
894-
topk.insert_batch(batch)?;
895-
}
896-
topk.emit()
897-
})
898-
.try_flatten(),
899-
)))
900-
} else {
901-
let mut sorter = ExternalSorter::new(
902-
partition,
903-
input.schema(),
904-
self.expr.clone(),
905-
context.session_config().batch_size(),
906-
self.fetch,
907-
execution_options.sort_spill_reservation_bytes,
908-
execution_options.sort_in_place_threshold_bytes,
909-
&self.metrics_set,
910-
context.runtime_env(),
894+
let sort_satisfied = self
895+
.input
896+
.equivalence_properties()
897+
.ordering_satisfy_requirement(
898+
PhysicalSortRequirement::from_sort_exprs(self.expr.iter()).as_slice(),
911899
);
912900

913-
Ok(Box::pin(RecordBatchStreamAdapter::new(
914-
self.schema(),
915-
futures::stream::once(async move {
916-
while let Some(batch) = input.next().await {
917-
let batch = batch?;
918-
sorter.insert_batch(batch).await?;
919-
}
920-
sorter.sort()
921-
})
922-
.try_flatten(),
923-
)))
901+
match (sort_satisfied, self.fetch.as_ref()) {
902+
(true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
903+
input,
904+
0,
905+
Some(*fetch),
906+
BaselineMetrics::new(&self.metrics_set, partition),
907+
))),
908+
(true, None) => Ok(input),
909+
(false, Some(fetch)) => {
910+
let mut topk = TopK::try_new(
911+
partition,
912+
input.schema(),
913+
self.expr.clone(),
914+
*fetch,
915+
context.session_config().batch_size(),
916+
context.runtime_env(),
917+
&self.metrics_set,
918+
partition,
919+
)?;
920+
Ok(Box::pin(RecordBatchStreamAdapter::new(
921+
self.schema(),
922+
futures::stream::once(async move {
923+
while let Some(batch) = input.next().await {
924+
let batch = batch?;
925+
topk.insert_batch(batch)?;
926+
}
927+
topk.emit()
928+
})
929+
.try_flatten(),
930+
)))
931+
}
932+
(false, None) => {
933+
let mut sorter = ExternalSorter::new(
934+
partition,
935+
input.schema(),
936+
self.expr.clone(),
937+
context.session_config().batch_size(),
938+
self.fetch,
939+
execution_options.sort_spill_reservation_bytes,
940+
execution_options.sort_in_place_threshold_bytes,
941+
&self.metrics_set,
942+
context.runtime_env(),
943+
);
944+
Ok(Box::pin(RecordBatchStreamAdapter::new(
945+
self.schema(),
946+
futures::stream::once(async move {
947+
while let Some(batch) = input.next().await {
948+
let batch = batch?;
949+
sorter.insert_batch(batch).await?;
950+
}
951+
sorter.sort()
952+
})
953+
.try_flatten(),
954+
)))
955+
}
924956
}
925957
}
926958

@@ -933,14 +965,7 @@ impl ExecutionPlan for SortExec {
933965
}
934966

935967
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
936-
Some(Arc::new(SortExec {
937-
input: Arc::clone(&self.input),
938-
expr: self.expr.clone(),
939-
metrics_set: self.metrics_set.clone(),
940-
preserve_partitioning: self.preserve_partitioning,
941-
fetch: limit,
942-
cache: self.cache.clone(),
943-
}))
968+
Some(Arc::new(SortExec::with_fetch(self, limit)))
944969
}
945970

946971
fn fetch(&self) -> Option<usize> {
@@ -951,6 +976,8 @@ impl ExecutionPlan for SortExec {
951976
#[cfg(test)]
952977
mod tests {
953978
use std::collections::HashMap;
979+
use std::pin::Pin;
980+
use std::task::{Context, Poll};
954981

955982
use super::*;
956983
use crate::coalesce_partitions::CoalescePartitionsExec;
@@ -965,12 +992,124 @@ mod tests {
965992
use arrow::compute::SortOptions;
966993
use arrow::datatypes::*;
967994
use datafusion_common::cast::as_primitive_array;
995+
use datafusion_common::{assert_batches_eq, Result, ScalarValue};
968996
use datafusion_execution::config::SessionConfig;
969997
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
998+
use datafusion_execution::RecordBatchStream;
999+
use datafusion_physical_expr::expressions::{Column, Literal};
1000+
use datafusion_physical_expr::EquivalenceProperties;
1001+
1002+
use futures::{FutureExt, Stream};
1003+
1004+
#[derive(Debug, Clone)]
1005+
pub struct SortedUnboundedExec {
1006+
schema: Schema,
1007+
batch_size: u64,
1008+
cache: PlanProperties,
1009+
}
1010+
1011+
impl DisplayAs for SortedUnboundedExec {
1012+
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
1013+
match t {
1014+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
1015+
write!(f, "UnboundableExec",).unwrap()
1016+
}
1017+
}
1018+
Ok(())
1019+
}
1020+
}
1021+
1022+
impl SortedUnboundedExec {
1023+
fn compute_properties(schema: SchemaRef) -> PlanProperties {
1024+
let mut eq_properties = EquivalenceProperties::new(schema);
1025+
eq_properties.add_new_orderings(vec![vec![PhysicalSortExpr::new(
1026+
Arc::new(Column::new("c1", 0)),
1027+
SortOptions::default(),
1028+
)]]);
1029+
let mode = ExecutionMode::Unbounded;
1030+
PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1), mode)
1031+
}
1032+
}
1033+
1034+
impl ExecutionPlan for SortedUnboundedExec {
1035+
fn name(&self) -> &'static str {
1036+
Self::static_name()
1037+
}
1038+
1039+
fn as_any(&self) -> &dyn Any {
1040+
self
1041+
}
1042+
1043+
fn properties(&self) -> &PlanProperties {
1044+
&self.cache
1045+
}
1046+
1047+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1048+
vec![]
1049+
}
1050+
1051+
fn with_new_children(
1052+
self: Arc<Self>,
1053+
_: Vec<Arc<dyn ExecutionPlan>>,
1054+
) -> Result<Arc<dyn ExecutionPlan>> {
1055+
Ok(self)
1056+
}
1057+
1058+
fn execute(
1059+
&self,
1060+
_partition: usize,
1061+
_context: Arc<TaskContext>,
1062+
) -> Result<SendableRecordBatchStream> {
1063+
Ok(Box::pin(SortedUnboundedStream {
1064+
schema: Arc::new(self.schema.clone()),
1065+
batch_size: self.batch_size,
1066+
offset: 0,
1067+
}))
1068+
}
1069+
}
1070+
1071+
#[derive(Debug)]
1072+
pub struct SortedUnboundedStream {
1073+
schema: SchemaRef,
1074+
batch_size: u64,
1075+
offset: u64,
1076+
}
9701077

971-
use datafusion_common::ScalarValue;
972-
use datafusion_physical_expr::expressions::Literal;
973-
use futures::FutureExt;
1078+
impl Stream for SortedUnboundedStream {
1079+
type Item = Result<RecordBatch>;
1080+
1081+
fn poll_next(
1082+
mut self: Pin<&mut Self>,
1083+
_cx: &mut Context<'_>,
1084+
) -> Poll<Option<Self::Item>> {
1085+
let batch = SortedUnboundedStream::create_record_batch(
1086+
Arc::clone(&self.schema),
1087+
self.offset,
1088+
self.batch_size,
1089+
);
1090+
self.offset += self.batch_size;
1091+
Poll::Ready(Some(Ok(batch)))
1092+
}
1093+
}
1094+
1095+
impl RecordBatchStream for SortedUnboundedStream {
1096+
fn schema(&self) -> SchemaRef {
1097+
Arc::clone(&self.schema)
1098+
}
1099+
}
1100+
1101+
impl SortedUnboundedStream {
1102+
fn create_record_batch(
1103+
schema: SchemaRef,
1104+
offset: u64,
1105+
batch_size: u64,
1106+
) -> RecordBatch {
1107+
let values = (0..batch_size).map(|i| offset + i).collect::<Vec<_>>();
1108+
let array = UInt64Array::from(values);
1109+
let array_ref: ArrayRef = Arc::new(array);
1110+
RecordBatch::try_new(schema, vec![array_ref]).unwrap()
1111+
}
1112+
}
9741113

9751114
#[tokio::test]
9761115
async fn test_in_mem_sort() -> Result<()> {
@@ -1414,4 +1553,42 @@ mod tests {
14141553
let result = sort_batch(&batch, &expressions, None).unwrap();
14151554
assert_eq!(result.num_rows(), 1);
14161555
}
1556+
1557+
#[tokio::test]
1558+
async fn topk_unbounded_source() -> Result<()> {
1559+
let task_ctx = Arc::new(TaskContext::default());
1560+
let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
1561+
let source = SortedUnboundedExec {
1562+
schema: schema.clone(),
1563+
batch_size: 2,
1564+
cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())),
1565+
};
1566+
let mut plan = SortExec::new(
1567+
vec![PhysicalSortExpr::new(
1568+
Arc::new(Column::new("c1", 0)),
1569+
SortOptions::default(),
1570+
)],
1571+
Arc::new(source),
1572+
);
1573+
plan = plan.with_fetch(Some(9));
1574+
1575+
let batches = collect(Arc::new(plan), task_ctx).await?;
1576+
#[rustfmt::skip]
1577+
let expected = [
1578+
"+----+",
1579+
"| c1 |",
1580+
"+----+",
1581+
"| 0 |",
1582+
"| 1 |",
1583+
"| 2 |",
1584+
"| 3 |",
1585+
"| 4 |",
1586+
"| 5 |",
1587+
"| 6 |",
1588+
"| 7 |",
1589+
"| 8 |",
1590+
"+----+",];
1591+
assert_batches_eq!(expected, &batches);
1592+
Ok(())
1593+
}
14171594
}

0 commit comments

Comments
 (0)