Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use vortex::layout::LayoutReader;
use vortex::metrics::Label;
use vortex::metrics::MetricsRegistry;
use vortex::scan::ScanBuilder;
use vortex::scan::api::DEFAULT_TARGET_OUTPUT_BYTES_HINT;
use vortex::scan::api::DEFAULT_TARGET_OUTPUT_ROWS_HINT;
use vortex::session::VortexSession;
use vortex_utils::aliases::dash_map::DashMap;
use vortex_utils::aliases::dash_map::Entry;
Expand Down Expand Up @@ -361,6 +363,8 @@ impl FileOpener for VortexOpener {
.with_projection(scan_projection)
.with_some_filter(filter)
.with_ordered(has_output_ordering)
.with_target_output_rows(DEFAULT_TARGET_OUTPUT_ROWS_HINT)
.with_target_output_bytes(DEFAULT_TARGET_OUTPUT_BYTES_HINT)
.map(move |chunk| {
let mut ctx = session.create_execution_ctx();
chunk.execute_record_batch(&stream_schema, &mut ctx)
Expand Down
8 changes: 8 additions & 0 deletions vortex-layout/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,8 @@ pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::register_splits(&sel

pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::row_count(&self) -> u64

pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::split_points(&self, field_mask: alloc::vec::Vec<vortex_array::dtype::field_mask::FieldMask>, row_range: core::ops::range::Range<u64>) -> vortex_error::VortexResult<vortex_layout::SplitPointIter>

pub fn vortex_layout::layouts::row_idx::row_idx() -> vortex_array::expr::expression::Expression

pub mod vortex_layout::layouts::struct_
Expand Down Expand Up @@ -1674,6 +1676,8 @@ pub fn vortex_layout::LayoutReader::register_splits(&self, field_mask: &[vortex_

pub fn vortex_layout::LayoutReader::row_count(&self) -> u64

pub fn vortex_layout::LayoutReader::split_points(&self, field_mask: alloc::vec::Vec<vortex_array::dtype::field_mask::FieldMask>, row_range: core::ops::range::Range<u64>) -> vortex_error::VortexResult<vortex_layout::SplitPointIter>

impl vortex_layout::LayoutReader for vortex_layout::layouts::row_idx::RowIdxLayoutReader

pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::dtype(&self) -> &vortex_array::dtype::DType
Expand All @@ -1690,6 +1694,8 @@ pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::register_splits(&sel

pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::row_count(&self) -> u64

pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::split_points(&self, field_mask: alloc::vec::Vec<vortex_array::dtype::field_mask::FieldMask>, row_range: core::ops::range::Range<u64>) -> vortex_error::VortexResult<vortex_layout::SplitPointIter>

pub trait vortex_layout::LayoutStrategy: 'static + core::marker::Send + core::marker::Sync

pub fn vortex_layout::LayoutStrategy::buffered_bytes(&self) -> u64
Expand Down Expand Up @@ -1969,3 +1975,5 @@ pub type vortex_layout::LayoutId = arcref::ArcRef<str>
pub type vortex_layout::LayoutReaderRef = alloc::sync::Arc<dyn vortex_layout::LayoutReader>

pub type vortex_layout::LayoutRef = alloc::sync::Arc<dyn vortex_layout::Layout>

pub type vortex_layout::SplitPointIter = alloc::boxed::Box<(dyn core::iter::traits::iterator::Iterator<Item = u64> + core::marker::Send)>
38 changes: 38 additions & 0 deletions vortex-layout/src/layouts/chunked/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use vortex_session::VortexSession;

use crate::LayoutReaderRef;
use crate::LazyReaderChildren;
use crate::SplitPointIter;
use crate::concat_split_point_iters;
use crate::layouts::chunked::ChunkedLayout;
use crate::reader::LayoutReader;
use crate::segments::SegmentSource;
Expand Down Expand Up @@ -200,6 +202,28 @@ impl LayoutReader for ChunkedReader {
Ok(())
}

/// Compute the split points of this chunked layout that fall within `row_range`.
///
/// Every chunk yielded by `self.ranges(&row_range)` is asked for its own split points over
/// the accompanying chunk range. Each returned point is shifted by `chunk_offset(chunk_idx)`
/// (presumably converting chunk-relative positions into positions in this layout — confirm
/// against `ranges`), and the per-chunk iterators are handed to `concat_split_point_iters`.
///
/// Returns an empty iterator when `row_range` is empty.
///
/// # Errors
///
/// Propagates any error from resolving a chunk reader or from a child's `split_points`.
fn split_points(
    &self,
    field_mask: Vec<FieldMask>,
    row_range: Range<u64>,
) -> VortexResult<SplitPointIter> {
    if row_range.is_empty() {
        return Ok(Box::new(std::iter::empty()));
    }

    let mut iters = Vec::new();
    for (chunk_idx, chunk_range, _) in self.ranges(&row_range) {
        // Borrowing the child is enough: the `SplitPointIter` it returns is an owned boxed
        // iterator, so there is no need to clone the reader handle for every chunk.
        let child = self.chunk_reader(chunk_idx)?;
        let chunk_offset = self.chunk_offset(chunk_idx);
        let split_points = child.split_points(field_mask.clone(), chunk_range)?;
        iters.push(
            Box::new(split_points.map(move |point| point + chunk_offset)) as SplitPointIter,
        );
    }

    Ok(concat_split_point_iters(iters))
}

fn pruning_evaluation(
&self,
row_range: &Range<u64>,
Expand Down Expand Up @@ -338,6 +362,7 @@ mod test {
use vortex_array::MaskFuture;
use vortex_array::assert_arrays_eq;
use vortex_array::dtype::DType;
use vortex_array::dtype::FieldMask;
use vortex_array::dtype::Nullability::NonNullable;
use vortex_array::dtype::PType;
use vortex_array::expr::root;
Expand Down Expand Up @@ -406,4 +431,17 @@ mod test {
assert_arrays_eq!(result.as_ref(), expected.as_ref());
})
}

/// The split points a chunked reader reports for rows `2..8` must be `3`, `6` and `8`.
///
/// NOTE(review): presumably the `chunked_layout` fixture has chunk boundaries at rows 3 and 6,
/// so the expected values are positions in the whole layout rather than within a chunk —
/// confirm against the fixture.
#[rstest]
fn test_chunked_split_points_are_absolute(
    #[from(chunked_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
) {
    let reader = layout.new_reader("".into(), segments, &SESSION).unwrap();

    let observed: Vec<_> = reader
        .split_points(vec![FieldMask::All], 2..8)
        .unwrap()
        .collect();

    assert_eq!(observed, vec![3, 6, 8]);
}
}
9 changes: 9 additions & 0 deletions vortex-layout/src/layouts/dict/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use vortex_utils::aliases::dash_map::DashMap;
use super::DictLayout;
use crate::LayoutReader;
use crate::LayoutReaderRef;
use crate::SplitPointIter;
use crate::layouts::SharedArrayFuture;
use crate::segments::SegmentSource;

Expand Down Expand Up @@ -154,6 +155,14 @@ impl LayoutReader for DictReader {
self.codes.register_splits(field_mask, row_range, splits)
}

/// Report the split points for `row_range` by delegating to the `codes` child reader.
///
/// `field_mask` and `row_range` are forwarded unchanged, and the child's result (including
/// any error) is returned as-is.
fn split_points(
&self,
field_mask: Vec<FieldMask>,
row_range: Range<u64>,
) -> VortexResult<SplitPointIter> {
self.codes.split_points(field_mask, row_range)
}

fn pruning_evaluation(
&self,
_row_range: &Range<u64>,
Expand Down
69 changes: 42 additions & 27 deletions vortex-layout/src/layouts/flat/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::BTreeSet;
use std::ops::BitAnd;
use std::ops::Range;
use std::sync::Arc;
use std::sync::OnceLock;

use futures::FutureExt;
use futures::future::BoxFuture;
Expand All @@ -22,6 +23,7 @@ use vortex_mask::Mask;
use vortex_session::VortexSession;

use crate::LayoutReader;
use crate::SplitPointIter;
use crate::layouts::SharedArrayFuture;
use crate::layouts::flat::FlatLayout;
use crate::segments::SegmentSource;
Expand All @@ -38,6 +40,7 @@ pub struct FlatReader {
name: Arc<str>,
segment_source: Arc<dyn SegmentSource>,
session: VortexSession,
array: OnceLock<SharedArrayFuture>,
}

impl FlatReader {
Expand All @@ -52,38 +55,38 @@ impl FlatReader {
name,
segment_source,
session,
array: Default::default(),
}
}

/// Return the shared future that resolves into this layout's deserialised array.
///
/// The future is built on the first call and cached in `self.array`; every call returns a
/// clone of that same shared future. Building it issues the segment request for the
/// layout's segment, so the request is made when the cache is first populated.
fn array_future(&self) -> SharedArrayFuture {
    let cached = self.array.get_or_init(|| {
        let row_count = usize::try_from(self.layout.row_count())
            .vortex_expect("row count must fit in usize");
        let segment_fut = self.segment_source.request(self.layout.segment_id());

        // Take owned copies of everything the future needs so it does not borrow `self`.
        let array_tree = self.layout.array_tree().cloned();
        let dtype = self.layout.dtype().clone();
        let ctx = self.layout.array_ctx().clone();
        let session = self.session.clone();

        let decode = async move {
            let segment = segment_fut.await?;
            let parts = match array_tree {
                // Use the pre-stored flatbuffer from layout metadata combined with segment buffers.
                Some(array_tree) => {
                    ArrayParts::from_flatbuffer_and_segment(array_tree, segment)?
                }
                // Parse the flatbuffer from the segment itself.
                None => ArrayParts::try_from(segment)?,
            };
            parts
                .decode(&dtype, row_count, &ctx, &session)
                .map_err(Arc::new)
        };

        decode.boxed().shared()
    });

    cached.clone()
}
}

Expand All @@ -110,6 +113,18 @@ impl LayoutReader for FlatReader {
Ok(())
}

/// Report the split points of this flat layout within `row_range`.
///
/// Yields the single point `row_range.end` for a non-empty range, and nothing for an empty
/// range. The field mask is not consulted.
fn split_points(
    &self,
    _field_mask: Vec<FieldMask>,
    row_range: Range<u64>,
) -> VortexResult<SplitPointIter> {
    // `Some(end)` for a non-empty range, `None` otherwise; iterating the option yields
    // zero or one item accordingly.
    let end_point = (!row_range.is_empty()).then_some(row_range.end);
    Ok(Box::new(end_point.into_iter()))
}

fn pruning_evaluation(
&self,
_row_range: &Range<u64>,
Expand Down
9 changes: 9 additions & 0 deletions vortex-layout/src/layouts/row_idx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use vortex_utils::aliases::dash_map::DashMap;

use crate::ArrayFuture;
use crate::LayoutReader;
use crate::SplitPointIter;
use crate::layouts::partitioned::PartitionedExprEval;

pub struct RowIdxLayoutReader {
Expand Down Expand Up @@ -175,6 +176,14 @@ impl LayoutReader for RowIdxLayoutReader {
self.child.register_splits(field_mask, row_range, splits)
}

/// Report the split points for `row_range` by delegating to the wrapped `child` reader.
///
/// `field_mask` and `row_range` are forwarded unchanged, and the child's result (including
/// any error) is returned as-is.
fn split_points(
&self,
field_mask: Vec<FieldMask>,
row_range: Range<u64>,
) -> VortexResult<SplitPointIter> {
self.child.split_points(field_mask, row_range)
}

fn pruning_evaluation(
&self,
row_range: &Range<u64>,
Expand Down
67 changes: 67 additions & 0 deletions vortex-layout/src/layouts/struct_/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ use crate::ArrayFuture;
use crate::LayoutReader;
use crate::LayoutReaderRef;
use crate::LazyReaderChildren;
use crate::SplitPointIter;
use crate::layouts::partitioned::PartitionedExprEval;
use crate::layouts::struct_::StructLayout;
use crate::merge_split_point_iters;
use crate::segments::SegmentSource;

pub struct StructReader {
Expand Down Expand Up @@ -151,6 +153,49 @@ impl StructReader {
.transpose()
}

/// Collect the child readers that contribute split points for `field_mask`, each paired
/// with the field masks to forward to it.
///
/// - The validity reader, when `self.validity()` yields one, is always included with
///   `FieldMask::All`.
/// - If any mask matches all fields, every field reader is included with `FieldMask::All`.
/// - Otherwise the masks are grouped by the index of their starting field, and each group
///   (after stepping into the field) is paired with that field's reader. Masks without a
///   starting field are skipped.
///
/// # Errors
///
/// Returns an error when a mask names a field that is not part of the struct, or when
/// resolving a reader or stepping into a mask fails.
fn split_children(
    &self,
    field_mask: Vec<FieldMask>,
) -> VortexResult<Vec<(LayoutReaderRef, Vec<FieldMask>)>> {
    let mut selected = Vec::new();

    if let Some(validity) = self.validity()? {
        selected.push((validity.clone(), vec![FieldMask::All]));
    }

    let selects_everything = field_mask.iter().any(FieldMask::matches_all);
    if selects_everything {
        for field_idx in 0..self.struct_fields().nfields() {
            let reader = self.field_reader_by_index(field_idx)?.clone();
            selected.push((reader, vec![FieldMask::All]));
        }
        return Ok(selected);
    }

    // Bucket the masks by the struct field they start at.
    let mut masks_by_field = HashMap::<usize, Vec<FieldMask>>::default();
    for mask in field_mask {
        let field = match mask.starting_field()? {
            Some(field) => field,
            None => continue,
        };
        let field_idx = self
            .struct_fields()
            .find(
                field
                    .as_name()
                    .vortex_expect("struct fields are always named"),
            )
            .ok_or_else(|| vortex_err!("Field not found: {field:?}"))?;
        masks_by_field
            .entry(field_idx)
            .or_default()
            .push(mask.step_into()?);
    }

    for (field_idx, masks) in masks_by_field {
        let reader = self.field_reader_by_index(field_idx)?.clone();
        selected.push((reader, masks));
    }

    Ok(selected)
}

/// Utility for partitioning an expression over the fields of a struct.
fn partition_expr(&self, expr: Expression) -> Partitioned {
let key = ExactExpr(expr.clone());
Expand Down Expand Up @@ -258,6 +303,28 @@ impl LayoutReader for StructReader {
})
}

/// Compute the split points of this struct layout within `row_range`.
///
/// An empty `row_range` yields no points. Otherwise the children chosen by
/// `split_children` are each asked for their split points over the same range and the
/// results are combined with `merge_split_point_iters`. When no child is selected, the
/// single point `row_range.end` is returned.
///
/// # Errors
///
/// Propagates errors from `split_children` and from any child's `split_points`.
fn split_points(
    &self,
    field_mask: Vec<FieldMask>,
    row_range: Range<u64>,
) -> VortexResult<SplitPointIter> {
    if row_range.is_empty() {
        return Ok(Box::new(std::iter::empty()));
    }

    let children = self.split_children(field_mask)?;
    if children.is_empty() {
        return Ok(Box::new(std::iter::once(row_range.end)));
    }

    // Collecting into a `Result` stops at the first child that fails.
    let child_iters = children
        .into_iter()
        .map(|(child, masks)| child.split_points(masks, row_range.clone()))
        .collect::<VortexResult<Vec<_>>>()?;

    Ok(merge_split_point_iters(child_iters))
}

fn pruning_evaluation(
&self,
row_range: &Range<u64>,
Expand Down
9 changes: 9 additions & 0 deletions vortex-layout/src/layouts/zoned/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use vortex_utils::aliases::dash_map::DashMap;
use crate::LayoutReader;
use crate::LayoutReaderRef;
use crate::LazyReaderChildren;
use crate::SplitPointIter;
use crate::layouts::zoned::ZonedLayout;
use crate::layouts::zoned::zone_map::ZoneMap;
use crate::segments::SegmentSource;
Expand Down Expand Up @@ -237,6 +238,14 @@ impl LayoutReader for ZonedReader {
.register_splits(field_mask, row_range, splits)
}

/// Report the split points for `row_range` by delegating to the data child reader.
///
/// `field_mask` and `row_range` are forwarded unchanged. An error from resolving the data
/// child via `data_child()` is propagated, as is any error from the child's `split_points`.
fn split_points(
&self,
field_mask: Vec<FieldMask>,
row_range: Range<u64>,
) -> VortexResult<SplitPointIter> {
self.data_child()?.split_points(field_mask, row_range)
}

fn pruning_evaluation(
&self,
row_range: &Range<u64>,
Expand Down
Loading
Loading