Skip to content

Commit 99c811a

Browse files
2010YOUY01alamb
andauthored
Fix: External sort failing on StringView due to shared buffers (#14823)
* organize stringview arrays before spilling * add unit test * fix * add comment * review * Update datafusion/physical-plan/src/sorts/sort.rs Co-authored-by: Andrew Lamb <[email protected]> --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent a49d543 commit 99c811a

File tree

2 files changed

+156
-44
lines changed
  • datafusion

2 files changed

+156
-44
lines changed

datafusion/core/tests/memory_limit/mod.rs

+67-2
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ use std::sync::{Arc, LazyLock};
2323

2424
#[cfg(feature = "extended_tests")]
2525
mod memory_limit_validation;
26-
use arrow::array::{ArrayRef, DictionaryArray, RecordBatch};
26+
use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray};
2727
use arrow::compute::SortOptions;
2828
use arrow::datatypes::{Int32Type, SchemaRef};
29+
use arrow_schema::{DataType, Field, Schema};
2930
use datafusion::assert_batches_eq;
3031
use datafusion::datasource::memory::MemorySourceConfig;
3132
use datafusion::datasource::source::DataSourceExec;
@@ -41,14 +42,15 @@ use datafusion_catalog::streaming::StreamingTable;
4142
use datafusion_catalog::Session;
4243
use datafusion_common::{assert_contains, Result};
4344
use datafusion_execution::memory_pool::{
44-
GreedyMemoryPool, MemoryPool, TrackConsumersPool,
45+
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
4546
};
4647
use datafusion_execution::TaskContext;
4748
use datafusion_expr::{Expr, TableType};
4849
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
4950
use datafusion_physical_optimizer::join_selection::JoinSelection;
5051
use datafusion_physical_optimizer::PhysicalOptimizerRule;
5152
use datafusion_physical_plan::spill::get_record_batch_memory_size;
53+
use rand::Rng;
5254
use test_utils::AccessLogGenerator;
5355

5456
use async_trait::async_trait;
@@ -403,6 +405,69 @@ async fn oom_with_tracked_consumer_pool() {
403405
.await
404406
}
405407

408+
/// For regression case: if spilled `StringViewArray`'s buffer will be referenced by
409+
/// other batches which are also need to be spilled, then the spill writer will
410+
/// repeatedly write out the same buffer, and after reading back, each batch's size
411+
/// will explode.
412+
///
413+
/// This test setup will cause 10 spills, each spill will sort around 20 batches.
414+
/// If there is memory explosion for spilled record batch, this test will fail.
415+
#[tokio::test]
416+
async fn test_stringview_external_sort() {
417+
let mut rng = rand::thread_rng();
418+
let array_length = 1000;
419+
let num_batches = 200;
420+
// Batches contain two columns: random 100-byte string, and random i32
421+
let mut batches = Vec::with_capacity(num_batches);
422+
423+
for _ in 0..num_batches {
424+
let strings: Vec<String> = (0..array_length)
425+
.map(|_| {
426+
(0..100)
427+
.map(|_| rng.gen_range(0..=u8::MAX) as char)
428+
.collect()
429+
})
430+
.collect();
431+
432+
let string_array = StringViewArray::from(strings);
433+
let array_ref: ArrayRef = Arc::new(string_array);
434+
435+
let random_numbers: Vec<i32> =
436+
(0..array_length).map(|_| rng.gen_range(0..=1000)).collect();
437+
let int_array = Int32Array::from(random_numbers);
438+
let int_array_ref: ArrayRef = Arc::new(int_array);
439+
440+
let batch = RecordBatch::try_new(
441+
Arc::new(Schema::new(vec![
442+
Field::new("strings", DataType::Utf8View, false),
443+
Field::new("random_numbers", DataType::Int32, false),
444+
])),
445+
vec![array_ref, int_array_ref],
446+
)
447+
.unwrap();
448+
batches.push(batch);
449+
}
450+
451+
// Run a sql query that sorts the batches by the int column
452+
let schema = batches[0].schema();
453+
let table = MemTable::try_new(schema, vec![batches]).unwrap();
454+
let builder = RuntimeEnvBuilder::new()
455+
.with_memory_pool(Arc::new(FairSpillPool::new(60 * 1024 * 1024)));
456+
let runtime = builder.build_arc().unwrap();
457+
458+
let config = SessionConfig::new().with_sort_spill_reservation_bytes(40 * 1024 * 1024);
459+
460+
let ctx = SessionContext::new_with_config_rt(config, runtime);
461+
ctx.register_table("t", Arc::new(table)).unwrap();
462+
463+
let df = ctx
464+
.sql("explain analyze SELECT * FROM t ORDER BY random_numbers")
465+
.await
466+
.unwrap();
467+
468+
let _ = df.collect().await.expect("Query execution failed");
469+
}
470+
406471
/// Run the query with the specified memory limit,
407472
/// and verifies the expected errors are returned
408473
#[derive(Clone, Debug)]

