Skip to content

Commit bd7fa4c

Browse files
committed
Address review feedback on spill view GC
1 parent edc19f9 commit bd7fa4c

File tree

2 files changed

+160
-53
lines changed

2 files changed

+160
-53
lines changed

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 146 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,18 @@ use std::sync::Arc;
3535
use std::task::{Context, Poll};
3636

3737
use arrow::array::{
38-
Array, BinaryViewArray, BufferSpec, GenericByteViewArray, StringViewArray, layout,
38+
Array, ArrayRef, BinaryViewArray, BufferSpec, GenericByteViewArray, StringViewArray,
39+
layout, make_array,
3940
};
41+
use arrow::datatypes::DataType;
4042
use arrow::datatypes::{ByteViewType, Schema, SchemaRef};
4143
use arrow::ipc::{
4244
MetadataVersion,
4345
reader::StreamReader,
4446
writer::{IpcWriteOptions, StreamWriter},
4547
};
4648
use arrow::record_batch::RecordBatch;
49+
use arrow_data::ArrayDataBuilder;
4750

4851
use datafusion_common::config::SpillCompression;
4952
use datafusion_common::{DataFusionError, Result, exec_datafusion_err};
@@ -380,63 +383,84 @@ const VIEW_SIZE_BYTES: usize = 16;
380383
///
381384
/// # Performance considerations
382385
///
383-
/// The function always returns a new RecordBatch for API consistency, but:
384-
/// - If no view arrays are present, it's a cheap clone (just Arc increments)
386+
/// - If no view arrays need compaction, the original batch is cloned cheaply
385387
/// - GC is skipped for small buffers to avoid unnecessary CPU overhead
388+
/// - Nested container types are traversed recursively so view arrays inside
389+
/// `List`, `Map`, `Union`, `Dictionary`, and other child-bearing arrays are compacted too
386390
/// - The Arrow `gc()` method itself is optimized and only copies referenced data
387391
pub(crate) fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
388-
// Early return optimization: Skip GC entirely if the batch contains no view arrays.
389-
// This avoids unnecessary processing for batches with only primitive types.
390-
let has_view_arrays = batch.columns().iter().any(|array| {
391-
matches!(
392-
array.data_type(),
393-
arrow::datatypes::DataType::Utf8View | arrow::datatypes::DataType::BinaryView
394-
)
395-
});
392+
let mut mutated = false;
393+
let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
396394

397-
if !has_view_arrays {
398-
// RecordBatch::clone() is cheap - just Arc reference count bumps
399-
return Ok(batch.clone());
395+
for array in batch.columns() {
396+
let (gc_array, array_mutated) = gc_array(array)?;
397+
mutated |= array_mutated;
398+
new_columns.push(gc_array);
400399
}
401400

402-
let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
401+
if mutated {
402+
Ok(RecordBatch::try_new(batch.schema(), new_columns)?)
403+
} else {
404+
Ok(batch.clone())
405+
}
406+
}
403407

