Skip to content

Commit 08c433e

Browse files
committed
Introduce ReadPlan to encapsulate the calculation of what rows to decode
1 parent 741121b commit 08c433e

File tree

3 files changed

+334
-151
lines changed

3 files changed

+334
-151
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 33 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,8 @@ use arrow_array::cast::AsArray;
2121
use arrow_array::Array;
2222
use arrow_array::{RecordBatch, RecordBatchReader};
2323
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
24-
use arrow_select::filter::prep_null_mask_filter;
2524
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
2625
pub use selection::{RowSelection, RowSelector};
27-
use std::collections::VecDeque;
2826
use std::sync::Arc;
2927

3028
pub use crate::arrow::array_reader::RowGroups;
@@ -39,7 +37,10 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
3937
use crate::file::reader::{ChunkReader, SerializedPageReader};
4038
use crate::schema::types::SchemaDescriptor;
4139

40+
use read_plan::{ReadPlan, ReadPlanBuilder};
41+
4242
mod filter;
43+
pub(crate) mod read_plan;
4344
mod selection;
4445
pub mod statistics;
4546

@@ -679,38 +680,33 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
679680
};
680681

681682
let mut filter = self.filter;
682-
let mut selection = self.selection;
683+
let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection);
683684

685+
// Update selection based on any filters
684686
if let Some(filter) = filter.as_mut() {
685687
for predicate in filter.predicates.iter_mut() {
686-
if !selects_any(selection.as_ref()) {
688+
// break early if we have already ruled out all rows
689+
if !plan_builder.selects_any() {
687690
break;
688691
}
689692

693+
// TODO move this into the read_plan
690694
let array_reader =
691695
build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;
692696

693-
selection = Some(evaluate_predicate(
694-
batch_size,
695-
array_reader,
696-
selection,
697-
predicate.as_mut(),
698-
)?);
697+
plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
699698
}
700699
}
701700

702701
let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;
702+
let read_plan = plan_builder
703+
.limited(reader.num_rows())
704+
.with_offset(self.offset)
705+
.with_limit(self.limit)
706+
.build_limited()
707+
.build();
703708

704-
// If selection is empty, truncate
705-
if !selects_any(selection.as_ref()) {
706-
selection = Some(RowSelection::from(vec![]));
707-
}
708-
709-
Ok(ParquetRecordBatchReader::new(
710-
batch_size,
711-
array_reader,
712-
apply_range(selection, reader.num_rows(), self.offset, self.limit),
713-
))
709+
Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
714710
}
715711
}
716712

@@ -789,21 +785,20 @@ impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
789785
/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields [`RecordBatch`]
790786
/// read from a parquet data source
791787
pub struct ParquetRecordBatchReader {
792-
batch_size: usize,
793788
array_reader: Box<dyn ArrayReader>,
794789
schema: SchemaRef,
795-
/// Row ranges to be selected from the data source
796-
selection: Option<VecDeque<RowSelector>>,
790+
read_plan: ReadPlan,
797791
}
798792