datafusion/physical-plan/src/sorts/sort.rs

+89-42
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::fmt;
2424
use std::fmt::{Debug, Formatter};
2525
use std::sync::Arc;
2626

27-
use crate::common::{spawn_buffered, IPCWriter};
27+
use crate::common::spawn_buffered;
2828
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
2929
use crate::expressions::PhysicalSortExpr;
3030
use crate::limit::LimitStream;
@@ -44,7 +44,9 @@ use crate::{
4444
Statistics,
4545
};
4646

47-
use arrow::array::{Array, RecordBatch, RecordBatchOptions, UInt32Array};
47+
use arrow::array::{
48+
Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
49+
};
4850
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
4951
use arrow::datatypes::{DataType, SchemaRef};
5052
use arrow::row::{RowConverter, SortField};
@@ -300,6 +302,7 @@ impl ExternalSorter {
300302
if input.num_rows() == 0 {
301303
return Ok(());
302304
}
305+
303306
self.reserve_memory_for_merge()?;
304307

305308
let size = get_reserved_byte_for_record_batch(&input);
@@ -397,6 +400,8 @@ impl ExternalSorter {
397400
return Ok(0);
398401
}
399402

403+
self.organize_stringview_arrays()?;
404+
400405
debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
401406

402407
let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
@@ -414,6 +419,69 @@ impl ExternalSorter {
414419
Ok(used)
415420
}
416421

422+
/// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
423+
/// `StringViewArray` in sequential order by calling `gc()` on them.
424+
///
425+
/// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
426+
/// available
427+
///
428+
/// # Rationale
429+
/// After (merge-based) sorting, all batches will be sorted into a single run,
430+
/// but physically this sorted run is chunked into many small batches. For
431+
/// `StringViewArray`s inside each sorted run, their inner buffers are not
432+
/// re-constructed by default, leading to non-sequential payload locations
433+
/// (permutated by `interleave()` Arrow kernel). A single payload buffer might
434+
/// be shared by multiple `RecordBatch`es.
435+
/// When writing each batch to disk, the writer has to write all referenced buffers,
436+
/// because they have to be read back one by one to reduce memory usage. This
437+
/// causes extra disk reads and writes, and potentially execution failure.
438+
///
439+
/// # Example
440+
/// Before sorting:
441+
/// batch1 -> buffer1
442+
/// batch2 -> buffer2
443+
///
444+
/// sorted_batch1 -> buffer1
445+
/// -> buffer2
446+
/// sorted_batch2 -> buffer1
447+
/// -> buffer2
448+
///
449+
/// Then when spilling each batch, the writer has to write all referenced buffers
450+
/// repeatedly.
451+
fn organize_stringview_arrays(&mut self) -> Result<()> {
452+
let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len());
453+
454+
for batch in self.in_mem_batches.drain(..) {
455+
let mut new_columns: Vec<Arc<dyn Array>> =
456+
Vec::with_capacity(batch.num_columns());
457+
458+
let mut arr_mutated = false;
459+
for array in batch.columns() {
460+
if let Some(string_view_array) =
461+
array.as_any().downcast_ref::<StringViewArray>()
462+
{
463+
let new_array = string_view_array.gc();
464+
new_columns.push(Arc::new(new_array));
465+
arr_mutated = true;
466+
} else {
467+
new_columns.push(Arc::clone(array));
468+
}
469+
}
470+
471+
let organized_batch = if arr_mutated {
472+
RecordBatch::try_new(batch.schema(), new_columns)?
473+
} else {
474+
batch
475+
};
476+
477+
organized_batches.push(organized_batch);
478+
}
479+
480+
self.in_mem_batches = organized_batches;
481+
482+
Ok(())
483+
}
484+
417485
/// Sorts the in_mem_batches in place
418486
///
419487
/// Sorting may have freed memory, especially if fetch is `Some`. If
@@ -439,54 +507,29 @@ impl ExternalSorter {
439507
// `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
440508
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
441509
// write sorted batches to disk when the memory is insufficient.
442-
let mut spill_writer: Option<IPCWriter> = None;
443510
while let Some(batch) = sorted_stream.next().await {
444511
let batch = batch?;
445-
match &mut spill_writer {
446-
None => {
447-
let sorted_size = get_reserved_byte_for_record_batch(&batch);
448-
if self.reservation.try_grow(sorted_size).is_err() {
449-
// Directly write in_mem_batches as well as all the remaining batches in
450-
// sorted_stream to disk. Further batches fetched from `sorted_stream` will
451-
// be handled by the `Some(writer)` matching arm.
452-
let spill_file =
453-
self.runtime.disk_manager.create_tmp_file("Sorting")?;
454-
let mut writer = IPCWriter::new(spill_file.path(), &self.schema)?;
455-
// Flush everything in memory to the spill file
456-
for batch in self.in_mem_batches.drain(..) {
457-
writer.write(&batch)?;
458-
}
459-
// as well as the newly sorted batch
460-
writer.write(&batch)?;
461-
spill_writer = Some(writer);
462-
self.reservation.free();
463-
self.spills.push(spill_file);
464-
} else {
465-
self.in_mem_batches.push(batch);
466-
self.in_mem_batches_sorted = true;
467-
}
468-
}
469-
Some(writer) => {
470-
writer.write(&batch)?;
471-
}
512+
let sorted_size = get_reserved_byte_for_record_batch(&batch);
513+
if self.reservation.try_grow(sorted_size).is_err() {
514+
// Although the reservation is not enough, the batch is
515+
// already in memory, so it's okay to combine it with previously
516+
// sorted batches, and spill together.
517+
self.in_mem_batches.push(batch);
518+
self.spill().await?; // reservation is freed in spill()
519+
} else {
520+
self.in_mem_batches.push(batch);
521+
self.in_mem_batches_sorted = true;
472522
}
473523
}
474524

475525
// Drop early to free up memory reserved by the sorted stream, otherwise the
476526
// upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
477527
drop(sorted_stream);
478528

479-
if let Some(writer) = &mut spill_writer {
480-
writer.finish()?;
481-
self.metrics.spill_count.add(1);
482-
self.metrics.spilled_rows.add(writer.num_rows);
483-
self.metrics.spilled_bytes.add(writer.num_bytes);
484-
}
485-
486529
// Sorting may free up some memory especially when fetch is `Some`. If we have
487530
// not freed more than 50% of the memory, then we have to spill to free up more
488531
// memory for inserting more batches.
489-
if spill_writer.is_none() && self.reservation.size() > before / 2 {
532+
if self.reservation.size() > before / 2 {
490533
// We have not freed more than 50% of the memory, so we have to spill to
491534
// free up more memory
492535
self.spill().await?;
@@ -1422,10 +1465,14 @@ mod tests {
14221465
let spill_count = metrics.spill_count().unwrap();
14231466
let spilled_rows = metrics.spilled_rows().unwrap();
14241467
let spilled_bytes = metrics.spilled_bytes().unwrap();
1425-
// Processing 840 KB of data using 400 KB of memory requires at least 2 spills
1426-
// It will spill roughly 18000 rows and 800 KBytes.
1427-
// We leave a little wiggle room for the actual numbers.
1428-
assert!((2..=10).contains(&spill_count));
1468+
1469+
// This test case is processing 840KB of data using 400KB of memory. Note
1470+
// that buffered batches can't be dropped until all sorted batches are
1471+
// generated, so we can only buffer `sort_spill_reservation_bytes` of sorted
1472+
// batches.
1473+
// The number of spills is roughly calculated as:
1474+
// `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
1475+
assert!((12..=18).contains(&spill_count));
14291476
assert!((15000..=20000).contains(&spilled_rows));
14301477
assert!((700000..=900000).contains(&spilled_bytes));
14311478

0 commit comments

Comments
 (0)