-
Notifications
You must be signed in to change notification settings - Fork 928
Implement GenericByteViewArray::gc
for compacting StringViewArray and ByteViewArray
#5707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -25,6 +25,7 @@ use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer}; | |||||||||||||||||
use arrow_data::{ArrayData, ArrayDataBuilder, ByteView}; | ||||||||||||||||||
use arrow_schema::{ArrowError, DataType}; | ||||||||||||||||||
use std::any::Any; | ||||||||||||||||||
use std::collections::BTreeMap; | ||||||||||||||||||
use std::fmt::Debug; | ||||||||||||||||||
use std::marker::PhantomData; | ||||||||||||||||||
use std::sync::Arc; | ||||||||||||||||||
|
@@ -265,6 +266,115 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> { | |||||||||||||||||
phantom: Default::default(), | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/// Returns whether the buffers are compact | ||||||||||||||||||
pub(self) fn compact_check(&self) -> Vec<bool> { | ||||||||||||||||||
let mut checkers: Vec<_> = self | ||||||||||||||||||
.buffers | ||||||||||||||||||
.iter() | ||||||||||||||||||
.map(|b| CompactChecker::new(b.len())) | ||||||||||||||||||
.collect(); | ||||||||||||||||||
|
||||||||||||||||||
for (i, view) in self.views.iter().enumerate() { | ||||||||||||||||||
let view = ByteView::from(*view); | ||||||||||||||||||
if self.is_null(i) || view.length <= 12 { | ||||||||||||||||||
continue; | ||||||||||||||||||
} | ||||||||||||||||||
checkers[view.buffer_index as usize] | ||||||||||||||||||
.accumulate(view.offset as usize, view.length as usize); | ||||||||||||||||||
} | ||||||||||||||||||
checkers.into_iter().map(|c| c.finish()).collect() | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/// Returns a buffer compact version of this array | ||||||||||||||||||
/// | ||||||||||||||||||
/// The original array will *not* be modified | ||||||||||||||||||
/// | ||||||||||||||||||
/// # Garbage Collection | ||||||||||||||||||
/// | ||||||||||||||||||
/// Before GC: | ||||||||||||||||||
/// ```text | ||||||||||||||||||
/// ┌──────┐ | ||||||||||||||||||
/// │......│ | ||||||||||||||||||
/// │......│ | ||||||||||||||||||
/// ┌────────────────────┐ ┌ ─ ─ ─ ▶ │Data1 │ Large buffer | ||||||||||||||||||
/// │ View 1 │─ ─ ─ ─ │......│ with data that | ||||||||||||||||||
/// ├────────────────────┤ │......│ is not referred | ||||||||||||||||||
/// │ View 2 │─ ─ ─ ─ ─ ─ ─ ─▶ │Data2 │ to by View 1 or | ||||||||||||||||||
/// └────────────────────┘ │......│ View 2 | ||||||||||||||||||
/// │......│ | ||||||||||||||||||
/// 2 views, refer to │......│ | ||||||||||||||||||
/// small portions of a └──────┘ | ||||||||||||||||||
/// large buffer | ||||||||||||||||||
/// ``` | ||||||||||||||||||
/// | ||||||||||||||||||
/// After GC: | ||||||||||||||||||
/// | ||||||||||||||||||
/// ```text | ||||||||||||||||||
/// ┌────────────────────┐ ┌─────┐ After gc, only | ||||||||||||||||||
/// │ View 1 │─ ─ ─ ─ ─ ─ ─ ─▶ │Data1│ data that is | ||||||||||||||||||
/// ├────────────────────┤ ┌ ─ ─ ─ ▶ │Data2│ pointed to by | ||||||||||||||||||
/// │ View 2 │─ ─ ─ ─ └─────┘ the views is | ||||||||||||||||||
/// └────────────────────┘ left | ||||||||||||||||||
/// | ||||||||||||||||||
/// | ||||||||||||||||||
/// 2 views | ||||||||||||||||||
/// ``` | ||||||||||||||||||
/// This method will compact the data buffers to only include the data | ||||||||||||||||||
/// that is pointed to by the views, | ||||||||||||||||||
/// and return a new array with the compacted data buffers. | ||||||||||||||||||
/// The original array will be left as is. | ||||||||||||||||||
pub fn gc(&self) -> Self { | ||||||||||||||||||
let check_result = self.compact_check(); | ||||||||||||||||||
|
||||||||||||||||||
if check_result.iter().all(|x| *x) { | ||||||||||||||||||
return self.clone(); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
let mut new_views = Vec::with_capacity(self.views.len()); | ||||||||||||||||||
let mut new_bufs: Vec<Vec<u8>> = vec![vec![]; self.buffers.len()]; | ||||||||||||||||||
for (i, view) in self.views.iter().enumerate() { | ||||||||||||||||||
let mut bv = ByteView::from(*view); | ||||||||||||||||||
let idx = bv.buffer_index as usize; | ||||||||||||||||||
if self.is_null(i) || bv.length <= 12 || check_result[idx] { | ||||||||||||||||||
new_views.push(*view); | ||||||||||||||||||
continue; | ||||||||||||||||||
} | ||||||||||||||||||
// copy data to new buffer | ||||||||||||||||||
let data = self.buffers.get(idx).unwrap(); | ||||||||||||||||||
let offset = new_bufs[idx].len(); | ||||||||||||||||||
let len = bv.length as usize; | ||||||||||||||||||
new_bufs[idx].extend_from_slice( | ||||||||||||||||||
data.get(bv.offset as usize..bv.offset as usize + len) | ||||||||||||||||||
.unwrap(), | ||||||||||||||||||
); | ||||||||||||||||||
// update view | ||||||||||||||||||
bv.offset = offset as u32; | ||||||||||||||||||
|
||||||||||||||||||
new_views.push(bv.into()); | ||||||||||||||||||
} | ||||||||||||||||||
let new_bufs: Vec<_> = new_bufs.into_iter().map(Buffer::from_vec).collect(); | ||||||||||||||||||
|
||||||||||||||||||
let new_views = ScalarBuffer::from(new_views); | ||||||||||||||||||
|
||||||||||||||||||
let new_buffers = self | ||||||||||||||||||
.buffers | ||||||||||||||||||
.iter() | ||||||||||||||||||
.enumerate() | ||||||||||||||||||
.map(|(idx, buf)| { | ||||||||||||||||||
if check_result[idx] { | ||||||||||||||||||
buf.clone() | ||||||||||||||||||
} else { | ||||||||||||||||||
new_bufs[idx].clone() | ||||||||||||||||||
} | ||||||||||||||||||
}) | ||||||||||||||||||
.collect(); | ||||||||||||||||||
|
||||||||||||||||||
let mut compacted = self.clone(); | ||||||||||||||||||
compacted.buffers = new_buffers; | ||||||||||||||||||
compacted.views = new_views; | ||||||||||||||||||
compacted | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
impl<T: ByteViewType + ?Sized> Debug for GenericByteViewArray<T> { | ||||||||||||||||||
|
@@ -482,6 +592,67 @@ impl From<Vec<Option<String>>> for StringViewArray { | |||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/// A helper struct that used to check if the array is compact view | ||||||||||||||||||
/// | ||||||||||||||||||
/// The checker is lazy and will not check the array until `finish` is called. | ||||||||||||||||||
/// | ||||||||||||||||||
/// This is based on the assumption that the array will most likely to be not compact, | ||||||||||||||||||
/// so it is likely to scan the entire array. | ||||||||||||||||||
/// | ||||||||||||||||||
/// Then it is better to do the check at once, rather than doing it for each accumulate operation. | ||||||||||||||||||
struct CompactChecker { | ||||||||||||||||||
length: usize, | ||||||||||||||||||
intervals: BTreeMap<usize, usize>, | ||||||||||||||||||
} | ||||||||||||||||||
Comment on lines
+595
to
+606
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if there is a better algorithm for this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I recommend we pull the "compact checking" algorithm into a new PR to discuss it -- I am not sure about the assumption that StringViewArrays will mostly often be compacted (I would actually expect the opposite) |
||||||||||||||||||
|
||||||||||||||||||
impl CompactChecker { | ||||||||||||||||||
/// Create a new checker with the expected length of the buffer | ||||||||||||||||||
pub fn new(length: usize) -> Self { | ||||||||||||||||||
Self { | ||||||||||||||||||
length, | ||||||||||||||||||
intervals: BTreeMap::new(), | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/// Accumulate a new covered interval to the checker | ||||||||||||||||||
pub fn accumulate(&mut self, offset: usize, length: usize) { | ||||||||||||||||||
if length == 0 { | ||||||||||||||||||
return; | ||||||||||||||||||
} | ||||||||||||||||||
let end = offset + length; | ||||||||||||||||||
if end > self.length { | ||||||||||||||||||
panic!( | ||||||||||||||||||
"Invalid interval: offset {} length {} is out of bound of length {}", | ||||||||||||||||||
offset, length, self.length | ||||||||||||||||||
); | ||||||||||||||||||
} | ||||||||||||||||||
if let Some(val) = self.intervals.get_mut(&offset) { | ||||||||||||||||||
if *val < end { | ||||||||||||||||||
*val = end; | ||||||||||||||||||
} | ||||||||||||||||||
} else { | ||||||||||||||||||
self.intervals.insert(offset, end); | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/// Check if the checker is fully covered | ||||||||||||||||||
pub fn finish(self) -> bool { | ||||||||||||||||||
// check if the coverage is continuous and full | ||||||||||||||||||
let mut last_end = 0; | ||||||||||||||||||
|
||||||||||||||||||
for (start, end) in self.intervals.iter() { | ||||||||||||||||||
if *start > last_end { | ||||||||||||||||||
return false; | ||||||||||||||||||
} | ||||||||||||||||||
if *end > last_end { | ||||||||||||||||||
last_end = *end; | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
last_end == self.length | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
#[cfg(test)] | ||||||||||||||||||
mod tests { | ||||||||||||||||||
use crate::builder::{BinaryViewBuilder, StringViewBuilder}; | ||||||||||||||||||
|
@@ -645,4 +816,175 @@ mod tests { | |||||||||||||||||
|
||||||||||||||||||
StringViewArray::new(views, buffers, None); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
#[test] | ||||||||||||||||||
#[should_panic(expected = "Invalid interval: offset 0 length 13 is out of bound of length 12")] | ||||||||||||||||||
fn test_compact_checker() { | ||||||||||||||||||
use super::CompactChecker; | ||||||||||||||||||
|
||||||||||||||||||
// single coverage, full | ||||||||||||||||||
let mut checker = CompactChecker::new(10); | ||||||||||||||||||
checker.accumulate(0, 10); | ||||||||||||||||||
assert!(checker.finish()); | ||||||||||||||||||
|
||||||||||||||||||
// single coverage, partial | ||||||||||||||||||
let mut checker = CompactChecker::new(10); | ||||||||||||||||||
checker.accumulate(0, 5); | ||||||||||||||||||
assert!(!checker.finish()); | ||||||||||||||||||
|
||||||||||||||||||
// multiple coverage, no overlapping, partial | ||||||||||||||||||
let mut checker = CompactChecker::new(10); | ||||||||||||||||||
checker.accumulate(0, 5); | ||||||||||||||||||
checker.accumulate(5, 4); | ||||||||||||||||||
assert!(!checker.finish()); | ||||||||||||||||||
|
||||||||||||||||||
//multiple coverage, no overlapping, full | ||||||||||||||||||
let mut checker = CompactChecker::new(10); | ||||||||||||||||||
checker.accumulate(0, 5); | ||||||||||||||||||
checker.accumulate(5, 5); | ||||||||||||||||||
assert!(checker.finish()); | ||||||||||||||||||
|
||||||||||||||||||
//multiple coverage, overlapping, partial | ||||||||||||||||||
let mut checker = CompactChecker::new(10); | ||||||||||||||||||
checker.accumulate(0, 5); | ||||||||||||||||||
checker.accumulate(4, 5); | ||||||||||||||||||
assert!(!checker.finish()); | ||||||||||||||||||
|
||||||||||||||||||
//multiple coverage, overlapping, full | ||||||||||||||||||
let mut checker = CompactChecker::new(10); | ||||||||||||||||||
checker.accumulate(0, 5); | ||||||||||||||||||
checker.accumulate(4, 6); | ||||||||||||||||||
assert!(checker.finish()); | ||||||||||||||||||
|
||||||||||||||||||
//mutiple coverage, no overlapping, full, out of order | ||||||||||||||||||
let mut checker = CompactChecker::new(10); | ||||||||||||||||||
checker.accumulate(4, 6); | ||||||||||||||||||
checker.accumulate(0, 4); | ||||||||||||||||||
assert!(checker.finish()); | ||||||||||||||||||
|
||||||||||||||||||
// multiple coverage, overlapping, full, out of order | ||||||||||||||||||
let mut checker = CompactChecker::new(10); | ||||||||||||||||||
checker.accumulate(4, 6); | ||||||||||||||||||
checker.accumulate(0, 4); | ||||||||||||||||||
assert!(checker.finish()); | ||||||||||||||||||
|
||||||||||||||||||
// multiple coverage, overlapping, full, containing null | ||||||||||||||||||
let mut checker = CompactChecker::new(10); | ||||||||||||||||||
checker.accumulate(0, 5); | ||||||||||||||||||
checker.accumulate(5, 0); | ||||||||||||||||||
checker.accumulate(5, 5); | ||||||||||||||||||
assert!(checker.finish()); | ||||||||||||||||||
|
||||||||||||||||||
// multiple coverage, overlapping, full, containing null | ||||||||||||||||||
let mut checker = CompactChecker::new(10); | ||||||||||||||||||
checker.accumulate(0, 5); | ||||||||||||||||||
checker.accumulate(5, 0); | ||||||||||||||||||
checker.accumulate(4, 6); | ||||||||||||||||||
checker.accumulate(5, 5); | ||||||||||||||||||
assert!(checker.finish()); | ||||||||||||||||||
|
||||||||||||||||||
// multiple coverage, overlapping, full, containing null | ||||||||||||||||||
// | ||||||||||||||||||
// this case is for attacking those implementation that only check | ||||||||||||||||||
// the lower-bound of the interval | ||||||||||||||||||
let mut checker = CompactChecker::new(10); | ||||||||||||||||||
checker.accumulate(0, 5); | ||||||||||||||||||
checker.accumulate(5, 0); | ||||||||||||||||||
checker.accumulate(1, 9); | ||||||||||||||||||
checker.accumulate(2, 3); | ||||||||||||||||||
checker.accumulate(3, 1); | ||||||||||||||||||
checker.accumulate(9, 1); | ||||||||||||||||||
assert!(checker.finish()); | ||||||||||||||||||
|
||||||||||||||||||
// panic case, out of bound | ||||||||||||||||||
let mut checker = CompactChecker::new(12); | ||||||||||||||||||
checker.accumulate(0, 13); | ||||||||||||||||||
checker.finish(); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
#[test] | ||||||||||||||||||
fn test_gc() { | ||||||||||||||||||
// --------------------------------------------------------------------- | ||||||||||||||||||
// test compact on compacted data | ||||||||||||||||||
|
||||||||||||||||||
let array = { | ||||||||||||||||||
let mut builder = StringViewBuilder::new(); | ||||||||||||||||||
builder.append_value("I look at you all"); | ||||||||||||||||||
builder.append_option(Some("see the love there that's sleeping")); | ||||||||||||||||||
builder.finish() | ||||||||||||||||||
}; | ||||||||||||||||||
let compacted = array.gc(); | ||||||||||||||||||
// verify it is a shallow copy | ||||||||||||||||||
assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); | ||||||||||||||||||
|
||||||||||||||||||
// --------------------------------------------------------------------- | ||||||||||||||||||
// test compact on non-compacted data | ||||||||||||||||||
|
||||||||||||||||||
let mut array = { | ||||||||||||||||||
let mut builder = StringViewBuilder::new(); | ||||||||||||||||||
builder.append_value("while my guitar gently weeps"); | ||||||||||||||||||
builder.finish() | ||||||||||||||||||
}; | ||||||||||||||||||
// shrink the view | ||||||||||||||||||
let mut view = ByteView::from(array.views[0]); | ||||||||||||||||||
view.length = 15; | ||||||||||||||||||
let new_views = ScalarBuffer::from(vec![view.into()]); | ||||||||||||||||||
array.views = new_views; | ||||||||||||||||||
let compacted = array.gc(); | ||||||||||||||||||
// verify it is a deep copy | ||||||||||||||||||
assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); | ||||||||||||||||||
// verify content | ||||||||||||||||||
assert_eq!(array.value(0), compacted.value(0)); | ||||||||||||||||||
// verify compacted | ||||||||||||||||||
assert!(compacted.compact_check().iter().all(|x| *x)); | ||||||||||||||||||
|
||||||||||||||||||
// --------------------------------------------------------------------- | ||||||||||||||||||
// test compact on array containing null | ||||||||||||||||||
|
||||||||||||||||||
let mut array = { | ||||||||||||||||||
let mut builder = StringViewBuilder::new(); | ||||||||||||||||||
builder.append_null(); | ||||||||||||||||||
builder.append_option(Some("I don't know why nobody told you")); | ||||||||||||||||||
builder.finish() | ||||||||||||||||||
}; | ||||||||||||||||||
|
||||||||||||||||||
let mut view = ByteView::from(array.views[1]); | ||||||||||||||||||
view.length = 15; | ||||||||||||||||||
let new_views = ScalarBuffer::from(vec![array.views[0], view.into()]); | ||||||||||||||||||
Comment on lines
+951
to
+953
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dn't undertand what this is doing I think you can more easily create a stringview with multiple buffers like this: arrow-rs/arrow-cast/src/pretty.rs Lines 334 to 341 in 905c46b
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not familiar with the rest of the code, so I made it out brutally. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We assume the same idea, it's likely to be not compacted. |
||||||||||||||||||
array.views = new_views; | ||||||||||||||||||
|
||||||||||||||||||
let compacted = array.gc(); | ||||||||||||||||||
|
||||||||||||||||||
assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); | ||||||||||||||||||
assert_eq!(array.value(0), compacted.value(0)); | ||||||||||||||||||
assert_eq!(array.value(1), compacted.value(1)); | ||||||||||||||||||
assert!(compacted.compact_check().iter().all(|x| *x)); | ||||||||||||||||||
|
||||||||||||||||||
// --------------------------------------------------------------------- | ||||||||||||||||||
// test compact on multiple buffers | ||||||||||||||||||
|
||||||||||||||||||
let mut array = { | ||||||||||||||||||
let mut builder = StringViewBuilder::new().with_block_size(15); | ||||||||||||||||||
builder.append_value("how to unfold your love"); | ||||||||||||||||||
builder.append_option(Some("I don't know how someone controlled you")); | ||||||||||||||||||
builder.finish() | ||||||||||||||||||
}; | ||||||||||||||||||
|
||||||||||||||||||
// verify it's not same buffer | ||||||||||||||||||
assert_eq!(array.buffers.len(), 2); | ||||||||||||||||||
// shrink the view | ||||||||||||||||||
|
||||||||||||||||||
let mut view = ByteView::from(array.views[1]); | ||||||||||||||||||
view.length = 15; | ||||||||||||||||||
let new_views = ScalarBuffer::from(vec![array.views[0], view.into()]); | ||||||||||||||||||
array.views = new_views; | ||||||||||||||||||
|
||||||||||||||||||
let compacted = array.gc(); | ||||||||||||||||||
assert_eq!(compacted.buffers.len(), 2); | ||||||||||||||||||
assert_eq!(array.value(0), compacted.value(0)); | ||||||||||||||||||
assert_eq!(array.value(1), compacted.value(1)); | ||||||||||||||||||
assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr()); | ||||||||||||||||||
assert_ne!(array.buffers[1].as_ptr(), compacted.buffers[1].as_ptr()); | ||||||||||||||||||
assert!(compacted.compact_check().iter().all(|x| *x)); | ||||||||||||||||||
} | ||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The number of buffers may shrink after
gc
. Every buffer should be filled up toblock_size
.See
GenericByteViewBuilder::append_value
.