
Commit 1171574

fix: Assertion fail in external sort (#15469)
* fix assertion fail in external sort by refactoring
* clippy
* avoid assert
1 parent dc8d119 · commit 1171574
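In substance, the change replaces a process-aborting assert! on a sortedness flag with an explicit argument plus a recoverable internal error. A minimal standalone sketch of the contrast (hypothetical names, not DataFusion's API):

    // Before: the sorter tracked sortedness in a struct flag and asserted it,
    // so any code path that forgot to update the flag crashed the whole process.
    fn check_sorted_assert(sorted: bool) {
        assert!(sorted); // panics on violation
    }

    // After: the invariant violation is reported as an internal error, which
    // fails only the query and keeps the engine running.
    fn check_sorted_err(sorted: bool) -> Result<(), String> {
        if !sorted {
            return Err("internal error: batches expected to be sorted".into());
        }
        Ok(())
    }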

File tree

  • datafusion

2 files changed: +89 -39 lines changed

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 31 additions & 0 deletions
@@ -49,6 +49,7 @@ use datafusion_expr::{Expr, TableType};
 use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 use datafusion_physical_optimizer::join_selection::JoinSelection;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::common::collect;
 use datafusion_physical_plan::spill::get_record_batch_memory_size;
 use rand::Rng;
 use test_utils::AccessLogGenerator;
@@ -493,6 +494,36 @@ async fn test_in_mem_buffer_almost_full() {
     let _ = df.collect().await.unwrap();
 }

+/// External sort should be able to run if there is very little pre-reserved memory
+/// for merge (set the configuration sort_spill_reservation_bytes to 0).
+#[tokio::test]
+async fn test_external_sort_zero_merge_reservation() {
+    let config = SessionConfig::new()
+        .with_sort_spill_reservation_bytes(0)
+        .with_target_partitions(14);
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024)))
+        .build_arc()
+        .unwrap();
+
+    let ctx = SessionContext::new_with_config_rt(config, runtime);
+
+    let query = "select * from generate_series(1,10000000) as t1(v1) order by v1;";
+    let df = ctx.sql(query).await.unwrap();
+
+    let physical_plan = df.create_physical_plan().await.unwrap();
+    let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+    let stream = physical_plan.execute(0, task_ctx).unwrap();
+
+    // Ensures execution succeeds
+    let _result = collect(stream).await;
+
+    // Ensures the query spilled during execution
+    let metrics = physical_plan.metrics().unwrap();
+    let spill_count = metrics.spill_count().unwrap();
+    assert!(spill_count > 0);
+}
+
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]
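For reference, the test drives this knob through the typed builder; the same setting is also reachable through the string-keyed config API. A small sketch, assuming the documented key name `datafusion.execution.sort_spill_reservation_bytes`:

    use datafusion::prelude::SessionConfig;

    // Equivalent to `.with_sort_spill_reservation_bytes(0)` in the test above,
    // assuming the key `datafusion.execution.sort_spill_reservation_bytes`.
    fn zero_merge_reservation() -> SessionConfig {
        SessionConfig::new()
            .set_usize("datafusion.execution.sort_spill_reservation_bytes", 0)
    }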

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 58 additions & 39 deletions
@@ -220,10 +220,8 @@ struct ExternalSorter {
     // STATE BUFFERS:
     // Fields that hold intermediate data during sorting
     // ========================================================================
-    /// Potentially unsorted in memory buffer
+    /// Unsorted input batches stored in the memory buffer
     in_mem_batches: Vec<RecordBatch>,
-    /// if `Self::in_mem_batches` are sorted
-    in_mem_batches_sorted: bool,

     /// During external sorting, in-memory intermediate data will be appended to
     /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
@@ -304,7 +302,6 @@ impl ExternalSorter {
         Ok(Self {
             schema,
             in_mem_batches: vec![],
-            in_mem_batches_sorted: false,
             in_progress_spill_file: None,
             finished_spill_files: vec![],
             expr: expr.into(),
@@ -341,7 +338,6 @@ impl ExternalSorter {
         }

         self.in_mem_batches.push(input);
-        self.in_mem_batches_sorted = false;
         Ok(())
     }


@@ -418,16 +414,13 @@
         self.metrics.spill_metrics.spill_file_count.value()
     }

-    /// When calling, all `in_mem_batches` must be sorted (*), and then all of them will
-    /// be appended to the in-progress spill file.
-    ///
-    /// (*) 'Sorted' here means globally sorted for all buffered batches when the
-    /// memory limit is reached, instead of partially sorted within the batch.
-    async fn spill_append(&mut self) -> Result<()> {
-        assert!(self.in_mem_batches_sorted);
-
-        // we could always get a chance to free some memory as long as we are holding some
-        if self.in_mem_batches.is_empty() {
+    /// Appends globally sorted batches to the in-progress spill file, and clears
+    /// `globally_sorted_batches` (along with its memory reservation) afterwards.
+    async fn consume_and_spill_append(
+        &mut self,
+        globally_sorted_batches: &mut Vec<RecordBatch>,
+    ) -> Result<()> {
+        if globally_sorted_batches.is_empty() {
             return Ok(());
         }


@@ -437,21 +430,25 @@
             Some(self.spill_manager.create_in_progress_file("Sorting")?);
         }

-        self.organize_stringview_arrays()?;
+        Self::organize_stringview_arrays(globally_sorted_batches)?;

         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

-        let batches = std::mem::take(&mut self.in_mem_batches);
+        let batches_to_spill = std::mem::take(globally_sorted_batches);
         self.reservation.free();

         let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
             internal_datafusion_err!("In-progress spill file should be initialized")
         })?;

-        for batch in batches {
+        for batch in batches_to_spill {
             in_progress_file.append_batch(&batch)?;
         }

+        if !globally_sorted_batches.is_empty() {
+            return internal_err!("This function consumes globally_sorted_batches, so it should be empty after taking.");
+        }
+
         Ok(())
     }

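The consume-and-check contract above can be exercised in isolation; a self-contained sketch (hypothetical names, `Vec<i32>` standing in for the batch buffer) of draining via `std::mem::take`:

    // `consume_all` mirrors how `consume_and_spill_append` drains its argument:
    // `std::mem::take` moves the contents out, leaving the caller's Vec empty.
    fn consume_all(buffer: &mut Vec<i32>) -> usize {
        let taken = std::mem::take(buffer); // `buffer` is empty from here on
        taken.len() // stand-in for appending each batch to the spill file
    }

    fn main() {
        let mut buf = vec![1, 2, 3];
        assert_eq!(consume_all(&mut buf), 3);
        // The real code verifies this with `internal_err!` rather than a panic.
        assert!(buf.is_empty());
    }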

@@ -470,7 +467,7 @@
         Ok(())
     }

-    /// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
+    /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
     /// `StringViewArray` in sequential order by calling `gc()` on them.
     ///
     /// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
@@ -499,10 +496,12 @@
     ///
     /// Then when spilling each batch, the writer has to write all referenced buffers
     /// repeatedly.
-    fn organize_stringview_arrays(&mut self) -> Result<()> {
-        let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len());
+    fn organize_stringview_arrays(
+        globally_sorted_batches: &mut Vec<RecordBatch>,
+    ) -> Result<()> {
+        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());

-        for batch in self.in_mem_batches.drain(..) {
+        for batch in globally_sorted_batches.drain(..) {
             let mut new_columns: Vec<Arc<dyn Array>> =
                 Vec::with_capacity(batch.num_columns());

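The `gc()` this relies on is an arrow-rs method on view arrays; a minimal sketch (example strings are hypothetical) of what compaction does to a sliced array:

    use arrow::array::{Array, StringViewArray};

    fn main() {
        let array = StringViewArray::from_iter_values([
            "short",
            "a string view longer than twelve bytes",
        ]);
        // A slice still references the whole original payload buffer; gc()
        // rewrites the views into one sequential buffer, so a spill writer no
        // longer serializes unreferenced bytes over and over.
        let sliced = array.slice(1, 1);
        let compacted = sliced.gc();
        assert_eq!(sliced.len(), compacted.len());
    }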

@@ -528,20 +527,17 @@
             organized_batches.push(organized_batch);
         }

-        self.in_mem_batches = organized_batches;
+        *globally_sorted_batches = organized_batches;

         Ok(())
     }

-    /// Sorts the in_mem_batches in place
+    /// Sorts the in_mem_batches and potentially spills the sorted batches.
     ///
-    /// Sorting may have freed memory, especially if fetch is `Some`. If
-    /// the memory usage has dropped by a factor of 2, then we don't have
-    /// to spill. Otherwise, we spill to free up memory for inserting
-    /// more batches.
-    /// The factor of 2 aims to avoid a degenerate case where the
-    /// memory required for `fetch` is just under the memory available,
-    /// causing repeated re-sorting of data
+    /// If the memory usage has dropped by a factor of 2, it might be a sort with
+    /// fetch (e.g. sorting 1M rows but only keeping the top 100), so we keep the
+    /// sorted entries inside `in_mem_batches` to be sorted in the next iteration.
+    /// Otherwise, we spill the sorted run to free up memory for inserting more batches.
     ///
     /// # Arguments
     ///
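The factor-of-2 test itself reduces to one comparison; a standalone sketch of the decision the doc comment describes (`should_spill` is a hypothetical name):

    // Spill unless sorting (e.g. a top-K fetch that discards most rows)
    // freed more than half of the reserved memory.
    fn should_spill(before_bytes: usize, after_bytes: usize, force_spill: bool) -> bool {
        after_bytes > before_bytes / 2 || force_spill
    }

    fn main() {
        assert!(should_spill(1000, 900, false)); // barely shrank: spill
        assert!(!should_spill(1000, 100, false)); // top-K freed most memory: keep buffering
        assert!(should_spill(1000, 100, true)); // caller may force a spill regardless
    }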
@@ -560,10 +556,18 @@

         let mut sorted_stream =
             self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
+        // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` are taken
+        // to construct a globally sorted stream.
+        if !self.in_mem_batches.is_empty() {
+            return internal_err!(
+                "in_mem_batches should be empty after constructing sorted stream"
+            );
+        }
+        // 'global' here refers to all buffered batches when the memory limit is
+        // reached. This variable will buffer the sorted batches after
+        // sort-preserving merge and incrementally append them to spill files.
+        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];

-        // `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
-        // We'll gradually collect the sorted stream into self.in_mem_batches, or directly
-        // write sorted batches to disk when the memory is insufficient.
         let mut spilled = false;
         while let Some(batch) = sorted_stream.next().await {
             let batch = batch?;
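"Globally sorted" means one ascending run across batch boundaries, not merely sorted within each batch; a tiny illustration with hypothetical values:

    fn main() {
        // Each inner Vec stands in for a sorted RecordBatch.
        let per_batch_sorted = vec![vec![1, 5, 9], vec![2, 6, 8]]; // sorted per batch only
        let globally_sorted = vec![vec![1, 2, 5], vec![6, 8, 9]]; // one run across batches
        let is_global_run = |batches: &[Vec<i32>]| {
            let flat: Vec<i32> = batches.iter().flatten().copied().collect();
            flat.windows(2).all(|w| w[0] <= w[1])
        };
        assert!(is_global_run(&globally_sorted));
        assert!(!is_global_run(&per_batch_sorted));
    }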
@@ -572,12 +576,12 @@
             // Although the reservation is not enough, the batch is
             // already in memory, so it's okay to combine it with previously
             // sorted batches, and spill together.
-            self.in_mem_batches.push(batch);
-            self.spill_append().await?; // reservation is freed in spill()
+            globally_sorted_batches.push(batch);
+            self.consume_and_spill_append(&mut globally_sorted_batches)
+                .await?; // reservation is freed in spill()
             spilled = true;
         } else {
-            self.in_mem_batches.push(batch);
-            self.in_mem_batches_sorted = true;
+            globally_sorted_batches.push(batch);
         }
     }


@@ -591,12 +595,27 @@
         if (self.reservation.size() > before / 2) || force_spill {
             // We have not freed more than 50% of the memory, so we have to spill to
             // free up more memory
-            self.spill_append().await?;
+            self.consume_and_spill_append(&mut globally_sorted_batches)
+                .await?;
             spilled = true;
         }

         if spilled {
+            // There might be some buffered batches that haven't triggered a spill yet.
+            self.consume_and_spill_append(&mut globally_sorted_batches)
+                .await?;
             self.spill_finish().await?;
+        } else {
+            // If the memory limit was reached before calling this function, and it
+            // didn't spill anything, it means this is a sort with a fetch of the top K
+            // elements: after sorting, only the top K elements will be kept in memory.
+            // For simplicity, those sorted top K entries are put back into the unsorted
+            // `in_mem_batches` to be consumed by the next sort/merge.
+            if !self.in_mem_batches.is_empty() {
+                return internal_err!("in_mem_batches should be cleared before");
+            }
+
+            self.in_mem_batches = std::mem::take(&mut globally_sorted_batches);
         }

         // Reserve headroom for next sort/merge
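The else branch is the subtle part: a top-K sort that freed enough memory keeps its sorted run buffered for the next pass. A self-contained sketch of that bookkeeping (hypothetical names, `u32` standing in for `RecordBatch`):

    fn finish_pass(
        spilled: bool,
        globally_sorted: &mut Vec<u32>,
        in_mem: &mut Vec<u32>,
    ) -> Result<(), String> {
        if spilled {
            // Real code: consume_and_spill_append() followed by spill_finish().
            globally_sorted.clear();
        } else {
            // Top-K case: retain the sorted run for the next sort/merge pass.
            if !in_mem.is_empty() {
                return Err("in_mem_batches should be cleared before".into());
            }
            *in_mem = std::mem::take(globally_sorted);
        }
        Ok(())
    }

    fn main() {
        let (mut sorted, mut in_mem) = (vec![1, 2, 3], Vec::new());
        finish_pass(false, &mut sorted, &mut in_mem).unwrap();
        assert_eq!(in_mem, vec![1, 2, 3]); // the sorted run stayed in memory
    }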
