Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions encodings/fsst/benches/fsst_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ const BENCH_ARGS: &[(usize, usize, u8)] = &[
(1_000, 16, 8),
(1_000, 64, 4),
(1_000, 64, 8),
(1_000, 256, 4),
(1_000, 256, 8),
(10_000, 4, 4),
(10_000, 4, 8),
(10_000, 16, 4),
(10_000, 16, 8),
(10_000, 64, 4),
(10_000, 64, 8),
(10_000, 256, 4),
(10_000, 256, 8),
];

#[divan::bench(args = BENCH_ARGS)]
Expand Down
31 changes: 13 additions & 18 deletions encodings/fsst/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,24 @@ pub(crate) fn fsst_decode_views(
.clone()
.execute::<PrimitiveArray>(ctx)?;

#[allow(clippy::cast_possible_truncation)]
let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
uncompressed_lens_array
.as_slice::<P>()
.iter()
.map(|x| *x as usize)
.sum()
});

// Bulk-decompress the entire array.
let decompressor = fsst_array.decompressor();
let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
let len =
decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
unsafe { uncompressed_bytes.set_len(len) };

// Directly create the binary views.
// Single dispatch: sum lengths, decompress, and build views in one match arm.
match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
let lens = uncompressed_lens_array.as_slice::<P>();
#[allow(clippy::cast_possible_truncation)]
let total_size: usize = lens.iter().map(|x| *x as usize).sum();

// Bulk-decompress the entire array.
let decompressor = fsst_array.decompressor();
let mut uncompressed_bytes = ByteBufferMut::with_capacity(total_size + 7);
let len =
decompressor.decompress_into(bytes.as_slice(), uncompressed_bytes.spare_capacity_mut());
unsafe { uncompressed_bytes.set_len(len) };

Ok(build_views(
start_buf_index,
MAX_BUFFER_LEN,
uncompressed_bytes,
uncompressed_lens_array.as_slice::<P>(),
lens,
))
})
}
Expand Down
84 changes: 59 additions & 25 deletions vortex-array/src/arrays/varbinview/build_views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ pub fn offsets_to_lengths<P: NativePType>(offsets: &[P]) -> Buffer<P> {
/// Maximum number of buffer bytes that can be referenced by a single `BinaryView`
pub const MAX_BUFFER_LEN: usize = i32::MAX as usize;

/// Split a large buffer of input `bytes` holding string data
/// Build `BinaryView`s from a contiguous byte buffer and per-element lengths.
///
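/// When the total data exceeds `max_buffer_len` (typically [`MAX_BUFFER_LEN`], i.e.
/// `i32::MAX` bytes, just under 2 GiB), the bytes are split across several buffers so
/// that every view offset fits in `u32`. When the data fits in a single buffer, the
/// per-element split checks are skipped entirely.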
#[allow(clippy::cast_possible_truncation)]
pub fn build_views<P: NativePType + AsPrimitive<usize>>(
start_buf_index: u32,
max_buffer_len: usize,
Expand All @@ -33,35 +38,64 @@ pub fn build_views<P: NativePType + AsPrimitive<usize>>(
) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
let mut views = BufferMut::<BinaryView>::with_capacity(lens.len());

let mut buffers = Vec::new();
let mut buf_index = start_buf_index;

let mut offset = 0;
for &len in lens {
let len = len.as_();
assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len");

if (offset + len) > max_buffer_len {
// Roll the buffer every 2GiB, to avoid overflowing VarBinView offset field
let rest = bytes.split_off(offset);
if bytes.len() <= max_buffer_len {
// Fast path: all data fits in a single buffer, so no per-element split checks are needed.
// Offsets fit in u32 provided max_buffer_len <= u32::MAX, which holds for
// MAX_BUFFER_LEN (i32::MAX).
let bytes_ptr = bytes.as_slice().as_ptr();

// Write views directly via pointer to avoid per-element push_unchecked overhead
// (spare-capacity lookup + length bookkeeping on each iteration).
let views_dst = views.spare_capacity_mut().as_mut_ptr() as *mut BinaryView;

let mut offset: usize = 0;
for (i, &len) in lens.iter().enumerate() {
let len: usize = len.as_();
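// SAFETY: requires that the lengths sum to at most bytes.len(); this function does
// not check that itself, so the caller must guarantee it. Given that, processing the
// lengths sequentially keeps offset + len <= bytes.len() at every iteration.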
let value = unsafe { std::slice::from_raw_parts(bytes_ptr.add(offset), len) };
let view = BinaryView::make_view(value, start_buf_index, offset as u32);
// SAFETY: we reserved capacity for lens.len() views and i < lens.len().
unsafe { views_dst.add(i).write(view) };
offset += len;
}
// SAFETY: we wrote exactly lens.len() views above.
unsafe { views.set_len(lens.len()) };

let buffers = if bytes.is_empty() {
Vec::new()
} else {
vec![bytes.freeze()]
};

(buffers, views.freeze())
} else {
// Slow path: may need to split across multiple 2 GiB buffers.
let mut buffers = Vec::new();
let mut buf_index = start_buf_index;
let mut offset = 0;

for &len in lens {
let len = len.as_();
assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len");

if (offset + len) > max_buffer_len {
let rest = bytes.split_off(offset);
buffers.push(bytes.freeze());
buf_index += 1;
offset = 0;
bytes = rest;
}
let view = BinaryView::make_view(&bytes[offset..][..len], buf_index, offset.as_());
unsafe { views.push_unchecked(view) };
offset += len;
}

if !bytes.is_empty() {
buffers.push(bytes.freeze());
buf_index += 1;
offset = 0;

bytes = rest;
}
let view = BinaryView::make_view(&bytes[offset..][..len], buf_index, offset.as_());
// SAFETY: we reserved the right capacity beforehand
unsafe { views.push_unchecked(view) };
offset += len;
}

if !bytes.is_empty() {
buffers.push(bytes.freeze());
(buffers, views.freeze())
}

(buffers, views.freeze())
}

#[cfg(test)]
Expand Down
85 changes: 20 additions & 65 deletions vortex-array/src/arrays/varbinview/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::ops::Range;

use static_assertions::assert_eq_align;
use static_assertions::assert_eq_size;
use vortex_error::VortexExpect;

/// A view over a variable-length binary value.
///
Expand Down Expand Up @@ -46,18 +45,6 @@ pub struct Inlined {
}

impl Inlined {
/// Builds the inlined representation of `value`, whose length is the compile-time constant `N`.
///
/// The first `N` bytes of the data array hold the value; the remaining bytes stay zeroed.
/// In debug builds this asserts that `value.len() == N`.
#[inline]
fn new<const N: usize>(value: &[u8]) -> Self {
debug_assert_eq!(value.len(), N);
// Start from an all-zero payload so the bytes past `N` are zero padding.
let mut data = [0u8; BinaryView::MAX_INLINED_SIZE];
data[..N].copy_from_slice(&value[..N]);
let size = N.try_into().vortex_expect("inlined size must fit in u32");
Self { size, data }
}

/// Returns the full inlined value.
#[inline]
pub fn value(&self) -> &[u8] {
Expand Down Expand Up @@ -102,66 +89,34 @@ impl BinaryView {
/// Maximum size of an inlined binary value.
pub const MAX_INLINED_SIZE: usize = 12;

/// Create a view from a value, block and offset
/// Create a view from a value, block and offset.
///
/// Depending on the length of the provided value either a new inlined
/// or a reference view will be constructed.
///
/// Adapted from arrow-rs <https://github.com/apache/arrow-rs/blob/f4fde769ab6e1a9b75f890b7f8b47bc22800830b/arrow-array/src/builder/generic_bytes_view_builder.rs#L524>
/// Explicitly enumerating the inlined view sizes produces code that avoids calling the generic `ptr::copy_nonoverlapping`, which is slower than explicit stores.
#[inline(never)]
#[inline]
#[allow(clippy::cast_possible_truncation)]
pub fn make_view(value: &[u8], block: u32, offset: u32) -> Self {
match value.len() {
0 => Self {
inlined: Inlined::new::<0>(value),
},
1 => Self {
inlined: Inlined::new::<1>(value),
},
2 => Self {
inlined: Inlined::new::<2>(value),
},
3 => Self {
inlined: Inlined::new::<3>(value),
},
4 => Self {
inlined: Inlined::new::<4>(value),
},
5 => Self {
inlined: Inlined::new::<5>(value),
},
6 => Self {
inlined: Inlined::new::<6>(value),
},
7 => Self {
inlined: Inlined::new::<7>(value),
},
8 => Self {
inlined: Inlined::new::<8>(value),
},
9 => Self {
inlined: Inlined::new::<9>(value),
},
10 => Self {
inlined: Inlined::new::<10>(value),
},
11 => Self {
inlined: Inlined::new::<11>(value),
},
12 => Self {
inlined: Inlined::new::<12>(value),
},
_ => Self {
let len = value.len();
if len <= Self::MAX_INLINED_SIZE {
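// Inlined: zero-initialize, write size, then copy value bytes.
// SAFETY (for the unsafe block below): len <= MAX_INLINED_SIZE, so the copy stays within
// the inlined data array, and `value` cannot overlap the freshly created local `view`.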
let mut view = Self {
le_bytes: [0u8; 16],
};
unsafe {
view.inlined.size = len as u32;
std::ptr::copy_nonoverlapping(value.as_ptr(), view.inlined.data.as_mut_ptr(), len);
}
view
} else {
Self {
_ref: Ref {
size: u32::try_from(value.len()).vortex_expect("value length must fit in u32"),
prefix: value[0..4]
.try_into()
.ok()
.vortex_expect("prefix must be exactly 4 bytes"),
size: len as u32,
// SAFETY: len >= 13, so reading 4 bytes from the start is always valid.
prefix: unsafe { (value.as_ptr() as *const [u8; 4]).read_unaligned() },
buffer_index: block,
offset,
},
},
}
}
}

Expand Down
Loading