Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ impl FileOpener for VortexOpener {
.await
.map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?;

// Check if there are rows in this file. If not, we can save
// ourselves some work and return an empty stream.
if vxf.row_count() == 0 {
return Ok(stream::empty().boxed());
}

// This is the expected arrow types of the actual columns in the file, which might have different types
// from the unified logical schema or miss
let this_file_schema = Arc::new(calculate_physical_schema(
Expand Down Expand Up @@ -430,6 +436,8 @@ fn apply_byte_range(
}

fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u64) -> Range<u64> {
debug_assert!(row_count > 0); // Asserted by an early exit check in VortexOpener::open

let average_row = total_size / row_count;
assert!(average_row > 0, "A row must always have at least one byte");

Expand Down Expand Up @@ -619,6 +627,33 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_open_empty_file() -> anyhow::Result<()> {
    use futures::TryStreamExt;

    // Write a zero-row record batch out as a Vortex file into an
    // in-memory object store.
    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
    let empty_batch = record_batch!(("a", Int32, Vec::<i32>::new())).unwrap();
    let path = "part=1/empty.vortex";
    let size = write_arrow_to_vortex(store.clone(), path, empty_batch.clone()).await?;

    // Parallel scans may attach a byte range even for empty files; the
    // opener must not call byte_range_to_row_range when the row_count is 0.
    let partitioned = PartitionedFile::new_with_range(path.to_string(), size, 0, size as i64);

    let file_schema = empty_batch.schema();
    let table_schema = TableSchema::from_file_schema(file_schema.clone());
    let opener = make_opener(store, table_schema, None);

    // Opening an empty file must yield an empty (but well-formed) stream.
    let stream = opener.open(partitioned)?.await?;
    let batches: Vec<_> = stream.try_collect().await?;
    assert!(batches.is_empty());

    Ok(())
}

#[rstest]
#[tokio::test]
async fn test_open_files_different_table_schema() -> anyhow::Result<()> {
Expand Down
Loading