Commit f0f3e8e

Introduce ReadPlan to encapsulate the calculation of what rows to decode
1 parent 1f15130 commit f0f3e8e
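
This commit replaces the loose batch_size / RowSelection / offset / limit plumbing in the reader with a single ReadPlan assembled by ReadPlanBuilder; ParquetRecordBatchReader now consumes the plan instead of the individual pieces. A condensed before/after of the reader construction in ParquetRecordBatchReaderBuilder::build, taken from the diff of parquet/src/arrow/arrow_reader/mod.rs below (predicate evaluation, previously a call to evaluate_predicate, now goes through plan_builder.with_predicate as shown in the first hunk):

// before: the final selection was computed by free functions and passed piecemeal
Ok(ParquetRecordBatchReader::new(
    batch_size,
    array_reader,
    apply_range(selection, reader.num_rows(), self.offset, self.limit),
))

// after: the builder owns the batch size, selection, filter results, offset and limit
let read_plan = ReadPlanBuilder::new(batch_size)
    .with_selection(self.selection)
    .limited(reader.num_rows())
    .with_offset(self.offset)
    .with_limit(self.limit)
    .build_limited()
    .build();
Ok(ParquetRecordBatchReader::new(array_reader, read_plan))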

3 files changed (+333, -150 lines)


parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 33 additions & 116 deletions
@@ -21,10 +21,8 @@ use arrow_array::cast::AsArray;
 use arrow_array::Array;
 use arrow_array::{RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
-use arrow_select::filter::prep_null_mask_filter;
 pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
 pub use selection::{RowSelection, RowSelector};
-use std::collections::VecDeque;
 use std::sync::Arc;
 
 pub use crate::arrow::array_reader::RowGroups;
@@ -39,7 +37,10 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::reader::{ChunkReader, SerializedPageReader};
 use crate::schema::types::SchemaDescriptor;
 
+use read_plan::{ReadPlan, ReadPlanBuilder};
+
 mod filter;
+pub(crate) mod read_plan;
 mod selection;
 pub mod statistics;
 
@@ -679,38 +680,33 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
         };
 
         let mut filter = self.filter;
-        let mut selection = self.selection;
+        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection);
 
+        // Update selection based on any filters
         if let Some(filter) = filter.as_mut() {
             for predicate in filter.predicates.iter_mut() {
-                if !selects_any(selection.as_ref()) {
+                // break early if we have already ruled out all rows
+                if !plan_builder.selects_any() {
                     break;
                 }
 
+                // TODO move this into the read_plan
                 let array_reader =
                     build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;
 
-                selection = Some(evaluate_predicate(
-                    batch_size,
-                    array_reader,
-                    selection,
-                    predicate.as_mut(),
-                )?);
+                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
             }
         }
 
         let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;
+        let read_plan = plan_builder
+            .limited(reader.num_rows())
+            .with_offset(self.offset)
+            .with_limit(self.limit)
+            .build_limited()
+            .build();
 
-        // If selection is empty, truncate
-        if !selects_any(selection.as_ref()) {
-            selection = Some(RowSelection::from(vec![]));
-        }
-
-        Ok(ParquetRecordBatchReader::new(
-            batch_size,
-            array_reader,
-            apply_range(selection, reader.num_rows(), self.offset, self.limit),
-        ))
+        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
     }
 }
 
@@ -789,20 +785,20 @@ impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
 /// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
 /// read from a parquet data source
 pub struct ParquetRecordBatchReader {
-    batch_size: usize,
     array_reader: Box<dyn ArrayReader>,
     schema: SchemaRef,
-    selection: Option<VecDeque<RowSelector>>,
+    read_plan: ReadPlan,
 }
 
 impl Iterator for ParquetRecordBatchReader {
     type Item = Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
         let mut read_records = 0;
-        match self.selection.as_mut() {
+        let batch_size = self.batch_size();
+        match self.read_plan.selection_mut() {
             Some(selection) => {
-                while read_records < self.batch_size && !selection.is_empty() {
+                while read_records < batch_size && !selection.is_empty() {
                     let front = selection.pop_front().unwrap();
                     if front.skip {
                         let skipped = match self.array_reader.skip_records(front.row_count) {
@@ -828,7 +824,7 @@ impl Iterator for ParquetRecordBatchReader {
                     }
 
                     // try to read record
-                    let need_read = self.batch_size - read_records;
+                    let need_read = batch_size - read_records;
                     let to_read = match front.row_count.checked_sub(need_read) {
                         Some(remaining) if remaining != 0 => {
                             // if page row count less than batch_size we must set batch size to page row count.
@@ -846,7 +842,7 @@ impl Iterator for ParquetRecordBatchReader {
                 }
             }
             None => {
-                if let Err(error) = self.array_reader.read_records(self.batch_size) {
+                if let Err(error) = self.array_reader.read_records(self.batch_size()) {
                     return Some(Err(error.into()));
                 }
             }
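
The Iterator implementation above now reads its batch size and the remaining row selection through the plan (read_plan.batch_size() and read_plan.selection_mut()). read_plan.rs is one of the three changed files but is not shown on this page; the following is a minimal sketch of what ReadPlan has to expose for that code to compile, with the field layout assumed (the selection mirrors the Option<VecDeque<RowSelector>> field removed from ParquetRecordBatchReader):

// Sketch only: inferred from the call sites above; the real definition lives in
// parquet/src/arrow/arrow_reader/read_plan.rs, which is not part of this diff excerpt.
use std::collections::VecDeque;

use crate::arrow::arrow_reader::RowSelector; // re-exported from the selection module

pub(crate) struct ReadPlan {
    /// Maximum number of rows to decode per RecordBatch
    batch_size: usize,
    /// Remaining selectors to apply; `None` means "read every row"
    selection: Option<VecDeque<RowSelector>>,
}

impl ReadPlan {
    pub(crate) fn batch_size(&self) -> usize {
        self.batch_size
    }

    pub(crate) fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
        self.selection.as_mut()
    }
}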
@@ -903,116 +899,37 @@ impl ParquetRecordBatchReader {
         let array_reader =
             build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;
 
+        let read_plan = ReadPlanBuilder::new(batch_size)
+            .with_selection(selection)
+            .build();
+
         Ok(Self {
-            batch_size,
             array_reader,
             schema: Arc::new(Schema::new(levels.fields.clone())),
-            selection: selection.map(|s| s.trim().into()),
+            read_plan,
         })
     }
 
     /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
     /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
     /// all rows will be returned
-    pub(crate) fn new(
-        batch_size: usize,
-        array_reader: Box<dyn ArrayReader>,
-        selection: Option<RowSelection>,
-    ) -> Self {
+    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
         let schema = match array_reader.get_data_type() {
             ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
             _ => unreachable!("Struct array reader's data type is not struct!"),
         };
 
         Self {
-            batch_size,
             array_reader,
             schema: Arc::new(schema),
-            selection: selection.map(|s| s.trim().into()),
+            read_plan,
         }
     }
-}
 
-/// Returns `true` if `selection` is `None` or selects some rows
-pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
-    selection.map(|x| x.selects_any()).unwrap_or(true)
-}
-
-/// Applies an optional offset and limit to an optional [`RowSelection`]
-pub(crate) fn apply_range(
-    mut selection: Option<RowSelection>,
-    row_count: usize,
-    offset: Option<usize>,
-    limit: Option<usize>,
-) -> Option<RowSelection> {
-    // If an offset is defined, apply it to the `selection`
-    if let Some(offset) = offset {
-        selection = Some(match row_count.checked_sub(offset) {
-            None => RowSelection::from(vec![]),
-            Some(remaining) => selection
-                .map(|selection| selection.offset(offset))
-                .unwrap_or_else(|| {
-                    RowSelection::from(vec![
-                        RowSelector::skip(offset),
-                        RowSelector::select(remaining),
-                    ])
-                }),
-        });
-    }
-
-    // If a limit is defined, apply it to the final `selection`
-    if let Some(limit) = limit {
-        selection = Some(
-            selection
-                .map(|selection| selection.limit(limit))
-                .unwrap_or_else(|| {
-                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
-                }),
-        );
-    }
-    selection
-}
-
-/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
-/// which rows to return.
-///
-/// `input_selection`: Optional pre-existing selection. If `Some`, then the
-/// final [`RowSelection`] will be the conjunction of it and the rows selected
-/// by `predicate`.
-///
-/// Note: A pre-existing selection may come from evaluating a previous predicate
-/// or if the [`ParquetRecordBatchReader`] specified an explicit
-/// [`RowSelection`] in addition to one or more predicates.
-pub(crate) fn evaluate_predicate(
-    batch_size: usize,
-    array_reader: Box<dyn ArrayReader>,
-    input_selection: Option<RowSelection>,
-    predicate: &mut dyn ArrowPredicate,
-) -> Result<RowSelection> {
-    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
-    let mut filters = vec![];
-    for maybe_batch in reader {
-        let maybe_batch = maybe_batch?;
-        let input_rows = maybe_batch.num_rows();
-        let filter = predicate.evaluate(maybe_batch)?;
-        // Since user supplied predicate, check error here to catch bugs quickly
-        if filter.len() != input_rows {
-            return Err(arrow_err!(
-                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
-                filter.len()
-            ));
-        }
-        match filter.null_count() {
-            0 => filters.push(filter),
-            _ => filters.push(prep_null_mask_filter(&filter)),
-        };
+    #[inline(always)]
+    pub(crate) fn batch_size(&self) -> usize {
+        self.read_plan.batch_size()
     }
-
-    let raw = RowSelection::from_filters(&filters);
-    Ok(match input_selection {
-        Some(selection) => selection.and_then(&raw),
-        None => raw,
-    })
 }
 
 #[cfg(test)]
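
The free functions removed above (selects_any, apply_range, evaluate_predicate) are absorbed by ReadPlanBuilder, which build() now drives via selects_any(), with_predicate(), and the limited()/with_offset()/with_limit()/build_limited() chain. Since read_plan.rs is not shown here, the following is only a plausible sketch of with_predicate, assuming it carries the removed evaluate_predicate logic over and that the builder stores batch_size and an Option<RowSelection>:

// Sketch only: assumed shape of ReadPlanBuilder::with_predicate, modeled on the
// removed evaluate_predicate; the real code lives in read_plan.rs (not shown).
impl ReadPlanBuilder {
    pub(crate) fn with_predicate(
        mut self,
        array_reader: Box<dyn ArrayReader>,
        predicate: &mut dyn ArrowPredicate,
    ) -> Result<Self> {
        // Decode only the currently selected rows, using the predicate's projection
        let reader = ParquetRecordBatchReader::new(
            array_reader,
            ReadPlanBuilder::new(self.batch_size)
                .with_selection(self.selection.clone()) // assumed fields
                .build(),
        );

        let mut filters = vec![];
        for maybe_batch in reader {
            let filter = predicate.evaluate(maybe_batch?)?;
            // As in the removed evaluate_predicate, treat nulls as "do not select"
            match filter.null_count() {
                0 => filters.push(filter),
                _ => filters.push(prep_null_mask_filter(&filter)),
            }
        }

        // Intersect the predicate's result with any pre-existing selection
        let raw = RowSelection::from_filters(&filters);
        self.selection = Some(match self.selection.take() {
            Some(selection) => selection.and_then(&raw),
            None => raw,
        });
        Ok(self)
    }
}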
@@ -3991,7 +3908,7 @@ mod tests {
             .build()
             .unwrap();
         assert_ne!(1024, num_rows);
-        assert_eq!(reader.batch_size, num_rows as usize);
+        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
     }
 
     #[test]