799793
impl Iterator for ParquetRecordBatchReader {
800794
type Item = Result<RecordBatch, ArrowError>;
801795

802796
fn next(&mut self) -> Option<Self::Item> {
803797
let mut read_records = 0;
804-
match self.selection.as_mut() {
798+
let batch_size = self.batch_size();
799+
match self.read_plan.selection_mut() {
805800
Some(selection) => {
806-
while read_records < self.batch_size && !selection.is_empty() {
801+
while read_records < batch_size && !selection.is_empty() {
807802
let front = selection.pop_front().unwrap();
808803
if front.skip {
809804
let skipped = match self.array_reader.skip_records(front.row_count) {
@@ -829,7 +824,7 @@ impl Iterator for ParquetRecordBatchReader {
829824
}
830825

831826
// try to read record
832-
let need_read = self.batch_size - read_records;
827+
let need_read = batch_size - read_records;
833828
let to_read = match front.row_count.checked_sub(need_read) {
834829
Some(remaining) if remaining != 0 => {
835830
// if the page row count is less than batch_size, we must set the batch size to the page row count.
@@ -847,7 +842,7 @@ impl Iterator for ParquetRecordBatchReader {
847842
}
848843
}
849844
None => {
850-
if let Err(error) = self.array_reader.read_records(self.batch_size) {
845+
if let Err(error) = self.array_reader.read_records(self.batch_size()) {
851846
return Some(Err(error.into()));
852847
}
853848
}
@@ -904,116 +899,37 @@ impl ParquetRecordBatchReader {
904899
let array_reader =
905900
build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;
906901

902+
let read_plan = ReadPlanBuilder::new(batch_size)
903+
.with_selection(selection)
904+
.build();
905+
907906
Ok(Self {
908-
batch_size,
909907
array_reader,
910908
schema: Arc::new(Schema::new(levels.fields.clone())),
911-
selection: selection.map(|s| s.trim().into()),
909+
read_plan,
912910
})
913911
}
914912

915913
/// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
916914
/// a time from [`ArrayReader`], where the batch size and the optional row selection are
917915
/// both supplied by `read_plan`. If the plan has no selection, all rows will be returned
918-
pub(crate) fn new(
919-
batch_size: usize,
920-
array_reader: Box<dyn ArrayReader>,
921-
selection: Option<RowSelection>,
922-
) -> Self {
916+
pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
923917
let schema = match array_reader.get_data_type() {
924918
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
925919
_ => unreachable!("Struct array reader's data type is not struct!"),
926920
};
927921

928922
Self {
929-
batch_size,
930923
array_reader,
931924
schema: Arc::new(schema),
932-
selection: selection.map(|s| s.trim().into()),
925+
read_plan,
933926
}
934927
}
935-
}
936928

937-
/// Returns `true` if `selection` is `None` or selects some rows
938-
pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
939-
selection.map(|x| x.selects_any()).unwrap_or(true)
940-
}
941-
942-
/// Applies an optional offset and limit to an optional [`RowSelection`]
943-
pub(crate) fn apply_range(
944-
mut selection: Option<RowSelection>,
945-
row_count: usize,
946-
offset: Option<usize>,
947-
limit: Option<usize>,
948-
) -> Option<RowSelection> {
949-
// If an offset is defined, apply it to the `selection`
950-
if let Some(offset) = offset {
951-
selection = Some(match row_count.checked_sub(offset) {
952-
None => RowSelection::from(vec![]),
953-
Some(remaining) => selection
954-
.map(|selection| selection.offset(offset))
955-
.unwrap_or_else(|| {
956-
RowSelection::from(vec![
957-
RowSelector::skip(offset),
958-
RowSelector::select(remaining),
959-
])
960-
}),
961-
});
962-
}
963-
964-
// If a limit is defined, apply it to the final `selection`
965-
if let Some(limit) = limit {
966-
selection = Some(
967-
selection
968-
.map(|selection| selection.limit(limit))
969-
.unwrap_or_else(|| {
970-
RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
971-
}),
972-
);
973-
}
974-
selection
975-
}
976-
977-
/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
978-
/// which rows to return.
979-
///
980-
/// `input_selection`: Optional pre-existing selection. If `Some`, then the
981-
/// final [`RowSelection`] will be the conjunction of it and the rows selected
982-
/// by `predicate`.
983-
///
984-
/// Note: A pre-existing selection may come from evaluating a previous predicate
985-
/// or if the [`ParquetRecordBatchReader`] specified an explicit
986-
/// [`RowSelection`] in addition to one or more predicates.
987-
pub(crate) fn evaluate_predicate(
988-
batch_size: usize,
989-
array_reader: Box<dyn ArrayReader>,
990-
input_selection: Option<RowSelection>,
991-
predicate: &mut dyn ArrowPredicate,
992-
) -> Result<RowSelection> {
993-
let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
994-
let mut filters = vec![];
995-
for maybe_batch in reader {
996-
let maybe_batch = maybe_batch?;
997-
let input_rows = maybe_batch.num_rows();
998-
let filter = predicate.evaluate(maybe_batch)?;
999-
// Since user supplied predicate, check error here to catch bugs quickly
1000-
if filter.len() != input_rows {
1001-
return Err(arrow_err!(
1002-
"ArrowPredicate predicate returned {} rows, expected {input_rows}",
1003-
filter.len()
1004-
));
1005-
}
1006-
match filter.null_count() {
1007-
0 => filters.push(filter),
1008-
_ => filters.push(prep_null_mask_filter(&filter)),
1009-
};
929+
#[inline(always)]
930+
pub(crate) fn batch_size(&self) -> usize {
931+
self.read_plan.batch_size()
1010932
}
1011-
1012-
let raw = RowSelection::from_filters(&filters);
1013-
Ok(match input_selection {
1014-
Some(selection) => selection.and_then(&raw),
1015-
None => raw,
1016-
})
1017933
}
1018934

1019935
#[cfg(test)]
@@ -3992,7 +3908,7 @@ mod tests {
39923908
.build()
39933909
.unwrap();
39943910
assert_ne!(1024, num_rows);
3995-
assert_eq!(reader.batch_size, num_rows as usize);
3911+
assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
39963912
}
39973913

39983914
#[test]

0 commit comments

Comments
 (0)