Skip to content

Commit 2dd6bf2

Browse files
committed
Add IncrementalArrayBuilder API and cached filter results incremntally
REwork to be interms of IncremntalRecordBatchBuilder
1 parent 961e9af commit 2dd6bf2

File tree

11 files changed

+1091
-348
lines changed

11 files changed

+1091
-348
lines changed

arrow-array/src/builder/generic_bytes_view_builder.rs

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use hashbrown::HashTable;
2828
use crate::builder::ArrayBuilder;
2929
use crate::types::bytes::ByteArrayNativeType;
3030
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31-
use crate::{ArrayRef, GenericByteViewArray};
31+
use crate::{Array, ArrayRef, GenericByteViewArray};
3232

3333
const STARTING_BLOCK_SIZE: u32 = 8 * 1024; // 8KiB
3434
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; // 2MiB
@@ -406,6 +406,98 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
406406
};
407407
buffer_size + in_progress + tracker + views + null
408408
}
409+
410+
/// Append all views from the given array into the inprogress builder
411+
///
412+
/// TODO: make a configurable option about what threshold to compact buffers
413+
pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
414+
let num_rows = array.len();
415+
if num_rows == 0 {
416+
return; // nothing to do
417+
}
418+
419+
// Flush the in-progress buffer
420+
self.flush_in_progress();
421+
422+
let null_buffer_builder = &mut self.null_buffer_builder;
423+
let buffers = &mut self.completed;
424+
let new_views = &mut self.views_builder;
425+
426+
// Copy nulls
427+
if let Some(nulls) = array.nulls() {
428+
null_buffer_builder.append_buffer(nulls);
429+
} else {
430+
null_buffer_builder.append_n_non_nulls(array.len());
431+
}
432+
433+
// Copy views.
434+
let ideal_buffer_size = ideal_buffer_size(array);
435+
let actual_buffer_size = array.get_buffer_memory_size();
436+
let starting_view = new_views.len();
437+
new_views.append_slice(array.views());
438+
439+
// Copy buffers
440+
441+
// if the array is not sparse, simply copy the buffers and update the views
442+
// to point to the new buffers
443+
if actual_buffer_size < 2 * ideal_buffer_size {
444+
let num_buffers_before: u32 = buffers.len().try_into().expect("buffer count overflow");
445+
buffers.extend_from_slice(array.data_buffers());
446+
447+
// Update any views that point to the old buffers
448+
for v in new_views.as_slice_mut()[starting_view..].iter_mut() {
449+
let view_len = *v as u32;
450+
// if view_len is 12 or less, data is inlined and doesn't need an update
451+
// if view is 12 or more, need to update the buffer offset
452+
if view_len > 12 {
453+
let mut view = ByteView::from(*v);
454+
let new_buffer_index = num_buffers_before + view.buffer_index;
455+
view.buffer_index = new_buffer_index;
456+
*v = view.into(); // update view
457+
}
458+
}
459+
} else {
460+
// otherwise the array is sparse so copy the data into a single new
461+
// buffer as well as updating the views
462+
let mut new_buffer: Vec<u8> = Vec::with_capacity(ideal_buffer_size);
463+
let new_buffer_index = buffers.len() as u32; // making one new buffer
464+
// Update any views that point to the old buffers.
465+
for v in new_views.as_slice_mut()[starting_view..].iter_mut() {
466+
let view_len = *v as u32;
467+
// if view_len is 12 or less, data is inlined and doesn't need an update
468+
// if view is 12 or more, need to copy the data to the new buffer and update the index and buffer offset
469+
if view_len > 12 {
470+
let mut view = ByteView::from(*v);
471+
let old_buffer = &array.data_buffers()[view.buffer_index as usize].as_slice();
472+
473+
let new_offset = new_buffer.len();
474+
let old_offset = view.offset as usize;
475+
let str_data = &old_buffer[old_offset..old_offset + view_len as usize];
476+
new_buffer.extend_from_slice(str_data);
477+
view.offset = new_offset as u32;
478+
view.buffer_index = new_buffer_index;
479+
*v = view.into(); // update view
480+
}
481+
}
482+
buffers.push(new_buffer.into());
483+
}
484+
}
485+
}
486+
487+
/// return the size required for buffers to hold all strings
488+
fn ideal_buffer_size<T: ByteViewType + ?Sized>(view_array: &GenericByteViewArray<T>) -> usize {
489+
view_array
490+
.views()
491+
.iter()
492+
.map(|v| {
493+
let len = (*v as u32) as usize;
494+
if len > 12 {
495+
len
496+
} else {
497+
0
498+
}
499+
})
500+
.sum()
409501
}
410502

411503
impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {

0 commit comments

Comments
 (0)