Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions vortex-test/compat-gen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ path = "src/validate_main.rs"

[dependencies]
# Vortex crates
vortex = { workspace = true, features = ["files", "tokio"] }
vortex = { workspace = true, features = ["files", "tokio", "zstd"] }
vortex-array = { workspace = true, features = ["_test-harness"] }
vortex-buffer = { workspace = true }
vortex-error = { workspace = true }
Expand All @@ -37,7 +37,9 @@ arrow-array = { workspace = true }
tpchgen = { workspace = true }
tpchgen-arrow = { workspace = true }

# ClickBench parquet reading
# ClickBench parquet reading + writing
arrow-select = { workspace = true }
bytes = { workspace = true }
parquet = { workspace = true }

# Async runtime
Expand All @@ -52,3 +54,4 @@ chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tempfile = { workspace = true }
62 changes: 61 additions & 1 deletion vortex-test/compat-gen/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use vortex::VortexSessionDefault;
use vortex::file::OpenOptionsSessionExt;
use vortex::file::WriteOptionsSessionExt;
use vortex::io::session::RuntimeSessionExt;
use vortex::layout::LayoutStrategy;
use vortex::layout::layouts::flat::writer::FlatLayoutStrategy;
use vortex_array::ArrayRef;
use vortex_array::DynArray;
Expand All @@ -35,7 +36,7 @@ fn runtime() -> VortexResult<Runtime> {
pub fn write_file(path: &Path, chunk: ArrayRef) -> VortexResult<()> {
let stream = ArrayStreamAdapter::new(chunk.dtype().clone(), stream::iter([Ok(chunk)]));

let strategy: Arc<dyn vortex::layout::LayoutStrategy> = Arc::new(FlatLayoutStrategy::default());
let strategy: Arc<dyn LayoutStrategy> = Arc::new(FlatLayoutStrategy::default());

runtime()?.block_on(async {
let session = VortexSession::default().with_tokio();
Expand All @@ -51,6 +52,65 @@ pub fn write_file(path: &Path, chunk: ArrayRef) -> VortexResult<()> {
})
}

/// Write a sequence of array chunks to an in-memory `.vortex` byte buffer with no compression.
pub fn write_file_to_bytes(chunk: ArrayRef) -> VortexResult<ByteBuffer> {
let stream = ArrayStreamAdapter::new(chunk.dtype().clone(), stream::iter([Ok(chunk)]));

let strategy: Arc<dyn LayoutStrategy> = Arc::new(FlatLayoutStrategy::default());

runtime()?.block_on(async {
let session = VortexSession::default().with_tokio();
let mut bytes = Vec::new();
let _summary = session
.write_options()
.with_strategy(strategy)
.write(&mut bytes, stream)
.await?;
Ok(ByteBuffer::from(bytes))
})
}

/// Write a `.vortex` file at `path` using a caller-provided layout strategy
/// (compressor pipeline).
///
/// The chunk is wrapped in a single-element array stream and written on a
/// freshly created tokio runtime. Returns an error if the runtime cannot be
/// built, the destination file cannot be created, or the write itself fails.
pub fn write_compressed(
    path: &Path,
    chunk: ArrayRef,
    strategy: Arc<dyn LayoutStrategy>,
) -> VortexResult<()> {
    let dtype = chunk.dtype().clone();
    let single_chunk_stream = ArrayStreamAdapter::new(dtype, stream::iter([Ok(chunk)]));

    runtime()?.block_on(async {
        let session = VortexSession::default().with_tokio();
        let mut output = tokio::fs::File::create(path)
            .await
            .map_err(|e| vortex_error::vortex_err!("failed to create {}: {e}", path.display()))?;
        session
            .write_options()
            .with_strategy(strategy)
            .write(&mut output, single_chunk_stream)
            .await
            .map(|_summary| ())
    })
}

/// Write a `.vortex` file into an in-memory byte buffer using a caller-provided
/// layout strategy.
///
/// Behaves like [`write_compressed`] but targets a `Vec<u8>` instead of a file,
/// returning the encoded bytes as a [`ByteBuffer`].
pub fn write_compressed_to_bytes(
    chunk: ArrayRef,
    strategy: Arc<dyn LayoutStrategy>,
) -> VortexResult<ByteBuffer> {
    let dtype = chunk.dtype().clone();
    let single_chunk_stream = ArrayStreamAdapter::new(dtype, stream::iter([Ok(chunk)]));

    runtime()?.block_on(async {
        let session = VortexSession::default().with_tokio();
        let mut encoded = Vec::new();
        // The write summary is not needed here; only the encoded bytes are returned.
        session
            .write_options()
            .with_strategy(strategy)
            .write(&mut encoded, single_chunk_stream)
            .await?;
        Ok(ByteBuffer::from(encoded))
    })
}

/// Read a `.vortex` file from bytes, returning the arrays.
pub fn read_file(bytes: ByteBuffer) -> VortexResult<ArrayRef> {
runtime()?.block_on(async {
Expand Down
184 changes: 163 additions & 21 deletions vortex-test/compat-gen/src/fixtures/arrays/datasets/clickbench.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,192 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::fs;
use std::path::PathBuf;

use arrow_array::RecordBatch;
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::arrays::ChunkedArray;
use vortex_array::arrays::Primitive;
use vortex_array::arrays::Struct;
use vortex_array::arrays::VarBin;
use vortex_array::arrow::FromArrowArray;
use vortex_array::vtable::ArrayId;
use vortex_error::VortexResult;
use vortex_error::vortex_err;

use crate::fixtures::ArrayFixture;
use crate::fixtures::DatasetFixture;

/// First partition of ClickBench hits, limited to 1000 rows.
// TODO: Upload the pre-sampled 5k parquet to R2 and download it in a build.rs instead of
// downloading the full ~112MB partition 0 at runtime.
const CLICKBENCH_URL: &str =
"https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_0.parquet";

struct ClickBenchHits1kFixture;
/// Deterministic offsets (seed=42) into clickbench hits partition 0.
const SAMPLE_OFFSETS: [usize; 5] = [26225, 116739, 288389, 670487, 777572];
const ROWS_PER_OFFSET: usize = 1000;

impl ArrayFixture for ClickBenchHits1kFixture {
fn name(&self) -> &str {
"clickbench_hits_1k.vortex"
const MAX_RETRIES: u32 = 3;

/// Returns the path to `data/clickbench_hits_5k.parquet` relative to the crate root,
/// downloading and sampling from the full dataset if it doesn't already exist.
fn cached_clickbench_parquet() -> VortexResult<PathBuf> {
let crate_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let data_dir = crate_dir.join("data");
let dest = data_dir.join("clickbench_hits_5k.parquet");

if dest.exists() {
return Ok(dest);
}

fn description(&self) -> &str {
"First 1000 rows of ClickBench hits dataset with wide schema of primitives and strings"
fs::create_dir_all(&data_dir).map_err(|e| vortex_err!("failed to create data dir: {e}"))?;

// Download full partition 0 to a temp file.
let source_bytes = download_with_retries(CLICKBENCH_URL)?;

// Sample 5k rows and write to dest.
sample_and_write(&source_bytes, &dest)?;

Ok(dest)
}

fn download_with_retries(url: &str) -> VortexResult<Bytes> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for a future follow up: there's rust crates that could wrap the retry logic

let client = reqwest::blocking::Client::builder()
.timeout(std::time::Duration::from_secs(300))
.build()
.map_err(|e| vortex_err!("failed to build HTTP client: {e}"))?;

for attempt in 1..=MAX_RETRIES {
match client.get(url).send() {
Ok(response) if response.status().is_success() => {
return response
.bytes()
.map_err(|e| vortex_err!("failed to read response body: {e}"));
}
Ok(response) if response.status().is_client_error() => {
return Err(vortex_err!(
"HTTP {}: failed to download {url}",
response.status()
));
}
Ok(response) => {
eprintln!(
"Download attempt {attempt}/{MAX_RETRIES} failed: HTTP {} for {url}",
response.status()
);
}
Err(e) => {
eprintln!("Download attempt {attempt}/{MAX_RETRIES} failed: {e}");
}
}

if attempt < MAX_RETRIES {
let delay = std::time::Duration::from_secs(2u64.pow(attempt));
std::thread::sleep(delay);
}
}

Err(vortex_err!(
"failed to download {url} after {MAX_RETRIES} attempts"
))
}

#[allow(clippy::cast_possible_truncation)]
fn sample_and_write(source_bytes: &[u8], dest: &std::path::Path) -> VortexResult<()> {
let source_bytes = Bytes::copy_from_slice(source_bytes);
let builder = ParquetRecordBatchReaderBuilder::try_new(source_bytes.clone())
.map_err(|e| vortex_err!("failed to open source parquet: {e}"))?;
let metadata = builder.metadata().clone();

let total_rows: usize = metadata
.row_groups()
.iter()
.map(|rg| rg.num_rows() as usize)
.sum();

// Build (row_group_index, local_offset, count) ranges for each sample window.
let mut ranges: Vec<(usize, usize, usize)> = Vec::new();
for &offset in &SAMPLE_OFFSETS {
let end = (offset + ROWS_PER_OFFSET).min(total_rows);
let mut remaining = end - offset;
let mut global_pos = 0usize;

for (rg_idx, rg_meta) in metadata.row_groups().iter().enumerate() {
let rg_rows = rg_meta.num_rows() as usize;
let rg_end = global_pos + rg_rows;

if offset < rg_end && global_pos < end {
let local_start = offset.saturating_sub(global_pos);
let local_end = (local_start + remaining).min(rg_rows);
let count = local_end - local_start;
if count > 0 {
ranges.push((rg_idx, local_start, count));
remaining -= count;
}
}
global_pos = rg_end;
if remaining == 0 {
break;
}
}
}

fn expected_encodings(&self) -> Vec<ArrayId> {
vec![Struct::ID, Primitive::ID, VarBin::ID]
// Read each range and collect batches.
let mut sampled_batches: Vec<RecordBatch> = Vec::new();
for &(rg_idx, local_offset, count) in &ranges {
let reader = ParquetRecordBatchReaderBuilder::try_new(source_bytes.clone())
.map_err(|e| vortex_err!("failed to open parquet for sampling: {e}"))?
.with_row_groups(vec![rg_idx])
.with_offset(local_offset)
.with_limit(count)
.with_batch_size(count)
.build()
.map_err(|e| vortex_err!("failed to build parquet reader: {e}"))?;

for batch in reader {
sampled_batches
.push(batch.map_err(|e| vortex_err!("failed to read parquet batch: {e}"))?);
}
}

// Write sampled batches to a parquet file.
let schema = sampled_batches[0].schema();
let combined = arrow_select::concat::concat_batches(&schema, &sampled_batches)
.map_err(|e| vortex_err!("failed to concat batches: {e}"))?;

let file =
fs::File::create(dest).map_err(|e| vortex_err!("failed to create output parquet: {e}"))?;
let mut writer = parquet::arrow::ArrowWriter::try_new(file, schema, None)
.map_err(|e| vortex_err!("failed to create parquet writer: {e}"))?;
writer
.write(&combined)
.map_err(|e| vortex_err!("failed to write parquet: {e}"))?;
writer
.close()
.map_err(|e| vortex_err!("failed to close parquet writer: {e}"))?;

Ok(())
}

struct ClickBenchHits5kFixture;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're testing: old writer → current reader vs current writer → current reader but not old writer → old reader vs current writer → current reader right?


impl DatasetFixture for ClickBenchHits5kFixture {
fn name(&self) -> &str {
"clickbench_hits_5k"
}

fn description(&self) -> &str {
"5000 rows (5x1000 from random offsets) of ClickBench hits dataset with wide schema of primitives and strings"
}

fn build(&self) -> VortexResult<ArrayRef> {
let bytes = reqwest::blocking::get(CLICKBENCH_URL)
.map_err(|e| vortex_err!("failed to download ClickBench parquet: {e}"))?
.bytes()
.map_err(|e| vortex_err!("failed to read ClickBench response body: {e}"))?;
let path = cached_clickbench_parquet()?;
let file_bytes = fs::read(&path)
.map_err(|e| vortex_err!("failed to read cached parquet at {}: {e}", path.display()))?;
let bytes = Bytes::from(file_bytes);

let reader = ParquetRecordBatchReaderBuilder::try_new(bytes)
.map_err(|e| vortex_err!("failed to open parquet: {e}"))?
.with_batch_size(1000)
.with_limit(1000)
.build()
.map_err(|e| vortex_err!("failed to build parquet reader: {e}"))?;

Expand All @@ -62,6 +204,6 @@ impl ArrayFixture for ClickBenchHits1kFixture {
}
}

pub fn fixtures() -> Vec<Box<dyn ArrayFixture>> {
vec![Box::new(ClickBenchHits1kFixture)]
/// The ClickBench dataset fixtures exposed by this module.
pub fn fixtures() -> Vec<Box<dyn DatasetFixture>> {
    let hits_5k: Box<dyn DatasetFixture> = Box::new(ClickBenchHits5kFixture);
    vec![hits_5k]
}
41 changes: 39 additions & 2 deletions vortex-test/compat-gen/src/fixtures/arrays/datasets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,49 @@ mod clickbench;
#[allow(clippy::cast_possible_truncation)]
mod tpch;

use crate::fixtures::ArrayFixture;
use crate::fixtures::DatasetFixture;

/// All dataset-derived fixtures.
pub fn fixtures() -> Vec<Box<dyn ArrayFixture>> {
/// All dataset-derived fixtures: TPC-H first, then ClickBench.
pub fn fixtures() -> Vec<Box<dyn DatasetFixture>> {
    tpch::fixtures()
        .into_iter()
        .chain(clickbench::fixtures())
        .collect()
}

#[cfg(test)]
mod tests {
    use vortex::file::WriteStrategyBuilder;

    use super::fixtures;
    use crate::adapter;

    // ClickBench fixtures are filtered out of the roundtrip test below —
    // presumably because building them downloads the source parquet over the
    // network (see the retry/download logic in the clickbench module), which
    // is not acceptable in a unit test. NOTE(review): confirm tpch fixtures
    // build fully offline.
    fn is_clickbench_fixture(name: &str) -> bool {
        name.contains("clickbench")
    }

    // Round-trips every non-ClickBench dataset fixture through the in-memory
    // writer with two strategies — the default WriteStrategyBuilder output and
    // the compact-encodings variant — and asserts both buffers read back
    // without error. Read-back contents are not compared to the source array;
    // this only checks write/read succeed.
    #[test]
    fn roundtrip_non_clickbench_fixtures_to_bytes() {
        for dataset in fixtures()
            .into_iter()
            .filter(|fixture| !is_clickbench_fixture(fixture.name()))
        {
            let array = dataset.build().unwrap();
            // Default strategy: write to bytes, then read back.
            let regular_bytes = adapter::write_compressed_to_bytes(
                array.clone(),
                WriteStrategyBuilder::default().build(),
            )
            .unwrap();
            let _regular = adapter::read_file(regular_bytes).unwrap();

            // Compact-encodings strategy: same array, second write/read pass.
            let compact_bytes = adapter::write_compressed_to_bytes(
                array,
                WriteStrategyBuilder::default()
                    .with_compact_encodings()
                    .build(),
            )
            .unwrap();
            let _compact = adapter::read_file(compact_bytes).unwrap();
        }
    }
}
Loading
Loading