Introduce ReadPlan to encapsulate the calculation of what parquet rows to decode #7502


Merged 2 commits on May 19, 2025

Changes from all commits
parquet/src/arrow/arrow_reader/mod.rs: 149 changes (32 additions, 117 deletions)
@@ -21,10 +21,8 @@ use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
-use arrow_select::filter::prep_null_mask_filter;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
-use std::collections::VecDeque;
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
@@ -39,7 +37,10 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

+pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
+
mod filter;
+mod read_plan;
mod selection;
pub mod statistics;

@@ -679,38 +680,32 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
        };

        let mut filter = self.filter;
-        let mut selection = self.selection;
+        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection);
@alamb (Contributor, Author) commented on May 16, 2025:

The high-level idea is to avoid manipulating the RowSelection directly in these APIs and instead manipulate it via ReadPlanBuilder / ReadPlan.

The reason to encapsulate this in a new struct is to make it feasible to add additional complexity, such as cached filter results and better filter row representations.

This PR simply moves code around -- it is not intended to change any of the actual logic yet.
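Editor's note: for orientation, a minimal, self-contained sketch of the encapsulation pattern described above. Everything here is a simplified stand-in rather than the crate's actual implementation; only the call shapes (`new`, `with_selection`, `selects_any`, `build`) mirror what appears in this diff.

```rust
// Simplified stand-ins: a RowSelection is modeled as (skip, row_count) runs.
#[derive(Clone, Debug, PartialEq)]
struct RowSelection(Vec<(bool, usize)>);

struct ReadPlanBuilder {
    batch_size: usize,
    selection: Option<RowSelection>,
}

struct ReadPlan {
    batch_size: usize,
    selection: Option<RowSelection>,
}

impl ReadPlanBuilder {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, selection: None }
    }

    fn with_selection(mut self, selection: Option<RowSelection>) -> Self {
        self.selection = selection;
        self
    }

    // `false` only when a selection exists and selects zero rows, so the
    // caller can stop evaluating further predicates early.
    fn selects_any(&self) -> bool {
        self.selection
            .as_ref()
            .map(|s| s.0.iter().any(|&(skip, n)| !skip && n > 0))
            .unwrap_or(true)
    }

    fn build(self) -> ReadPlan {
        ReadPlan {
            batch_size: self.batch_size,
            selection: self.selection,
        }
    }
}

fn main() {
    // Skip the first 10 rows, decode the next 90, in batches of 1024.
    let plan = ReadPlanBuilder::new(1024)
        .with_selection(Some(RowSelection(vec![(true, 10), (false, 90)])))
        .build();
    assert_eq!(plan.batch_size, 1024);
    assert!(plan.selection.is_some());
    // With no selection at all, everything is (potentially) selected.
    assert!(ReadPlanBuilder::new(1024).selects_any());
}
```

Keeping the selection private to the builder is what later allows swapping in other row representations (for example, a cached filter mask) without touching call sites.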


        // Update selection based on any filters
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
-                if !selects_any(selection.as_ref()) {
+                // break early if we have ruled out all rows
+                if !plan_builder.selects_any() {
                    break;
                }

                let array_reader =
                    build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;

-                selection = Some(evaluate_predicate(
-                    batch_size,
-                    array_reader,
-                    selection,
-                    predicate.as_mut(),
-                )?);
+                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;
+        let read_plan = plan_builder
+            .limited(reader.num_rows())
+            .with_offset(self.offset)
+            .with_limit(self.limit)
+            .build_limited()
+            .build();

-        // If selection is empty, truncate
-        if !selects_any(selection.as_ref()) {
-            selection = Some(RowSelection::from(vec![]));
-        }
-
-        Ok(ParquetRecordBatchReader::new(
-            batch_size,
-            array_reader,
-            apply_range(selection, reader.num_rows(), self.offset, self.limit),
-        ))
+        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

@@ -789,11 +784,9 @@ impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
/// read from a parquet data source
pub struct ParquetRecordBatchReader {
-    batch_size: usize,
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
-    /// Row ranges to be selected from the data source
-    selection: Option<VecDeque<RowSelector>>,
+    read_plan: ReadPlan,
}

impl Iterator for ParquetRecordBatchReader {
@@ -814,9 +807,10 @@ impl ParquetRecordBatchReader {
    /// simplify error handling with `?`
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
-        match self.selection.as_mut() {
+        let batch_size = self.batch_size();
+        match self.read_plan.selection_mut() {
@alamb (Contributor, Author) commented on May 16, 2025:

The idea is that in future PRs we will change this control loop so it can use a BooleanArray when that is a more efficient representation. Specifically, I think we can follow the model @zhuqi-lucas used in #7454.

A contributor replied:

Thank you @alamb, the POC for adaptive predicate pushdown based on this PR looks good: #7524 (comment)
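Editor's note: an illustrative, dependency-free sketch of the representation trade-off being discussed (not code from this PR or from #7454). Run-length RowSelector encoding is compact when selected rows are clustered, but degenerates to one run per row when the filter alternates, which is exactly when a bitmap such as a BooleanArray is the denser representation.

```rust
// Collapse a per-row boolean filter mask into (skip, run_length) pairs,
// the RowSelector-style encoding the reader's control loop consumes.
fn mask_to_runs(mask: &[bool]) -> Vec<(bool, usize)> {
    let mut runs: Vec<(bool, usize)> = Vec::new();
    for &keep in mask {
        let skip = !keep;
        if let Some(last) = runs.last_mut() {
            if last.0 == skip {
                last.1 += 1;
                continue;
            }
        }
        runs.push((skip, 1));
    }
    runs
}

fn main() {
    // Clustered selection: two runs, regardless of how many rows each covers.
    assert_eq!(
        mask_to_runs(&[false, false, true, true]),
        vec![(true, 2), (false, 2)]
    );
    // Alternating selection: one run per row. Here a bitmap with one bit
    // per row (e.g. a BooleanArray) is the denser representation.
    assert_eq!(
        mask_to_runs(&[true, false, true, false]),
        vec![(false, 1), (true, 1), (false, 1), (true, 1)]
    );
}
```

An "adaptive" scheme, as discussed in #7524, can pick between the two encodings based on the selectivity pattern.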

            Some(selection) => {
-                while read_records < self.batch_size && !selection.is_empty() {
+                while read_records < batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;
@@ -838,7 +832,7 @@ impl ParquetRecordBatchReader {
                    }

                    // try to read record
-                    let need_read = self.batch_size - read_records;
+                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            // if page row count less than batch_size we must set batch size to page row count.
@@ -855,7 +849,7 @@ impl ParquetRecordBatchReader {
                }
            }
            None => {
-                self.array_reader.read_records(self.batch_size)?;
+                self.array_reader.read_records(batch_size)?;
            }
        };

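Editor's note: a reader's aid for the `checked_sub` branch above -- the batch-splitting arithmetic distilled into a hypothetical stand-alone helper (`split_front` is not a function in this codebase):

```rust
// Returns (rows_to_read_now, selector_row_count_to_push_back), mirroring
// how the control loop splits the front RowSelector across batches.
fn split_front(
    front_row_count: usize,
    batch_size: usize,
    read_records: usize,
) -> (usize, Option<usize>) {
    let need_read = batch_size - read_records;
    match front_row_count.checked_sub(need_read) {
        // The front selector overflows this batch: read only what fits
        // and push the remainder back for the next batch.
        Some(remaining) if remaining != 0 => (need_read, Some(remaining)),
        // The front selector fits entirely within the current batch.
        _ => (front_row_count, None),
    }
}

fn main() {
    // A 100-row select run against an empty 64-row batch:
    // read 64 now, push a 36-row selector back.
    assert_eq!(split_front(100, 64, 0), (64, Some(36)));
    // A 20-row run fits; nothing to push back.
    assert_eq!(split_front(20, 64, 0), (20, None));
    // Exactly filling the batch also leaves nothing to push back.
    assert_eq!(split_front(64, 64, 0), (64, None));
}
```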
@@ -905,116 +899,37 @@ impl ParquetRecordBatchReader {
        let array_reader =
            build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;

+        let read_plan = ReadPlanBuilder::new(batch_size)
+            .with_selection(selection)
+            .build();
+
        Ok(Self {
-            batch_size,
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
-            selection: selection.map(|s| s.trim().into()),
+            read_plan,
        })
    }

    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
    /// all rows will be returned
-    pub(crate) fn new(
-        batch_size: usize,
-        array_reader: Box<dyn ArrayReader>,
-        selection: Option<RowSelection>,
-    ) -> Self {
+    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
-            batch_size,
            array_reader,
            schema: Arc::new(schema),
-            selection: selection.map(|s| s.trim().into()),
+            read_plan,
        }
    }
}

@alamb (Contributor, Author) commented: This code is moved into ReadPlanBuilder.

-/// Returns `true` if `selection` is `None` or selects some rows
-pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
-    selection.map(|x| x.selects_any()).unwrap_or(true)
-}

-/// Applies an optional offset and limit to an optional [`RowSelection`]
-pub(crate) fn apply_range(
-    mut selection: Option<RowSelection>,
-    row_count: usize,
-    offset: Option<usize>,
-    limit: Option<usize>,
-) -> Option<RowSelection> {
-    // If an offset is defined, apply it to the `selection`
-    if let Some(offset) = offset {
-        selection = Some(match row_count.checked_sub(offset) {
-            None => RowSelection::from(vec![]),
-            Some(remaining) => selection
-                .map(|selection| selection.offset(offset))
-                .unwrap_or_else(|| {
-                    RowSelection::from(vec![
-                        RowSelector::skip(offset),
-                        RowSelector::select(remaining),
-                    ])
-                }),
-        });
-    }
-
-    // If a limit is defined, apply it to the final `selection`
-    if let Some(limit) = limit {
-        selection = Some(
-            selection
-                .map(|selection| selection.limit(limit))
-                .unwrap_or_else(|| {
-                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
-                }),
-        );
-    }
-    selection
-}
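Editor's note: a toy model of what this now-relocated helper computes in the case where no prior selection exists. The real function also composes with an existing RowSelection via its `offset` and `limit` methods, and this responsibility moves into ReadPlanBuilder's `limited(...).build_limited()` step shown earlier. Names and shape below are illustrative only.

```rust
// (skip, row_count) pairs stand in for RowSelection.
// The offset is applied first, then the limit, matching apply_range.
fn apply_range_model(
    row_count: usize,
    offset: Option<usize>,
    limit: Option<usize>,
) -> Vec<(bool, usize)> {
    let offset = offset.unwrap_or(0);
    if offset >= row_count {
        return Vec::new(); // the offset consumed every row; nothing to select
    }
    let selected = limit.unwrap_or(usize::MAX).min(row_count - offset);
    let mut runs = Vec::new();
    if offset > 0 {
        runs.push((true, offset)); // skip the first `offset` rows
    }
    runs.push((false, selected)); // then select at most `limit` rows
    runs
}

fn main() {
    // 100 rows, skip 10, return at most 5: skip(10) + select(5).
    assert_eq!(
        apply_range_model(100, Some(10), Some(5)),
        vec![(true, 10), (false, 5)]
    );
    // An offset past the end selects nothing.
    assert_eq!(apply_range_model(100, Some(200), None), Vec::new());
}
```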

-/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
-/// which rows to return.
-///
-/// `input_selection`: Optional pre-existing selection. If `Some`, then the
-/// final [`RowSelection`] will be the conjunction of it and the rows selected
-/// by `predicate`.
-///
-/// Note: A pre-existing selection may come from evaluating a previous predicate
-/// or if the [`ParquetRecordBatchReader`] specified an explicit
-/// [`RowSelection`] in addition to one or more predicates.
-pub(crate) fn evaluate_predicate(
-    batch_size: usize,
-    array_reader: Box<dyn ArrayReader>,
-    input_selection: Option<RowSelection>,
-    predicate: &mut dyn ArrowPredicate,
-) -> Result<RowSelection> {
-    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
-    let mut filters = vec![];
-    for maybe_batch in reader {
-        let maybe_batch = maybe_batch?;
-        let input_rows = maybe_batch.num_rows();
-        let filter = predicate.evaluate(maybe_batch)?;
-        // Since user supplied predicate, check error here to catch bugs quickly
-        if filter.len() != input_rows {
-            return Err(arrow_err!(
-                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
-                filter.len()
-            ));
-        }
-        match filter.null_count() {
-            0 => filters.push(filter),
-            _ => filters.push(prep_null_mask_filter(&filter)),
-        };
-    }
-
-    let raw = RowSelection::from_filters(&filters);
-    Ok(match input_selection {
-        Some(selection) => selection.and_then(&raw),
-        None => raw,
-    })
-}
+    #[inline(always)]
+    pub(crate) fn batch_size(&self) -> usize {
+        self.read_plan.batch_size()
+    }
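Editor's note: a toy model of the final composition step in the removed function may help. `raw` (from `RowSelection::from_filters`) is expressed relative to the rows the input selection kept, and `and_then` re-anchors it to absolute row numbers. The run encoding and all names below are illustrative, not the crate's API.

```rust
// Append a run, merging with the previous one when it has the same kind.
fn push_run(runs: &mut Vec<(bool, usize)>, skip: bool, count: usize) {
    if count == 0 {
        return;
    }
    if let Some(last) = runs.last_mut() {
        if last.0 == skip {
            last.1 += count;
            return;
        }
    }
    runs.push((skip, count));
}

// Toy model of RowSelection::and_then: `raw_keep` has one entry per row
// the `input` selection kept; the result selects over absolute rows.
fn and_then(input: &[(bool, usize)], raw_keep: &[bool]) -> Vec<(bool, usize)> {
    let mut out = Vec::new();
    let mut i = 0; // cursor into raw_keep
    for &(skip, n) in input {
        if skip {
            push_run(&mut out, true, n); // rows the predicate never saw
        } else {
            for _ in 0..n {
                push_run(&mut out, !raw_keep[i], 1);
                i += 1;
            }
        }
    }
    assert_eq!(i, raw_keep.len(), "raw must cover every selected row");
    out
}

fn main() {
    // Input keeps rows 2..5; the predicate keeps the 1st and 3rd of them.
    let combined = and_then(&[(true, 2), (false, 3)], &[true, false, true]);
    assert_eq!(combined, vec![(true, 2), (false, 1), (true, 1), (false, 1)]);
}
```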

#[cfg(test)]
@@ -3993,7 +3908,7 @@ mod tests {
            .build()
            .unwrap();
        assert_ne!(1024, num_rows);
-        assert_eq!(reader.batch_size, num_rows as usize);
+        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
    }

#[test]