404-
for array in batch.columns() {
405-
let gc_array = match array.data_type() {
406-
arrow::datatypes::DataType::Utf8View => {
407-
let string_view = array
408-
.as_any()
409-
.downcast_ref::<StringViewArray>()
410-
.expect("Utf8View array should downcast to StringViewArray");
411-
// Only perform GC if the array appears to be sliced (has potential waste).
412-
// The gc() method internally checks if GC is beneficial.
413-
if should_gc_view_array(string_view) {
414-
Arc::new(string_view.gc()) as Arc<dyn Array>
415-
} else {
416-
Arc::clone(array)
417-
}
408+
fn gc_array(array: &ArrayRef) -> Result<(ArrayRef, bool)> {
409+
match array.data_type() {
410+
DataType::Utf8View => {
411+
let string_view = array
412+
.as_any()
413+
.downcast_ref::<StringViewArray>()
414+
.expect("Utf8View array should downcast to StringViewArray");
415+
if should_gc_view_array(string_view) {
416+
Ok((Arc::new(string_view.gc()) as ArrayRef, true))
417+
} else {
418+
Ok((Arc::clone(array), false))
418419
}
419-
arrow::datatypes::DataType::BinaryView => {
420-
let binary_view = array
421-
.as_any()
422-
.downcast_ref::<BinaryViewArray>()
423-
.expect("BinaryView array should downcast to BinaryViewArray");
424-
// Only perform GC if the array appears to be sliced (has potential waste).
425-
// The gc() method internally checks if GC is beneficial.
426-
if should_gc_view_array(binary_view) {
427-
Arc::new(binary_view.gc()) as Arc<dyn Array>
428-
} else {
429-
Arc::clone(array)
430-
}
420+
}
421+
DataType::BinaryView => {
422+
let binary_view = array
423+
.as_any()
424+
.downcast_ref::<BinaryViewArray>()
425+
.expect("BinaryView array should downcast to BinaryViewArray");
426+
if should_gc_view_array(binary_view) {
427+
Ok((Arc::new(binary_view.gc()) as ArrayRef, true))
428+
} else {
429+
Ok((Arc::clone(array), false))
431430
}
432-
// Non-view arrays are passed through unchanged
433-
_ => Arc::clone(array),
434-
};
435-
new_columns.push(gc_array);
431+
}
432+
_ => gc_array_children(array),
433+
}
434+
}
435+
436+
fn gc_array_children(array: &ArrayRef) -> Result<(ArrayRef, bool)> {
437+
let data = array.to_data();
438+
if data.child_data().is_empty() {
439+
return Ok((Arc::clone(array), false));
440+
}
441+
442+
let mut mutated = false;
443+
let mut child_data = Vec::with_capacity(data.child_data().len());
444+
for child in data.child_data() {
445+
let child_array = make_array(child.clone());
446+
let (gc_child, child_mutated) = gc_array(&child_array)?;
447+
mutated |= child_mutated;
448+
child_data.push(gc_child.to_data());
436449
}
437450

438-
// Always return a new batch for consistency
439-
Ok(RecordBatch::try_new(batch.schema(), new_columns)?)
451+
if !mutated {
452+
return Ok((Arc::clone(array), false));
453+
}
454+
455+
let rebuilt = ArrayDataBuilder::new(data.data_type().clone())
456+
.len(data.len())
457+
.offset(data.offset())
458+
.nulls(data.nulls().cloned())
459+
.buffers(data.buffers().to_vec())
460+
.child_data(child_data)
461+
.build()?;
462+
463+
Ok((make_array(rebuilt), true))
440464
}
441465

442466
/// Determines whether a view array should be garbage collected before spilling.
@@ -1103,10 +1127,18 @@ mod tests {
11031127
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array_ref])?;
11041128

11051129
// GC should return the original batch for small arrays
1130+
let should_gc = should_gc_view_array(
1131+
batch
1132+
.column(0)
1133+
.as_any()
1134+
.downcast_ref::<StringViewArray>()
1135+
.unwrap(),
1136+
);
11061137
let gc_batch = gc_view_arrays(&batch)?;
11071138

1108-
// The batch should be unchanged (cloned, not GC'd)
1139+
assert!(!should_gc);
11091140
assert_eq!(gc_batch.num_rows(), batch.num_rows());
1141+
assert!(Arc::ptr_eq(batch.column(0), gc_batch.column(0)));
11101142

11111143
Ok(())
11121144
}
@@ -1323,4 +1355,70 @@ mod tests {
13231355

13241356
Ok(())
13251357
}
1358+
1359+
#[test]
1360+
fn test_gc_recurses_into_nested_view_arrays() -> Result<()> {
1361+
use arrow::array::{DictionaryArray, Int32Array};
1362+
use arrow::buffer::Buffer;
1363+
1364+
let strings: Vec<String> = (0..200)
1365+
.map(|i| format!("http://example.com/nested/path/that/is/not/inlined/{i}"))
1366+
.collect();
1367+
let string_values = Arc::new(StringViewArray::from(strings)) as ArrayRef;
1368+
1369+
let list_data = ArrayDataBuilder::new(DataType::List(Arc::new(
1370+
Field::new_list_field(DataType::Utf8View, true),
1371+
)))
1372+
.len(20)
1373+
.buffers(vec![Buffer::from_iter((0..=20).map(|i| i * 5_i32))])
1374+
.child_data(vec![string_values.slice(0, 100).to_data()])
1375+
.build()?;
1376+
let list_array = make_array(list_data);
1377+
1378+
let keys = Int32Array::from_iter_values(0..20);
1379+
let dictionary = DictionaryArray::new(keys, string_values.slice(0, 20));
1380+
let dictionary_array = Arc::new(dictionary) as ArrayRef;
1381+
1382+
let schema = Arc::new(Schema::new(vec![
1383+
Field::new(
1384+
"list_strings",
1385+
DataType::List(Arc::new(Field::new_list_field(DataType::Utf8View, true))),
1386+
false,
1387+
),
1388+
Field::new(
1389+
"dictionary_strings",
1390+
DataType::Dictionary(
1391+
Box::new(DataType::Int32),
1392+
Box::new(DataType::Utf8View),
1393+
),
1394+
false,
1395+
),
1396+
]));
1397+
let batch = RecordBatch::try_new(schema, vec![list_array, dictionary_array])?;
1398+
let gc_batch = gc_view_arrays(&batch)?;
1399+
1400+
let gc_list_values = gc_batch.column(0).to_data().child_data()[0].clone();
1401+
let gc_list_values = make_array(gc_list_values);
1402+
let gc_list_values = gc_list_values
1403+
.as_any()
1404+
.downcast_ref::<StringViewArray>()
1405+
.unwrap();
1406+
assert!(
1407+
calculate_string_view_waste_ratio(gc_list_values) < 0.2,
1408+
"GC should compact nested List child views"
1409+
);
1410+
1411+
let gc_dictionary_values = gc_batch.column(1).to_data().child_data()[0].clone();
1412+
let gc_dictionary_values = make_array(gc_dictionary_values);
1413+
let gc_dictionary_values = gc_dictionary_values
1414+
.as_any()
1415+
.downcast_ref::<StringViewArray>()
1416+
.unwrap();
1417+
assert!(
1418+
calculate_string_view_waste_ratio(gc_dictionary_values) < 0.2,
1419+
"GC should compact nested Dictionary values"
1420+
);
1421+
1422+
Ok(())
1423+
}
13261424
}

