Skip to content

Commit 9072c20

Browse files
committed
feat: add gc to view-arrays
part2: implement actual gc algorithm Signed-off-by: 蔡略 <[email protected]>
1 parent 25197a6 commit 9072c20

File tree

1 file changed

+133
-13
lines changed

1 file changed

+133
-13
lines changed

arrow-array/src/array/byte_view_array.rs

Lines changed: 133 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
2525
use arrow_data::{ArrayData, ArrayDataBuilder, ByteView};
2626
use arrow_schema::{ArrowError, DataType};
2727
use std::any::Any;
28-
use std::collections::{BTreeMap, BTreeSet};
28+
use std::collections::BTreeMap;
2929
use std::fmt::Debug;
3030
use std::marker::PhantomData;
3131
use std::sync::Arc;
@@ -267,16 +267,32 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
267267
}
268268
}
269269

270-
/// Returns whether this array is a compact view
271-
pub(self) fn is_compact_view(&self) -> bool {
272-
todo!()
270+
/// Returns whether the buffers are compact
271+
pub(self) fn compact_check(&self) -> Vec<bool> {
272+
let mut checkers: Vec<_> = self
273+
.buffers
274+
.iter()
275+
.map(|b| CompactChecker::new(b.len()))
276+
.collect();
277+
278+
for (i, view) in self.views.iter().enumerate() {
279+
let view = ByteView::from(*view);
280+
if self.is_null(i) || view.length <= 12 {
281+
continue;
282+
}
283+
checkers[view.buffer_index as usize]
284+
.accumulate(view.offset as usize, view.length as usize);
285+
}
286+
checkers.into_iter().map(|c| c.finish()).collect()
273287
}
274288

275-
/// Returns a compact version of this array
289+
/// Returns a copy of this array whose data buffers have been compacted
290+
///
291+
/// The original array will *not* be modified
276292
///
277-
/// # Compaction
293+
/// # Garbage Collection
278294
///
279-
/// before compaction:
295+
/// Before GC:
280296
/// ```text
281297
/// ┌──────┐
282298
/// │......│
@@ -292,7 +308,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
292308
/// large buffer
293309
/// ```
294310
///
295-
/// after compaction:
311+
/// After GC:
296312
///
297313
/// ```text
298314
/// ┌────────────────────┐ ┌─────┐ After gc, only
@@ -304,12 +320,60 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
304320
///
305321
/// 2 views
306322
/// ```
307-
/// this method will compact the data buffers to only include the data
308-
/// that is pointed to by the views
323+
/// This method will compact the data buffers to only include the data
324+
/// that is pointed to by the views,
309325
/// and return a new array with the compacted data buffers.
310-
/// the original array will be left as is.
311-
pub fn compact(&self) -> Self {
312-
todo!()
326+
/// The original array will be left as is.
327+
pub fn gc(&self) -> Self {
328+
let compact_check = self.compact_check();
329+
330+
if compact_check.iter().all(|x| *x) {
331+
return self.clone();
332+
}
333+
334+
let mut new_views = Vec::with_capacity(self.views.len());
335+
let mut new_bufs: Vec<Vec<u8>> = vec![vec![]; self.buffers.len()];
336+
for view in self.views.iter() {
337+
let mut bv = ByteView::from(*view);
338+
let idx = bv.buffer_index as usize;
339+
if bv.length <= 12 || compact_check[idx] {
340+
new_views.push(*view);
341+
continue;
342+
}
343+
// copy data to new buffer
344+
let data = self.buffers.get(idx).unwrap();
345+
let offset = new_bufs[idx].len();
346+
let len = bv.length as usize;
347+
new_bufs[idx].extend_from_slice(
348+
data.get(bv.offset as usize..bv.offset as usize + len)
349+
.unwrap(),
350+
);
351+
// update view
352+
bv.offset = offset as u32;
353+
354+
new_views.push(bv.into());
355+
}
356+
let new_bufs: Vec<_> = new_bufs.into_iter().map(Buffer::from_vec).collect();
357+
358+
let new_views = ScalarBuffer::from(new_views);
359+
360+
let new_buffers = self
361+
.buffers
362+
.iter()
363+
.enumerate()
364+
.map(|(idx, buf)| {
365+
if compact_check[idx] {
366+
buf.clone()
367+
} else {
368+
new_bufs[idx].clone()
369+
}
370+
})
371+
.collect();
372+
373+
let mut compacted = self.clone();
374+
compacted.buffers = new_buffers;
375+
compacted.views = new_views;
376+
compacted
313377
}
314378
}
315379

@@ -805,4 +869,60 @@ mod tests {
805869
checker.accumulate(5, 5);
806870
assert!(checker.finish());
807871
}
872+
873+
#[test]
fn test_gc() {
    // Scenario 1: an array straight out of the builder.
    let mut builder = StringViewBuilder::new();
    builder.append_value("I look at you all");
    builder.append_option(Some("see the love there that's sleeping"));
    let fresh = builder.finish();

    let fresh_gc = fresh.gc();
    // The data buffer must be shared, not copied.
    assert_eq!(fresh.buffers[0].as_ptr(), fresh_gc.buffers[0].as_ptr());

    // Scenario 2: a single value whose view is shortened to 15 bytes.
    let mut builder = StringViewBuilder::new();
    builder.append_value("while my guitar gently weeps");
    let mut shrunk = builder.finish();

    let mut shortened = ByteView::from(shrunk.views[0]);
    shortened.length = 15;
    shrunk.views = ScalarBuffer::from(vec![shortened.into()]);

    let shrunk_gc = shrunk.gc();
    // The data buffer must have been rebuilt.
    assert_ne!(shrunk.buffers[0].as_ptr(), shrunk_gc.buffers[0].as_ptr());
    // The visible value is unchanged.
    assert_eq!(shrunk.value(0), shrunk_gc.value(0));
    // The result reports every buffer as compact.
    assert!(shrunk_gc.compact_check().into_iter().all(|compact| compact));

    // Scenario 3: a null slot followed by a value whose view is shortened.
    let mut builder = StringViewBuilder::new();
    builder.append_null();
    builder.append_option(Some("I don't know why nobody told you"));
    let mut with_null = builder.finish();

    let mut shortened = ByteView::from(with_null.views[1]);
    shortened.length = 15;
    with_null.views = ScalarBuffer::from(vec![with_null.views[0], shortened.into()]);

    let with_null_gc = with_null.gc();

    assert_ne!(
        with_null.buffers[0].as_ptr(),
        with_null_gc.buffers[0].as_ptr()
    );
    assert_eq!(with_null.value(0), with_null_gc.value(0));
    assert_eq!(with_null.value(1), with_null_gc.value(1));
    assert!(with_null_gc
        .compact_check()
        .into_iter()
        .all(|compact| compact));
}
808928
}

0 commit comments

Comments
 (0)