Skip to content

Commit 68e338b

Browse files
committed
Extract limit code
1 parent 8255c4c commit 68e338b

File tree

3 files changed

+101
-69
lines changed

3 files changed

+101
-69
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -700,9 +700,10 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
700700

701701
let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;
702702
let plan = read_plan_builder
703-
.with_num_rows(reader.num_rows())
703+
.limited(reader.num_rows())
704704
.with_offset(self.offset)
705705
.with_limit(self.limit)
706+
.build_limited()
706707
.build()?;
707708

708709
Ok(ParquetRecordBatchReader::new(array_reader, plan))
@@ -899,7 +900,6 @@ impl ParquetRecordBatchReader {
899900
build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;
900901

901902
let read_plan = ReadPlanBuilder::new(batch_size)
902-
.with_num_rows(row_groups.num_rows())
903903
.with_selection(selection)
904904
.build()?;
905905

parquet/src/arrow/arrow_reader/read_plan.rs

Lines changed: 97 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,20 @@ use crate::arrow::array_reader::ArrayReader;
2222
use crate::arrow::arrow_reader::{
2323
ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
2424
};
25-
use crate::errors::ParquetError;
25+
use crate::errors::{ParquetError, Result};
2626
use arrow_array::Array;
2727
use arrow_select::filter::prep_null_mask_filter;
2828
use std::collections::VecDeque;
2929

3030
/// A builder for [`ReadPlan`]
31+
///
32+
/// See also [`LimitedReadPlanBuilder`], for applying limits and offsets
3133
#[derive(Clone)]
3234
pub(crate) struct ReadPlanBuilder {
3335
batch_size: usize,
3436
/// Current to apply, includes all filters
3537
selection: Option<RowSelection>,
3638
// TODO: Cached result of evaluating some columns with the RowSelection
37-
/// Total number of rows in the row group before the selection
38-
num_rows: Option<usize>,
39-
/// Rows to skip before returning any rows
40-
offset: Option<usize>,
41-
/// Limit on the number of rows to return
42-
limit: Option<usize>,
4339
}
4440

4541
impl ReadPlanBuilder {
@@ -50,9 +46,6 @@ impl ReadPlanBuilder {
5046
Self {
5147
batch_size,
5248
selection: None,
53-
num_rows: None,
54-
offset: None,
55-
limit: None,
5649
}
5750
}
5851

@@ -67,22 +60,14 @@ impl ReadPlanBuilder {
6760
self.selection.as_ref()
6861
}
6962

70-
/// set the number of rows in the row group
71-
pub(crate) fn with_num_rows(mut self, num_rows: usize) -> Self {
72-
self.num_rows = Some(num_rows);
73-
self
74-
}
75-
76-
/// Set the offset to the given value
77-
pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
78-
self.offset = offset;
79-
self
80-
}
81-
82-
/// Set the limit to the given value
83-
pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
84-
self.limit = limit;
85-
self
63+
/// Specify the number of rows in the row group before filtering
64+
/// returning a [`LimitedReadPlanBuilder`] that can apply
65+
/// offset and limit.
66+
///
67+
/// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this
68+
/// selection.
69+
pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
70+
LimitedReadPlanBuilder::new(self, row_count)
8671
}
8772

