|
20 | 20 | use std::fs::File;
|
21 | 21 | use std::io::BufReader;
|
22 | 22 | use std::path::{Path, PathBuf};
|
| 23 | +use std::ptr::NonNull; |
23 | 24 |
|
| 25 | +use arrow::array::ArrayData; |
24 | 26 | use arrow::datatypes::SchemaRef;
|
25 | 27 | use arrow::ipc::reader::FileReader;
|
26 | 28 | use arrow::record_batch::RecordBatch;
|
27 | 29 | use log::debug;
|
28 | 30 | use tokio::sync::mpsc::Sender;
|
29 | 31 |
|
30 |
| -use datafusion_common::{exec_datafusion_err, Result}; |
| 32 | +use datafusion_common::{exec_datafusion_err, HashSet, Result}; |
31 | 33 | use datafusion_execution::disk_manager::RefCountedTempFile;
|
32 | 34 | use datafusion_execution::memory_pool::human_readable_size;
|
33 | 35 | use datafusion_execution::SendableRecordBatchStream;
|
@@ -109,10 +111,83 @@ pub fn spill_record_batch_by_size(
|
109 | 111 | Ok(())
|
110 | 112 | }
|
111 | 113 |
|
| 114 | +/// Calculate total used memory of this batch. |
| 115 | +/// |
| 116 | +/// This function is used to estimate the physical memory usage of the `RecordBatch`. |
| 117 | +/// It only counts the memory of large data `Buffer`s, and ignores metadata like |
| 118 | +/// types and pointers. |
| 119 | +/// The implementation will add up all unique `Buffer`'s memory |
| 120 | +/// size, due to: |
| 121 | +/// - The data pointer inside `Buffer` are memory regions returned by global memory |
| 122 | +/// allocator, those regions can't have overlap. |
| 123 | +/// - The actual used range of `ArrayRef`s inside `RecordBatch` can have overlap |
| 124 | +/// or reuse the same `Buffer`. For example: taking a slice from `Array`. |
| 125 | +/// |
| 126 | +/// Example: |
| 127 | +/// For a `RecordBatch` with two columns: `col1` and `col2`, two columns are pointing |
| 128 | +/// to a sub-region of the same buffer. |
| 129 | +/// |
| 130 | +/// {xxxxxxxxxxxxxxxxxxx} <--- buffer |
| 131 | +/// ^ ^ ^ ^ |
| 132 | +/// | | | | |
| 133 | +/// col1->{ } | | |
| 134 | +/// col2--------->{ } |
| 135 | +/// |
| 136 | +/// In the above case, `get_record_batch_memory_size` will return the size of |
| 137 | +/// the buffer, instead of the sum of `col1` and `col2`'s actual memory size. |
| 138 | +/// |
| 139 | +/// Note: Current `RecordBatch`.get_array_memory_size()` will double count the |
| 140 | +/// buffer memory size if multiple arrays within the batch are sharing the same |
| 141 | +/// `Buffer`. This method provides temporary fix until the issue is resolved: |
| 142 | +/// <https://github.com/apache/arrow-rs/issues/6439> |
| 143 | +pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize { |
| 144 | + // Store pointers to `Buffer`'s start memory address (instead of actual |
| 145 | + // used data region's pointer represented by current `Array`) |
| 146 | + let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new(); |
| 147 | + let mut total_size = 0; |
| 148 | + |
| 149 | + for array in batch.columns() { |
| 150 | + let array_data = array.to_data(); |
| 151 | + count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size); |
| 152 | + } |
| 153 | + |
| 154 | + total_size |
| 155 | +} |
| 156 | + |
| 157 | +/// Count the memory usage of `array_data` and its children recursively. |
| 158 | +fn count_array_data_memory_size( |
| 159 | + array_data: &ArrayData, |
| 160 | + counted_buffers: &mut HashSet<NonNull<u8>>, |
| 161 | + total_size: &mut usize, |
| 162 | +) { |
| 163 | + // Count memory usage for `array_data` |
| 164 | + for buffer in array_data.buffers() { |
| 165 | + if counted_buffers.insert(buffer.data_ptr()) { |
| 166 | + *total_size += buffer.capacity(); |
| 167 | + } // Otherwise the buffer's memory is already counted |
| 168 | + } |
| 169 | + |
| 170 | + if let Some(null_buffer) = array_data.nulls() { |
| 171 | + if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) { |
| 172 | + *total_size += null_buffer.inner().inner().capacity(); |
| 173 | + } |
| 174 | + } |
| 175 | + |
| 176 | + // Count all children `ArrayData` recursively |
| 177 | + for child in array_data.child_data() { |
| 178 | + count_array_data_memory_size(child, counted_buffers, total_size); |
| 179 | + } |
| 180 | +} |
| 181 | + |
112 | 182 | #[cfg(test)]
|
113 | 183 | mod tests {
|
| 184 | + use super::*; |
114 | 185 | use crate::spill::{spill_record_batch_by_size, spill_record_batches};
|
115 | 186 | use crate::test::build_table_i32;
|
| 187 | + use arrow::array::{Float64Array, Int32Array}; |
| 188 | + use arrow::datatypes::{DataType, Field, Int32Type, Schema}; |
| 189 | + use arrow::record_batch::RecordBatch; |
| 190 | + use arrow_array::ListArray; |
116 | 191 | use datafusion_common::Result;
|
117 | 192 | use datafusion_execution::disk_manager::DiskManagerConfig;
|
118 | 193 | use datafusion_execution::DiskManager;
|
@@ -147,7 +222,7 @@ mod tests {
|
147 | 222 | assert_eq!(cnt.unwrap(), num_rows);
|
148 | 223 |
|
149 | 224 | let file = BufReader::new(File::open(spill_file.path())?);
|
150 |
| - let reader = arrow::ipc::reader::FileReader::try_new(file, None)?; |
| 225 | + let reader = FileReader::try_new(file, None)?; |
151 | 226 |
|
152 | 227 | assert_eq!(reader.num_batches(), 2);
|
153 | 228 | assert_eq!(reader.schema(), schema);
|
@@ -175,11 +250,138 @@ mod tests {
|
175 | 250 | )?;
|
176 | 251 |
|
177 | 252 | let file = BufReader::new(File::open(spill_file.path())?);
|
178 |
| - let reader = arrow::ipc::reader::FileReader::try_new(file, None)?; |
| 253 | + let reader = FileReader::try_new(file, None)?; |
179 | 254 |
|
180 | 255 | assert_eq!(reader.num_batches(), 4);
|
181 | 256 | assert_eq!(reader.schema(), schema);
|
182 | 257 |
|
183 | 258 | Ok(())
|
184 | 259 | }
|
| 260 | + |
| 261 | + #[test] |
| 262 | + fn test_get_record_batch_memory_size() { |
| 263 | + // Create a simple record batch with two columns |
| 264 | + let schema = Arc::new(Schema::new(vec![ |
| 265 | + Field::new("ints", DataType::Int32, true), |
| 266 | + Field::new("float64", DataType::Float64, false), |
| 267 | + ])); |
| 268 | + |
| 269 | + let int_array = |
| 270 | + Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]); |
| 271 | + let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]); |
| 272 | + |
| 273 | + let batch = RecordBatch::try_new( |
| 274 | + schema, |
| 275 | + vec![Arc::new(int_array), Arc::new(float64_array)], |
| 276 | + ) |
| 277 | + .unwrap(); |
| 278 | + |
| 279 | + let size = get_record_batch_memory_size(&batch); |
| 280 | + assert_eq!(size, 60); |
| 281 | + } |
| 282 | + |
| 283 | + #[test] |
| 284 | + fn test_get_record_batch_memory_size_with_null() { |
| 285 | + // Create a simple record batch with two columns |
| 286 | + let schema = Arc::new(Schema::new(vec![ |
| 287 | + Field::new("ints", DataType::Int32, true), |
| 288 | + Field::new("float64", DataType::Float64, false), |
| 289 | + ])); |
| 290 | + |
| 291 | + let int_array = Int32Array::from(vec![None, Some(2), Some(3)]); |
| 292 | + let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0]); |
| 293 | + |
| 294 | + let batch = RecordBatch::try_new( |
| 295 | + schema, |
| 296 | + vec![Arc::new(int_array), Arc::new(float64_array)], |
| 297 | + ) |
| 298 | + .unwrap(); |
| 299 | + |
| 300 | + let size = get_record_batch_memory_size(&batch); |
| 301 | + assert_eq!(size, 100); |
| 302 | + } |
| 303 | + |
| 304 | + #[test] |
| 305 | + fn test_get_record_batch_memory_size_empty() { |
| 306 | + // Test with empty record batch |
| 307 | + let schema = Arc::new(Schema::new(vec![Field::new( |
| 308 | + "ints", |
| 309 | + DataType::Int32, |
| 310 | + false, |
| 311 | + )])); |
| 312 | + |
| 313 | + let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>); |
| 314 | + let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap(); |
| 315 | + |
| 316 | + let size = get_record_batch_memory_size(&batch); |
| 317 | + assert_eq!(size, 0, "Empty batch should have 0 memory size"); |
| 318 | + } |
| 319 | + |
| 320 | + #[test] |
| 321 | + fn test_get_record_batch_memory_size_shared_buffer() { |
| 322 | + // Test with slices that share the same underlying buffer |
| 323 | + let original = Int32Array::from(vec![1, 2, 3, 4, 5]); |
| 324 | + let slice1 = original.slice(0, 3); |
| 325 | + let slice2 = original.slice(2, 3); |
| 326 | + |
| 327 | + // `RecordBatch` with `original` array |
| 328 | + // ---- |
| 329 | + let schema_origin = Arc::new(Schema::new(vec![Field::new( |
| 330 | + "origin_col", |
| 331 | + DataType::Int32, |
| 332 | + false, |
| 333 | + )])); |
| 334 | + let batch_origin = |
| 335 | + RecordBatch::try_new(schema_origin, vec![Arc::new(original)]).unwrap(); |
| 336 | + |
| 337 | + // `RecordBatch` with all columns are reference to `original` array |
| 338 | + // ---- |
| 339 | + let schema = Arc::new(Schema::new(vec![ |
| 340 | + Field::new("slice1", DataType::Int32, false), |
| 341 | + Field::new("slice2", DataType::Int32, false), |
| 342 | + ])); |
| 343 | + |
| 344 | + let batch_sliced = |
| 345 | + RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)]) |
| 346 | + .unwrap(); |
| 347 | + |
| 348 | + // Two sizes should all be only counting the buffer in `original` array |
| 349 | + let size_origin = get_record_batch_memory_size(&batch_origin); |
| 350 | + let size_sliced = get_record_batch_memory_size(&batch_sliced); |
| 351 | + |
| 352 | + assert_eq!(size_origin, size_sliced); |
| 353 | + } |
| 354 | + |
| 355 | + #[test] |
| 356 | + fn test_get_record_batch_memory_size_nested_array() { |
| 357 | + let schema = Arc::new(Schema::new(vec![ |
| 358 | + Field::new( |
| 359 | + "nested_int", |
| 360 | + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), |
| 361 | + false, |
| 362 | + ), |
| 363 | + Field::new( |
| 364 | + "nested_int2", |
| 365 | + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), |
| 366 | + false, |
| 367 | + ), |
| 368 | + ])); |
| 369 | + |
| 370 | + let int_list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![ |
| 371 | + Some(vec![Some(1), Some(2), Some(3)]), |
| 372 | + ]); |
| 373 | + |
| 374 | + let int_list_array2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![ |
| 375 | + Some(vec![Some(4), Some(5), Some(6)]), |
| 376 | + ]); |
| 377 | + |
| 378 | + let batch = RecordBatch::try_new( |
| 379 | + schema, |
| 380 | + vec![Arc::new(int_list_array), Arc::new(int_list_array2)], |
| 381 | + ) |
| 382 | + .unwrap(); |
| 383 | + |
| 384 | + let size = get_record_batch_memory_size(&batch); |
| 385 | + assert_eq!(size, 8320); |
| 386 | + } |
185 | 387 | }
|
0 commit comments