Skip to content

Commit ce18e5b

Browse files
alambetseidl
andauthored
Introduce ReadPlan to encapsulate the calculation of what parquet rows to decode (#7502)
* Introduce `ReadPlan` to encapsulate the calculation of what rows to decode * Update parquet/src/arrow/arrow_reader/read_plan.rs Co-authored-by: Ed Seidl <[email protected]> --------- Co-authored-by: Ed Seidl <[email protected]>
1 parent 6721ec1 commit ce18e5b

File tree

3 files changed

+326
-151
lines changed

3 files changed

+326
-151
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 32 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,8 @@ use arrow_array::cast::AsArray;
2121
use arrow_array::Array;
2222
use arrow_array::{RecordBatch, RecordBatchReader};
2323
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
24-
use arrow_select::filter::prep_null_mask_filter;
2524
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
2625
pub use selection::{RowSelection, RowSelector};
27-
use std::collections::VecDeque;
2826
use std::sync::Arc;
2927

3028
pub use crate::arrow::array_reader::RowGroups;
@@ -39,7 +37,10 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
3937
use crate::file::reader::{ChunkReader, SerializedPageReader};
4038
use crate::schema::types::SchemaDescriptor;
4139

40+
pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
41+
4242
mod filter;
43+
mod read_plan;
4344
mod selection;
4445
pub mod statistics;
4546

@@ -679,38 +680,32 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
679680
};
680681

681682
let mut filter = self.filter;
682-
let mut selection = self.selection;
683+
let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection);
683684

685+
// Update selection based on any filters
684686
if let Some(filter) = filter.as_mut() {
685687
for predicate in filter.predicates.iter_mut() {
686-
if !selects_any(selection.as_ref()) {
688+
// break early if we have ruled out all rows
689+
if !plan_builder.selects_any() {
687690
break;
688691
}
689692

690693
let array_reader =
691694
build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;
692695

693-
selection = Some(evaluate_predicate(
694-
batch_size,
695-
array_reader,
696-
selection,
697-
predicate.as_mut(),
698-
)?);
696+
plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
699697
}
700698
}
701699

702700
let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;
701+
let read_plan = plan_builder
702+
.limited(reader.num_rows())
703+
.with_offset(self.offset)
704+
.with_limit(self.limit)
705+
.build_limited()
706+
.build();
703707

704-
// If selection is empty, truncate
705-
if !selects_any(selection.as_ref()) {
706-
selection = Some(RowSelection::from(vec![]));
707-
}
708-
709-
Ok(ParquetRecordBatchReader::new(
710-
batch_size,
711-
array_reader,
712-
apply_range(selection, reader.num_rows(), self.offset, self.limit),
713-
))
708+
Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
714709
}
715710
}
716711

@@ -789,11 +784,9 @@ impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
789784
/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
790785
/// read from a parquet data source
791786
pub struct ParquetRecordBatchReader {
792-
batch_size: usize,
793787
array_reader: Box<dyn ArrayReader>,
794788
schema: SchemaRef,
795-
/// Row ranges to be selected from the data source
796-
selection: Option<VecDeque<RowSelector>>,
789+
read_plan: ReadPlan,
797790
}
798791

