
Commit a8932fa

2010YOUY01 authored and findepi committed

Fix record batch memory size double counting (apache#13377)

* Fix record batch memory size double counting

1 parent ba0bd85 · commit a8932fa

File tree

4 files changed: +232 -17 lines

- datafusion/core/tests/memory_limit/mod.rs
- datafusion/physical-plan/src/sorts/builder.rs
- datafusion/physical-plan/src/sorts/sort.rs
- datafusion/physical-plan/src/spill.rs

datafusion/core/tests/memory_limit/mod.rs (+8 -3)

```diff
@@ -31,6 +31,7 @@ use datafusion_execution::memory_pool::{
 };
 use datafusion_expr::{Expr, TableType};
 use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_plan::spill::get_record_batch_memory_size;
 use futures::StreamExt;
 use std::any::Any;
 use std::num::NonZeroUsize;
@@ -265,14 +266,18 @@ async fn sort_spill_reservation() {
     // This test case shows how sort_spill_reservation works by
     // purposely sorting data that requires non trivial memory to
     // sort/merge.
+
+    // Merge operation needs extra memory to do row conversion, so make the
+    // memory limit larger.
+    let mem_limit = partition_size * 2;
     let test = TestCase::new()
         // This query uses a different order than the input table to
         // force a sort. It also needs to have multiple columns to
         // force RowFormat / interner that makes merge require
         // substantial memory
         .with_query("select * from t ORDER BY a , b DESC")
         // enough memory to sort if we don't try to merge it all at once
-        .with_memory_limit(partition_size)
+        .with_memory_limit(mem_limit)
         // use a single partition so only a sort is needed
         .with_scenario(scenario)
         .with_disk_manager_config(DiskManagerConfig::NewOs)
@@ -311,7 +316,7 @@ async fn sort_spill_reservation() {
         // reserve sufficient space up front for merge and this time,
         // which will force the spills to happen with less buffered
         // input and thus with enough to merge.
-        .with_sort_spill_reservation_bytes(partition_size / 2);
+        .with_sort_spill_reservation_bytes(mem_limit / 2);

     test.with_config(config).with_expected_success().run().await;
 }
@@ -774,7 +779,7 @@ fn make_dict_batches() -> Vec<RecordBatch> {

 // How many bytes does the memory from dict_batches consume?
 fn batches_byte_size(batches: &[RecordBatch]) -> usize {
-    batches.iter().map(|b| b.get_array_memory_size()).sum()
+    batches.iter().map(get_record_batch_memory_size).sum()
 }

 #[derive(Debug)]
```
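The doubled limit above exists because merging sorted streams first converts the sort keys into Arrow's row format, an allocation on top of the columnar batches themselves. A minimal standalone sketch of that overhead (the column and sizes here are illustrative, not taken from the test):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::DataType;
use arrow::row::{RowConverter, SortField};

fn main() -> Result<(), arrow::error::ArrowError> {
    // A stand-in sort column; the real test sorts a multi-column table.
    let col: ArrayRef = Arc::new(Int32Array::from((0..1024).collect::<Vec<i32>>()));

    // The merge phase materializes sort keys in the row format, so the rows
    // occupy memory *in addition to* the original columnar batch.
    let converter = RowConverter::new(vec![SortField::new(DataType::Int32)])?;
    let rows = converter.convert_columns(&[col])?;

    println!("extra row-format bytes: {}", rows.size());
    Ok(())
}
```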

datafusion/physical-plan/src/sorts/builder.rs (+4 -2)

```diff
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

+use crate::spill::get_record_batch_memory_size;
 use arrow::compute::interleave;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
@@ -69,7 +70,8 @@ impl BatchBuilder {

     /// Append a new batch in `stream_idx`
     pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
-        self.reservation.try_grow(batch.get_array_memory_size())?;
+        self.reservation
+            .try_grow(get_record_batch_memory_size(&batch))?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
         self.cursors[stream_idx] = BatchCursor {
@@ -141,7 +143,7 @@ impl BatchBuilder {
                 stream_cursor.batch_idx = retained;
                 retained += 1;
             } else {
-                self.reservation.shrink(batch.get_array_memory_size());
+                self.reservation.shrink(get_record_batch_memory_size(batch));
             }
             retain
         });
```
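Both call sites follow the memory-pool protocol: `try_grow` when a batch is buffered, `shrink` by the same measurement when it is dropped, so switching both sides to `get_record_batch_memory_size` keeps the reservation balanced. A minimal sketch of that protocol, with a hypothetical pool size and batch size:

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> Result<()> {
    // Hypothetical 1 MiB pool shared by all consumers.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
    let mut reservation = MemoryConsumer::new("BatchBuilder-sketch").register(&pool);

    // Stand-in for get_record_batch_memory_size(&batch).
    let batch_size = 4096;

    reservation.try_grow(batch_size)?; // errors once the pool is exhausted
    reservation.shrink(batch_size);    // must mirror the grow exactly
    Ok(())
}
```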

datafusion/physical-plan/src/sorts/sort.rs (+15 -9)

```diff
@@ -31,7 +31,9 @@ use crate::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use crate::sorts::streaming_merge::StreamingMergeBuilder;
-use crate::spill::{read_spill_as_stream, spill_record_batches};
+use crate::spill::{
+    get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
+};
 use crate::stream::RecordBatchStreamAdapter;
 use crate::topk::TopK;
 use crate::{
@@ -288,10 +290,12 @@ impl ExternalSorter {
         }
         self.reserve_memory_for_merge()?;

-        let size = input.get_array_memory_size();
+        let size = get_record_batch_memory_size(&input);
+
         if self.reservation.try_grow(size).is_err() {
             let before = self.reservation.size();
             self.in_mem_sort().await?;
+
             // Sorting may have freed memory, especially if fetch is `Some`
             //
             // As such we check again, and if the memory usage has dropped by
@@ -426,7 +430,7 @@ impl ExternalSorter {
         let size: usize = self
             .in_mem_batches
             .iter()
-            .map(|x| x.get_array_memory_size())
+            .map(get_record_batch_memory_size)
             .sum();

         // Reserve headroom for next sort/merge
@@ -521,7 +525,8 @@ impl ExternalSorter {
             // Concatenate memory batches together and sort
             let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
-            self.reservation.try_resize(batch.get_array_memory_size())?;
+            self.reservation
+                .try_resize(get_record_batch_memory_size(&batch))?;
             let reservation = self.reservation.take();
             return self.sort_batch_stream(batch, metrics, reservation);
         }
@@ -530,7 +535,8 @@ impl ExternalSorter {
             .into_iter()
             .map(|batch| {
                 let metrics = self.metrics.baseline.intermediate();
-                let reservation = self.reservation.split(batch.get_array_memory_size());
+                let reservation =
+                    self.reservation.split(get_record_batch_memory_size(&batch));
                 let input = self.sort_batch_stream(batch, metrics, reservation)?;
                 Ok(spawn_buffered(input, 1))
             })
@@ -557,7 +563,7 @@ impl ExternalSorter {
         metrics: BaselineMetrics,
         reservation: MemoryReservation,
     ) -> Result<SendableRecordBatchStream> {
-        assert_eq!(batch.get_array_memory_size(), reservation.size());
+        assert_eq!(get_record_batch_memory_size(&batch), reservation.size());
         let schema = batch.schema();

         let fetch = self.fetch;
@@ -1187,9 +1193,9 @@ mod tests {

         assert_eq!(metrics.output_rows().unwrap(), 10000);
         assert!(metrics.elapsed_compute().unwrap() > 0);
-        assert_eq!(metrics.spill_count().unwrap(), 4);
-        assert_eq!(metrics.spilled_bytes().unwrap(), 38784);
-        assert_eq!(metrics.spilled_rows().unwrap(), 9600);
+        assert_eq!(metrics.spill_count().unwrap(), 3);
+        assert_eq!(metrics.spilled_bytes().unwrap(), 36000);
+        assert_eq!(metrics.spilled_rows().unwrap(), 9000);

         let columns = result[0].columns();
```

datafusion/physical-plan/src/spill.rs (+205 -3)

```diff
@@ -20,14 +20,16 @@
 use std::fs::File;
 use std::io::BufReader;
 use std::path::{Path, PathBuf};
+use std::ptr::NonNull;

+use arrow::array::ArrayData;
 use arrow::datatypes::SchemaRef;
 use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use log::debug;
 use tokio::sync::mpsc::Sender;

-use datafusion_common::{exec_datafusion_err, Result};
+use datafusion_common::{exec_datafusion_err, HashSet, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::human_readable_size;
 use datafusion_execution::SendableRecordBatchStream;
@@ -109,10 +111,83 @@ pub fn spill_record_batch_by_size(
     Ok(())
 }

+/// Calculate the total used memory of this batch.
+///
+/// This function is used to estimate the physical memory usage of the
+/// `RecordBatch`. It only counts the memory of large data `Buffer`s, and
+/// ignores metadata like types and pointers.
+/// The implementation sums up the memory size of each unique `Buffer`,
+/// because:
+/// - The data pointers inside `Buffer`s are memory regions returned by the
+///   global memory allocator; those regions cannot overlap.
+/// - The actually used ranges of the `ArrayRef`s inside a `RecordBatch` can
+///   overlap or reuse the same `Buffer`, for example when taking a slice of
+///   an `Array`.
+///
+/// Example:
+/// For a `RecordBatch` with two columns `col1` and `col2`, both pointing to
+/// a sub-region of the same buffer:
+///
+/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
+///       ^    ^  ^    ^
+///       |    |  |    |
+/// col1->{    }  |    |
+/// col2--------->{    }
+///
+/// In the above case, `get_record_batch_memory_size` returns the size of
+/// the buffer, instead of the sum of `col1`'s and `col2`'s actual memory
+/// sizes.
+///
+/// Note: the current `RecordBatch::get_array_memory_size()` double counts
+/// a buffer's memory size if multiple arrays within the batch share the
+/// same `Buffer`. This method provides a temporary fix until the issue is
+/// resolved: <https://github.com/apache/arrow-rs/issues/6439>
+pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
+    // Store pointers to each `Buffer`'s start memory address (instead of
+    // the pointer to the actually used data region represented by the
+    // current `Array`)
+    let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
+    let mut total_size = 0;
+
+    for array in batch.columns() {
+        let array_data = array.to_data();
+        count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size);
+    }
+
+    total_size
+}
+
+/// Count the memory usage of `array_data` and its children recursively.
+fn count_array_data_memory_size(
+    array_data: &ArrayData,
+    counted_buffers: &mut HashSet<NonNull<u8>>,
+    total_size: &mut usize,
+) {
+    // Count memory usage for `array_data`
+    for buffer in array_data.buffers() {
+        if counted_buffers.insert(buffer.data_ptr()) {
+            *total_size += buffer.capacity();
+        } // Otherwise the buffer's memory is already counted
+    }
+
+    if let Some(null_buffer) = array_data.nulls() {
+        if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) {
+            *total_size += null_buffer.inner().inner().capacity();
+        }
+    }
+
+    // Count all children `ArrayData` recursively
+    for child in array_data.child_data() {
+        count_array_data_memory_size(child, counted_buffers, total_size);
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use super::*;
     use crate::spill::{spill_record_batch_by_size, spill_record_batches};
     use crate::test::build_table_i32;
+    use arrow::array::{Float64Array, Int32Array};
+    use arrow::datatypes::{DataType, Field, Int32Type, Schema};
+    use arrow::record_batch::RecordBatch;
+    use arrow_array::ListArray;
     use datafusion_common::Result;
     use datafusion_execution::disk_manager::DiskManagerConfig;
     use datafusion_execution::DiskManager;
```
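The buffer-sharing situation described in the doc comment is easy to reproduce with slices. A minimal standalone sketch (assuming this commit's `get_record_batch_memory_size` is in scope) contrasting it with `RecordBatch::get_array_memory_size`:

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_physical_plan::spill::get_record_batch_memory_size;

fn main() {
    // Two columns that are slices of the same underlying buffer.
    let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(original.slice(0, 3)), Arc::new(original.slice(2, 3))],
    )
    .unwrap();

    // `get_array_memory_size` walks each column independently, so the shared
    // buffer is counted twice; the new helper deduplicates by buffer pointer.
    assert!(batch.get_array_memory_size() > get_record_batch_memory_size(&batch));
}
```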
```diff
@@ -147,7 +222,7 @@ mod tests {
         assert_eq!(cnt.unwrap(), num_rows);

         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;

         assert_eq!(reader.num_batches(), 2);
         assert_eq!(reader.schema(), schema);
@@ -175,11 +250,138 @@ mod tests {
         )?;

         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;

         assert_eq!(reader.num_batches(), 4);
         assert_eq!(reader.schema(), schema);

         Ok(())
     }
+
+    #[test]
+    fn test_get_record_batch_memory_size() {
+        // Create a simple record batch with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array =
+            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 60);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_with_null() {
+        // Create a simple record batch with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array = Int32Array::from(vec![None, Some(2), Some(3)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 100);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_empty() {
+        // Test with an empty record batch
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ints",
+            DataType::Int32,
+            false,
+        )]));
+
+        let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 0, "Empty batch should have 0 memory size");
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_shared_buffer() {
+        // Test with slices that share the same underlying buffer
+        let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let slice1 = original.slice(0, 3);
+        let slice2 = original.slice(2, 3);
+
+        // `RecordBatch` with the `original` array
+        // ----
+        let schema_origin = Arc::new(Schema::new(vec![Field::new(
+            "origin_col",
+            DataType::Int32,
+            false,
+        )]));
+        let batch_origin =
+            RecordBatch::try_new(schema_origin, vec![Arc::new(original)]).unwrap();
+
+        // `RecordBatch` whose columns are all references into the `original`
+        // array
+        // ----
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("slice1", DataType::Int32, false),
+            Field::new("slice2", DataType::Int32, false),
+        ]));
+
+        let batch_sliced =
+            RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])
+                .unwrap();
+
+        // Both sizes should count only the buffer in the `original` array
+        let size_origin = get_record_batch_memory_size(&batch_origin);
+        let size_sliced = get_record_batch_memory_size(&batch_sliced);
+
+        assert_eq!(size_origin, size_sliced);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_nested_array() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "nested_int",
+                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                false,
+            ),
+            Field::new(
+                "nested_int2",
+                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                false,
+            ),
+        ]));
+
+        let int_list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2), Some(3)]),
+        ]);
+
+        let int_list_array2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(4), Some(5), Some(6)]),
+        ]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_list_array), Arc::new(int_list_array2)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 8320);
+    }
 }
```
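The expected sizes in the first two tests can be sanity-checked by hand: with no nulls, only the data buffers count, and 5 × 4 bytes of `Int32` plus 5 × 8 bytes of `Float64` gives 60. In the null test, the three-row data buffers contribute 3 × 4 + 3 × 8 = 36 bytes; the remaining 64 bytes most plausibly come from the validity bitmap, since arrow-rs rounds builder-allocated buffers up to its 64-byte alignment, so even a one-byte null bitmap reports a capacity of 64.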