datafusion/physical-plan/src/spill/spill_manager.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
use super::{SpillReaderStream, in_progress_spill_file::InProgressSpillFile};
2121
use crate::coop::cooperative;
2222
use crate::{common::spawn_buffered, metrics::SpillMetrics};
23-
use arrow::array::StringViewArray;
24-
use arrow::datatypes::SchemaRef;
23+
use arrow::array::{BinaryViewArray, GenericByteViewArray, StringViewArray};
24+
use arrow::datatypes::{ByteViewType, SchemaRef};
2525
use arrow::record_batch::RecordBatch;
2626
use datafusion_common::{DataFusionError, Result, config::SpillCompression};
2727
use datafusion_execution::SendableRecordBatchStream;
@@ -214,15 +214,24 @@ impl GetSlicedSize for RecordBatch {
214214
// "bytes needed if we materialized exactly this slice into fresh buffers".
215215
// This is a workaround until https://github.com/apache/arrow-rs/issues/8230
216216
if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
217-
for buffer in sv.data_buffers() {
218-
total += buffer.capacity();
219-
}
217+
total += byte_view_data_buffer_size(sv);
218+
}
219+
if let Some(bv) = array.as_any().downcast_ref::<BinaryViewArray>() {
220+
total += byte_view_data_buffer_size(bv);
220221
}
221222
}
222223
Ok(total)
223224
}
224225
}
225226

227+
fn byte_view_data_buffer_size<T: ByteViewType>(array: &GenericByteViewArray<T>) -> usize {
228+
array
229+
.data_buffers()
230+
.iter()
231+
.map(|buffer| buffer.capacity())
232+
.sum()
233+
}
234+
226235
#[cfg(test)]
227236
mod tests {
228237
use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};

0 commit comments

Comments
 (0)