Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion encodings/zstd/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -224,4 +224,4 @@ pub fn vortex_zstd::ZstdMetadata::clear(&mut self)

pub fn vortex_zstd::ZstdMetadata::encoded_len(&self) -> usize

pub fn vortex_zstd::reconstruct_views(buffer: &vortex_buffer::ByteBuffer) -> vortex_buffer::buffer::Buffer<vortex_array::arrays::varbinview::view::BinaryView>
pub fn vortex_zstd::reconstruct_views(buffer: &vortex_buffer::ByteBuffer, max_buffer_len: usize) -> (alloc::vec::Vec<vortex_buffer::ByteBuffer>, vortex_buffer::buffer::Buffer<vortex_array::arrays::varbinview::view::BinaryView>)
114 changes: 92 additions & 22 deletions encodings/zstd/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::varbinview::build_views::BinaryView;
use vortex_array::arrays::varbinview::build_views::MAX_BUFFER_LEN;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::scalar::Scalar;
Expand Down Expand Up @@ -380,9 +381,20 @@ fn collect_valid_vbv(vbv: &VarBinViewArray) -> VortexResult<(ByteBuffer, Vec<usi
/// Reconstruct BinaryView structs from length-prefixed byte data.
///
/// The buffer contains interleaved u32 lengths (little-endian) and string data.
pub fn reconstruct_views(buffer: &ByteBuffer) -> Buffer<BinaryView> {
let mut res = BufferMut::<BinaryView>::empty();
/// When the cumulative data exceeds `max_buffer_len`, the buffer is split (zero-copy) into
/// multiple segments so that BinaryView's u32 offsets can address all data.
///
/// Pass [`MAX_BUFFER_LEN`] for `max_buffer_len` in production; a smaller value can be used in
/// tests to exercise the splitting path without allocating >2 GiB.
pub fn reconstruct_views(
buffer: &ByteBuffer,
max_buffer_len: usize,
) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
let mut views = BufferMut::<BinaryView>::empty();
let mut buffers = Vec::new();
let mut segment_start: usize = 0;
let mut offset = 0;

while offset < buffer.len() {
let str_len = ViewLen::from_le_bytes(
buffer
Expand All @@ -392,16 +404,28 @@ pub fn reconstruct_views(buffer: &ByteBuffer) -> Buffer<BinaryView> {
.ok()
.vortex_expect("must fit ViewLen size"),
) as usize;
offset += size_of::<ViewLen>();
let value = &buffer[offset..offset + str_len];
res.push(BinaryView::make_view(
value,
0,
u32::try_from(offset).vortex_expect("offset must fit in u32"),
));
offset += str_len;
}
res.freeze()

let value_data_offset = offset + size_of::<ViewLen>();
let local_offset = value_data_offset - segment_start;

if local_offset + str_len > max_buffer_len && offset > segment_start {
buffers.push(buffer.slice(segment_start..offset));
segment_start = offset;
}

let local_offset = u32::try_from(value_data_offset - segment_start)
.vortex_expect("local offset within segment must fit in u32");
let buf_index = u32::try_from(buffers.len()).vortex_expect("buffer index must fit in u32");
let value = &buffer[value_data_offset..value_data_offset + str_len];
views.push(BinaryView::make_view(value, buf_index, local_offset));
offset = value_data_offset + str_len;
}

if segment_start < buffer.len() {
buffers.push(buffer.slice(segment_start..buffer.len()));
}

(buffers, views.freeze())
}

impl ZstdArray {
Expand Down Expand Up @@ -821,10 +845,8 @@ impl ZstdArray {
DType::Binary(_) | DType::Utf8(_) => {
match slice_validity.execute_mask(slice_n_rows, ctx)?.indices() {
AllOr::All => {
// the decompressed buffer is a bunch of interleaved u32 lengths
// and strings of those lengths, we need to reconstruct the
// views into those strings by passing through the buffer.
let valid_views = reconstruct_views(&decompressed).slice(
let (buffers, all_views) = reconstruct_views(&decompressed, MAX_BUFFER_LEN);
let valid_views = all_views.slice(
slice_value_idx_start - n_skipped_values
..slice_value_idx_stop - n_skipped_values,
);
Expand All @@ -833,7 +855,7 @@ impl ZstdArray {
Ok(unsafe {
VarBinViewArray::new_unchecked(
valid_views,
Arc::from([decompressed]),
Arc::from(buffers),
self.dtype.clone(),
slice_validity,
)
Expand All @@ -846,10 +868,8 @@ impl ZstdArray {
)
.into_array()),
AllOr::Some(valid_indices) => {
// the decompressed buffer is a bunch of interleaved u32 lengths
// and strings of those lengths, we need to reconstruct the
// views into those strings by passing through the buffer.
let valid_views = reconstruct_views(&decompressed).slice(
let (buffers, all_views) = reconstruct_views(&decompressed, MAX_BUFFER_LEN);
let valid_views = all_views.slice(
slice_value_idx_start - n_skipped_values
..slice_value_idx_stop - n_skipped_values,
);
Expand All @@ -863,7 +883,7 @@ impl ZstdArray {
Ok(unsafe {
VarBinViewArray::new_unchecked(
views.freeze(),
Arc::from([decompressed]),
Arc::from(buffers),
self.dtype.clone(),
slice_validity,
)
Expand Down Expand Up @@ -946,3 +966,53 @@ impl OperationsVTable<Zstd> for Zstd {
.scalar_at(0)
}
}

#[cfg(test)]
mod tests {
    use vortex_buffer::ByteBuffer;

    use super::reconstruct_views;
    use crate::array::BinaryView;

    /// Build a Zstd-style interleaved buffer: `[u32-LE length][value bytes]` repeated.
    ///
    /// Every value is preceded by its length as a 4-byte little-endian `u32`, which is the
    /// layout `reconstruct_views` walks.
    // The cast cannot truncate in practice: every fixture in this module is a few bytes long.
    // The lint is silenced here only, rather than for the whole test module.
    #[allow(clippy::cast_possible_truncation)]
    fn make_interleaved(strings: &[&[u8]]) -> ByteBuffer {
        let mut buf = Vec::new();
        for s in strings {
            let len = s.len() as u32;
            buf.extend_from_slice(&len.to_le_bytes());
            buf.extend_from_slice(s);
        }
        ByteBuffer::copy_from(buf.as_slice())
    }

    #[test]
    fn test_reconstruct_views_empty() {
        // No entries at all: the walk never starts, so there is nothing to address and no
        // segment is emitted.
        let buf = make_interleaved(&[]);
        let (buffers, views) = reconstruct_views(&buf, 1024);

        assert!(buffers.is_empty());
        assert_eq!(views.len(), 0);
    }

    #[test]
    fn test_reconstruct_views_no_split() {
        let strings: &[&[u8]] = &[b"hello", b"world"];
        let buf = make_interleaved(strings);
        let (buffers, views) = reconstruct_views(&buf, 1024);

        assert_eq!(buffers.len(), 1);
        assert_eq!(views.len(), 2);
        // Each entry: [u32 len (4 bytes)][data], so offsets are 4 and 4+5+4=13.
        // NOTE(review): values this short are presumably stored inline by
        // `BinaryView::make_view`, in which case the buffer index and offset are not
        // observable through equality - confirm. The out-of-line variant below pins them.
        assert_eq!(views[0], BinaryView::make_view(b"hello", 0, 4));
        assert_eq!(views[1], BinaryView::make_view(b"world", 0, 13));
    }

    #[test]
    fn test_reconstruct_views_no_split_out_of_line() {
        // Same scenario as above, but with 13-byte values (the size the split test also
        // uses) so that the buffer index and offset take part in the comparison.
        let strings: &[&[u8]] = &[b"0123456789abc", b"defghijklmnop"];
        let buf = make_interleaved(strings);
        let (buffers, views) = reconstruct_views(&buf, 1024);

        // Everything fits under the limit, so the single segment is the whole input.
        assert_eq!(buffers.len(), 1);
        assert_eq!(buffers[0].len(), buf.len());
        assert_eq!(views.len(), 2);
        // Data offsets: 4 (after the first prefix) and 4+13+4=21 (after the second prefix).
        assert_eq!(views[0], BinaryView::make_view(b"0123456789abc", 0, 4));
        assert_eq!(views[1], BinaryView::make_view(b"defghijklmnop", 0, 21));
    }

    #[test]
    fn test_reconstruct_views_split_across_segments() {
        // "aaaaaaaaaaaaa" (13 bytes) and "bbbbbbbbbbbbb" (13 bytes).
        // Each entry occupies 4 (length prefix) + 13 (data) = 17 bytes.
        // With max_buffer_len=20 the first entry ends at 4+13=17, which fits. The second
        // entry's data would start at 4+13+4=21 and end at 21+13=34, which exceeds the limit,
        // so it rolls into a second segment.
        let strings: &[&[u8]] = &[b"aaaaaaaaaaaaa", b"bbbbbbbbbbbbb"];
        let buf = make_interleaved(strings);
        let (buffers, views) = reconstruct_views(&buf, 20);

        assert_eq!(buffers.len(), 2);
        // The cut happens on the entry boundary, so each segment holds one whole entry
        // (prefix included).
        assert_eq!(buffers[0].len(), 17);
        assert_eq!(buffers[1].len(), 17);
        assert_eq!(views.len(), 2);
        assert_eq!(views[0], BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 4));
        // Second entry starts a new segment at byte 17 (the length prefix), so local offset = 4.
        assert_eq!(views[1], BinaryView::make_view(b"bbbbbbbbbbbbb", 1, 4));
    }
}
6 changes: 4 additions & 2 deletions vortex-cuda/src/kernel/encodings/zstd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use vortex::array::ArrayRef;
use vortex::array::Canonical;
use vortex::array::arrays::VarBinViewArray;
use vortex::array::arrays::varbinview::BinaryView;
use vortex::array::arrays::varbinview::build_views::MAX_BUFFER_LEN;
use vortex::array::buffer::BufferHandle;
use vortex::array::buffer::DeviceBuffer;
use vortex::buffer::Alignment;
Expand Down Expand Up @@ -325,13 +326,14 @@ async fn decode_zstd(array: ZstdArray, ctx: &mut CudaExecutionCtx) -> VortexResu
.indices()
{
AllOr::All => {
let all_views = vortex::encodings::zstd::reconstruct_views(&host_buffer);
let (buffers, all_views) =
vortex::encodings::zstd::reconstruct_views(&host_buffer, MAX_BUFFER_LEN);
let sliced_views = all_views.slice(slice_value_idx_start..slice_value_idx_stop);

Ok(Canonical::VarBinView(unsafe {
VarBinViewArray::new_unchecked(
sliced_views,
Arc::from([host_buffer]),
Arc::from(buffers),
dtype,
sliced_validity,
)
Expand Down
Loading