8873
/// Returns true if the current plan selects any rows
@@ -113,7 +98,7 @@ impl ReadPlanBuilder {
11398
mut self,
11499
array_reader: Box<dyn ArrayReader>,
115100
predicate: &mut dyn ArrowPredicate,
116-
) -> crate::errors::Result<Self> {
101+
) -> Result<Self> {
117102
let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build()?);
118103
let mut filters = vec![];
119104
for maybe_batch in reader {
@@ -149,47 +134,9 @@ impl ReadPlanBuilder {
149134
}
150135
let Self {
151136
batch_size,
152-
mut selection,
153-
num_rows,
154-
offset,
155-
limit,
137+
selection,
156138
} = self;
157139

158-
// TODO make this nicer somehow (maybe a different builder returned with Rowcount)
159-
if offset.is_some() || limit.is_some() {
160-
let Some(row_count) = num_rows else {
161-
return Err(general_err!(
162-
"Internal ReadPlanBuilder::build() has limit/offset but called without a row count"
163-
));
164-
};
165-
166-
// If an offset is defined, apply it to the `selection`
167-
if let Some(offset) = offset {
168-
selection = Some(match row_count.checked_sub(offset) {
169-
None => RowSelection::from(vec![]),
170-
Some(remaining) => selection
171-
.map(|selection| selection.offset(offset))
172-
.unwrap_or_else(|| {
173-
RowSelection::from(vec![
174-
RowSelector::skip(offset),
175-
RowSelector::select(remaining),
176-
])
177-
}),
178-
});
179-
}
180-
181-
// If a limit is defined, apply it to the final `selection`
182-
if let Some(limit) = limit {
183-
selection = Some(
184-
selection
185-
.map(|selection| selection.limit(limit))
186-
.unwrap_or_else(|| {
187-
RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
188-
}),
189-
);
190-
}
191-
}
192-
193140
let selection = selection.map(|s| s.trim().into());
194141

195142
Ok(ReadPlan {
@@ -199,6 +146,90 @@ impl ReadPlanBuilder {
199146
}
200147
}
201148

149+
/// A builder for [`ReadPlan`] that applies a limit and offset to the read plan
150+
///
151+
/// See [`ReadPlanBuilder::limited`] `
152+
pub(crate) struct LimitedReadPlanBuilder {
153+
/// The underlying read plan builder
154+
inner: ReadPlanBuilder,
155+
/// Total number of rows in the row group before the selection, limit or
156+
/// offset are applied
157+
row_count: usize,
158+
/// The offset to apply to the read plan
159+
offset: Option<usize>,
160+
/// The limit to apply to the read plan
161+
limit: Option<usize>,
162+
}
163+
164+
impl LimitedReadPlanBuilder {
165+
/// Create a new `LimitedReadPlanBuilder` from the existing builder and number of rows
166+
fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
167+
Self {
168+
inner,
169+
row_count,
170+
offset: None,
171+
limit: None,
172+
}
173+
}
174+
175+
/// Set the offset to apply to the read plan
176+
pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
177+
self.offset = offset;
178+
self
179+
}
180+
181+
/// Set the limit to apply to the read plan
182+
pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
183+
self.limit = limit;
184+
self
185+
}
186+
187+
/// Finalize apply the offset and limit and return the underlying builder
188+
pub(crate) fn build_limited(self) -> ReadPlanBuilder {
189+
let Self {
190+
mut inner,
191+
row_count,
192+
offset,
193+
limit,
194+
} = self;
195+
196+
// If selection is empty, truncate (needed??)
197+
if !inner.selects_any() {
198+
inner.selection = Some(RowSelection::from(vec![]));
199+
}
200+
201+
// If an offset is defined, apply it to the `selection`
202+
if let Some(offset) = offset {
203+
inner.selection = Some(match row_count.checked_sub(offset) {
204+
None => RowSelection::from(vec![]),
205+
Some(remaining) => inner
206+
.selection
207+
.map(|selection| selection.offset(offset))
208+
.unwrap_or_else(|| {
209+
RowSelection::from(vec![
210+
RowSelector::skip(offset),
211+
RowSelector::select(remaining),
212+
])
213+
}),
214+
});
215+
}
216+
217+
// If a limit is defined, apply it to the final `selection`
218+
if let Some(limit) = limit {
219+
inner.selection = Some(
220+
inner
221+
.selection
222+
.map(|selection| selection.limit(limit))
223+
.unwrap_or_else(|| {
224+
RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
225+
}),
226+
);
227+
}
228+
229+
inner
230+
}
231+
}
232+
202233
/// Describes what rows to read from a Parquet Row Group
203234
/// including based on [`RowSelection`] and [`RowFilter`].
204235
pub(crate) struct ReadPlan {

parquet/src/arrow/async_reader/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -638,9 +638,10 @@ where
638638
let plan = plan_builder
639639
// TODO remove this clone (builder selection is used below to fetch pages)
640640
.clone()
641-
.with_num_rows(row_group.row_count)
641+
.limited(row_group.row_count)
642642
.with_offset(self.offset)
643643
.with_limit(self.limit)
644+
.build_limited()
644645
.build()?;
645646

646647
let rows_after = plan.num_rows_selected().unwrap_or(row_group.row_count);

0 commit comments

Comments
 (0)