Skip to content

Commit 72a8114

Browse files
committed
Move Selection logic into ReadPlan builder
1 parent 531370f commit 72a8114

File tree

3 files changed

+513
-63
lines changed

3 files changed

+513
-63
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -809,53 +809,43 @@ impl ParquetRecordBatchReader {
809809
/// simplify error handling with `?`
810810
fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
811811
let mut read_records = 0;
812-
let batch_size = self.batch_size();
813-
match self.read_plan.selection_mut() {
814-
Some(selection) => {
815-
while read_records < batch_size && !selection.is_empty() {
816-
let front = selection.pop_front().unwrap();
812+
813+
match &mut self.read_plan {
814+
ReadPlan::All { batch_size } => {
815+
self.array_reader.read_records(*batch_size)?;
816+
}
817+
ReadPlan::Subset { iterator } => {
818+
let batch_size = iterator.batch_size();
819+
820+
while read_records < batch_size {
821+
let Some(front) = self.read_plan.next() else {
822+
break;
823+
};
824+
817825
if front.skip {
818826
let skipped = self.array_reader.skip_records(front.row_count)?;
819827

820828
if skipped != front.row_count {
821829
return Err(general_err!(
822-
"failed to skip rows, expected {}, got {}",
830+
"Internal Error: failed to skip rows, expected {}, got {}",
823831
front.row_count,
824832
skipped
825833
));
826834
}
827-
continue;
828-
}
829-
830-
//Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader.
831-
//Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
832-
if front.row_count == 0 {
833-
continue;
834-
}
835-
836-
// try to read record
837-
let need_read = batch_size - read_records;
838-
let to_read = match front.row_count.checked_sub(need_read) {
839-
Some(remaining) if remaining != 0 => {
840-
// if page row count less than batch_size we must set batch size to page row count.
841-
// add check avoid dead loop
842-
selection.push_front(RowSelector::select(remaining));
843-
need_read
835+
} else {
836+
let read = self.array_reader.read_records(front.row_count)?;
837+
if read == 0 {
838+
break;
844839
}
845-
_ => front.row_count,
846-
};
847-
match self.array_reader.read_records(to_read)? {
848-
0 => break,
849-
rec => read_records += rec,
840+
841+
read_records += read
850842
};
851843
}
852844
}
853-
None => {
854-
self.array_reader.read_records(batch_size)?;
855-
}
856-
};
845+
}
857846

858847
let array = self.array_reader.consume_batch()?;
848+
859849
let struct_array = array.as_struct_opt().ok_or_else(|| {
860850
ArrowError::ParquetError("Struct array reader should return struct array".to_string())
861851
})?;
@@ -928,9 +918,13 @@ impl ParquetRecordBatchReader {
928918
}
929919
}
930920

921+
#[cfg(test)]
931922
#[inline(always)]
932923
pub(crate) fn batch_size(&self) -> usize {
933-
self.read_plan.batch_size()
924+
match &self.read_plan {
925+
ReadPlan::All { batch_size } => *batch_size,
926+
ReadPlan::Subset { iterator } => iterator.batch_size(),
927+
}
934928
}
935929
}
936930

@@ -3910,7 +3904,7 @@ mod tests {
39103904
.build()
39113905
.unwrap();
39123906
assert_ne!(1024, num_rows);
3913-
assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
3907+
assert_eq!(reader.batch_size(), num_rows as usize);
39143908
}
39153909

39163910
#[test]

0 commit comments

Comments
 (0)