799792
impl Iterator for ParquetRecordBatchReader {
@@ -814,9 +807,10 @@ impl ParquetRecordBatchReader {
814807
/// simplify error handling with `?`
815808
fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
816809
let mut read_records = 0;
817-
match self.selection.as_mut() {
810+
let batch_size = self.batch_size();
811+
match self.read_plan.selection_mut() {
818812
Some(selection) => {
819-
while read_records < self.batch_size && !selection.is_empty() {
813+
while read_records < batch_size && !selection.is_empty() {
820814
let front = selection.pop_front().unwrap();
821815
if front.skip {
822816
let skipped = self.array_reader.skip_records(front.row_count)?;
@@ -838,7 +832,7 @@ impl ParquetRecordBatchReader {
838832
}
839833

840834
// try to read record
841-
let need_read = self.batch_size - read_records;
835+
let need_read = batch_size - read_records;
842836
let to_read = match front.row_count.checked_sub(need_read) {
843837
Some(remaining) if remaining != 0 => {
844838
// if page row count less than batch_size we must set batch size to page row count.
@@ -855,7 +849,7 @@ impl ParquetRecordBatchReader {
855849
}
856850
}
857851
None => {
858-
self.array_reader.read_records(self.batch_size)?;
852+
self.array_reader.read_records(batch_size)?;
859853
}
860854
};
861855

@@ -905,116 +899,37 @@ impl ParquetRecordBatchReader {
905899
let array_reader =
906900
build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;
907901

902+
let read_plan = ReadPlanBuilder::new(batch_size)
903+
.with_selection(selection)
904+
.build();
905+
908906
Ok(Self {
909-
batch_size,
910907
array_reader,
911908
schema: Arc::new(Schema::new(levels.fields.clone())),
912-
selection: selection.map(|s| s.trim().into()),
909+
read_plan,
913910
})
914911
}
915912

916913
/// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
917914
/// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
918915
/// all rows will be returned
919-
pub(crate) fn new(
920-
batch_size: usize,
921-
array_reader: Box<dyn ArrayReader>,
922-
selection: Option<RowSelection>,
923-
) -> Self {
916+
pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
924917
let schema = match array_reader.get_data_type() {
925918
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
926919
_ => unreachable!("Struct array reader's data type is not struct!"),
927920
};
928921

929922
Self {
930-
batch_size,
931923
array_reader,
932924
schema: Arc::new(schema),
933-
selection: selection.map(|s| s.trim().into()),
925+
read_plan,
934926
}
935927
}
936-
}
937928

938-
/// Returns `true` if `selection` is `None` or selects some rows
939-
pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
940-
selection.map(|x| x.selects_any()).unwrap_or(true)
941-
}
942-
943-
/// Applies an optional offset and limit to an optional [`RowSelection`]
944-
pub(crate) fn apply_range(
945-
mut selection: Option<RowSelection>,
946-
row_count: usize,
947-
offset: Option<usize>,
948-
limit: Option<usize>,
949-
) -> Option<RowSelection> {
950-
// If an offset is defined, apply it to the `selection`
951-
if let Some(offset) = offset {
952-
selection = Some(match row_count.checked_sub(offset) {
953-
None => RowSelection::from(vec![]),
954-
Some(remaining) => selection
955-
.map(|selection| selection.offset(offset))
956-
.unwrap_or_else(|| {
957-
RowSelection::from(vec![
958-
RowSelector::skip(offset),
959-
RowSelector::select(remaining),
960-
])
961-
}),
962-
});
963-
}
964-
965-
// If a limit is defined, apply it to the final `selection`
966-
if let Some(limit) = limit {
967-
selection = Some(
968-
selection
969-
.map(|selection| selection.limit(limit))
970-
.unwrap_or_else(|| {
971-
RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
972-
}),
973-
);
974-
}
975-
selection
976-
}
977-
978-
/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
979-
/// which rows to return.
980-
///
981-
/// `input_selection`: Optional pre-existing selection. If `Some`, then the
982-
/// final [`RowSelection`] will be the conjunction of it and the rows selected
983-
/// by `predicate`.
984-
///
985-
/// Note: A pre-existing selection may come from evaluating a previous predicate
986-
/// or if the [`ParquetRecordBatchReader`] specified an explicit
987-
/// [`RowSelection`] in addition to one or more predicates.
988-
pub(crate) fn evaluate_predicate(
989-
batch_size: usize,
990-
array_reader: Box<dyn ArrayReader>,
991-
input_selection: Option<RowSelection>,
992-
predicate: &mut dyn ArrowPredicate,
993-
) -> Result<RowSelection> {
994-
let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
995-
let mut filters = vec![];
996-
for maybe_batch in reader {
997-
let maybe_batch = maybe_batch?;
998-
let input_rows = maybe_batch.num_rows();
999-
let filter = predicate.evaluate(maybe_batch)?;
1000-
// Since user supplied predicate, check error here to catch bugs quickly
1001-
if filter.len() != input_rows {
1002-
return Err(arrow_err!(
1003-
"ArrowPredicate predicate returned {} rows, expected {input_rows}",
1004-
filter.len()
1005-
));
1006-
}
1007-
match filter.null_count() {
1008-
0 => filters.push(filter),
1009-
_ => filters.push(prep_null_mask_filter(&filter)),
1010-
};
929+
#[inline(always)]
930+
pub(crate) fn batch_size(&self) -> usize {
931+
self.read_plan.batch_size()
1011932
}
1012-
1013-
let raw = RowSelection::from_filters(&filters);
1014-
Ok(match input_selection {
1015-
Some(selection) => selection.and_then(&raw),
1016-
None => raw,
1017-
})
1018933
}
1019934

1020935
#[cfg(test)]
@@ -3993,7 +3908,7 @@ mod tests {
39933908
.build()
39943909
.unwrap();
39953910
assert_ne!(1024, num_rows);
3996-
assert_eq!(reader.batch_size, num_rows as usize);
3911+
assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
39973912
}
39983913

39993914
#[test]

0 commit comments

Comments
 (0)