Skip to content

Commit 8961196

Browse files
committed
Sketch out cached filter result API
1 parent 4b8fbb4 commit 8961196

File tree

4 files changed

+298
-19
lines changed

4 files changed

+298
-19
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -694,7 +694,11 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
694694
let array_reader =
695695
build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;
696696

697-
plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
697+
plan_builder = plan_builder.with_predicate(
698+
array_reader,
699+
predicate.as_mut(),
700+
&self.projection,
701+
)?;
698702
}
699703
}
700704

parquet/src/arrow/arrow_reader/read_plan.rs

Lines changed: 278 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@ use crate::arrow::array_reader::ArrayReader;
2222
use crate::arrow::arrow_reader::{
2323
ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
2424
};
25+
use crate::arrow::ProjectionMask;
2526
use crate::errors::{ParquetError, Result};
26-
use arrow_array::Array;
27-
use arrow_select::filter::prep_null_mask_filter;
27+
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, RecordBatchReader};
28+
use arrow_schema::{DataType, Schema};
29+
use arrow_select::concat::concat;
30+
use arrow_select::filter::{filter, prep_null_mask_filter};
2831
use std::collections::VecDeque;
2932

3033
/// A builder for [`ReadPlan`]
@@ -35,7 +38,8 @@ pub(crate) struct ReadPlanBuilder {
3538
batch_size: usize,
3639
/// Current to apply, includes all filters
3740
selection: Option<RowSelection>,
38-
// TODO: Cached result of evaluating some columns with the RowSelection
41+
/// Cached result of evaluating some columns with the RowSelection
42+
cached_predicate_result: Option<CachedPredicateResult>,
3943
}
4044

4145
impl ReadPlanBuilder {
@@ -46,6 +50,7 @@ impl ReadPlanBuilder {
4650
Self {
4751
batch_size,
4852
selection: None,
53+
cached_predicate_result: None,
4954
}
5055
}
5156

@@ -88,41 +93,60 @@ impl ReadPlanBuilder {
8893

8994
/// Evaluates an [`ArrowPredicate`], updating the read plan's selection
9095
///
96+
/// # Arguments
97+
///
98+
/// * `array_reader`: The array reader to use for evaluating the predicate.
99+
/// must be configured with the projection mask specified by
100+
/// [`ArrowPredicate::projection`] for the `predicate`.
101+
///
102+
/// * `predicate`: The predicate to evaluate
103+
///
104+
/// * `projection`: The projection mask that will be selected. This code will
105+
/// potentially cache the results of filtering columns that also appear in the
106+
/// projection mask.
107+
///
91108
/// If `this.selection` is `Some`, the resulting [`RowSelection`] will be
92-
/// the conjunction of it and the rows selected by `predicate`.
109+
/// the conjunction of it and the rows selected by `predicate` (they will be
110+
/// `AND`ed).
93111
///
94-
/// Note: A pre-existing selection may come from evaluating a previous predicate
95-
/// or if the [`ParquetRecordBatchReader`] specified an explicit
112+
/// Note: A pre-existing selection may come from evaluating a previous
113+
/// predicate or if the [`ParquetRecordBatchReader`] specifies an explicit
96114
/// [`RowSelection`] in addition to one or more predicates.
97115
pub(crate) fn with_predicate(
98116
mut self,
99117
array_reader: Box<dyn ArrayReader>,
100118
predicate: &mut dyn ArrowPredicate,
119+
projection_mask: &ProjectionMask,
101120
) -> Result<Self> {
121+
// Prepare to decode all rows in the selection to evaluate the predicate
102122
let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
103-
let mut filters = vec![];
123+
let mut cached_results_builder = CachedPredicateResultBuilder::new(
124+
&reader.schema(),
125+
predicate.projection(),
126+
projection_mask,
127+
);
104128
for maybe_batch in reader {
105-
let maybe_batch = maybe_batch?;
106-
let input_rows = maybe_batch.num_rows();
107-
let filter = predicate.evaluate(maybe_batch)?;
129+
let batch = maybe_batch?;
130+
let input_rows = batch.num_rows();
131+
let filter = predicate.evaluate(batch.clone())?;
108132
// Since user supplied predicate, check error here to catch bugs quickly
109133
if filter.len() != input_rows {
110134
return Err(arrow_err!(
111135
"ArrowPredicate predicate returned {} rows, expected {input_rows}",
112136
filter.len()
113137
));
114138
}
115-
match filter.null_count() {
116-
0 => filters.push(filter),
117-
_ => filters.push(prep_null_mask_filter(&filter)),
118-
};
139+
cached_results_builder.add(batch, filter)?;
119140
}
120141

121-
let raw = RowSelection::from_filters(&filters);
142+
let (raw, cached_predicate_result) =
143+
cached_results_builder.build(self.batch_size, predicate.projection())?;
122144
self.selection = match self.selection.take() {
123145
Some(selection) => Some(selection.and_then(&raw)),
124146
None => Some(raw),
125147
};
148+
149+
self.cached_predicate_result = cached_predicate_result;
126150
Ok(self)
127151
}
128152

@@ -135,13 +159,15 @@ impl ReadPlanBuilder {
135159
let Self {
136160
batch_size,
137161
selection,
162+
cached_predicate_result,
138163
} = self;
139164

140165
let selection = selection.map(|s| s.trim().into());
141166

142167
ReadPlan {
143168
batch_size,
144169
selection,
170+
cached_predicate_result,
145171
}
146172
}
147173
}
@@ -237,8 +263,11 @@ impl LimitedReadPlanBuilder {
237263
pub(crate) struct ReadPlan {
238264
batch_size: usize,
239265
/// Row ranges to be selected from the data source
266+
/// TODO update this to use something more efficient
267+
/// See <https://github.com/apache/arrow-rs/pull/7454/files#r2092962327>
240268
selection: Option<VecDeque<RowSelector>>,
241-
// TODO: Cached result of evaluating some columns with the RowSelection
269+
/// Cached result of evaluating some column(s) with the current RowSelection
270+
cached_predicate_result: Option<CachedPredicateResult>,
242271
}
243272

244273
impl ReadPlan {
@@ -252,3 +281,236 @@ impl ReadPlan {
252281
self.batch_size
253282
}
254283
}
284+
285+
/// Incrementally builds the result of evaluating a ArrowPredicate on
286+
/// a RowGroup.
287+
struct CachedPredicateResultBuilder {
288+
/// The entire result of the predicate evaluation in memory
289+
///
290+
/// TODO: potentially incrementally build the result of the predicate
291+
/// evaluation without holding all the batches in memory. See
292+
/// <https://github.com/apache/arrow-rs/issues/6692>
293+
in_progress_arrays: Vec<Box<dyn InProgressArray>>,
294+
filters: Vec<BooleanArray>,
295+
}
296+
297+
impl CachedPredicateResultBuilder {
298+
/// Create a new CachedPredicateResultBuilder
299+
///
300+
/// # Arguments:
301+
/// * `schema`: The schema of the filter record batch
302+
/// * `filter_mask`: which columns of the original parquet schema did the filter columns come from?
303+
/// * `projection_mask`: which columns of the original parquet schema are in the final projection?
304+
///
305+
/// This structure does not cache filter results for the columns that are not
306+
/// in the projection mask. This is because the filter results are not needed
307+
fn new(
308+
schema: &Schema,
309+
filter_mask: &ProjectionMask,
310+
projection_mask: &ProjectionMask,
311+
) -> Self {
312+
let mut field_iter = schema.fields.iter();
313+
314+
let (filter_mask_inner, projection_mask_inner) =
315+
match (filter_mask.mask(), projection_mask.mask()) {
316+
(Some(filter_mask), Some(projection_mask)) => (filter_mask, projection_mask),
317+
// NB, None means all columns and we just want the intersection of the two
318+
(Some(filter_mask), None) => (filter_mask, filter_mask),
319+
(None, Some(projection_mask)) => (projection_mask, projection_mask),
320+
(None, None) => {
321+
// this means all columns are in the projection and filter so cache them all when possible
322+
let in_progress_arrays = field_iter
323+
.map(|field| create_in_progress_array(true, field.data_type()))
324+
.collect();
325+
return {
326+
Self {
327+
in_progress_arrays,
328+
filters: vec![],
329+
}
330+
};
331+
}
332+
};
333+
334+
let mut in_progress_arrays = Vec::with_capacity(filter_mask_inner.len());
335+
336+
for (&in_filter, &in_projection) in
337+
filter_mask_inner.iter().zip(projection_mask_inner.iter())
338+
{
339+
if !in_filter {
340+
continue;
341+
}
342+
// field is in the filter
343+
let field = field_iter.next().expect("mismatch in field lengths");
344+
in_progress_arrays.push(create_in_progress_array(in_projection, field.data_type()));
345+
}
346+
assert_eq!(in_progress_arrays.len(), schema.fields().len());
347+
348+
Self {
349+
in_progress_arrays,
350+
filters: vec![],
351+
}
352+
}
353+
354+
/// Add a new batch and filter to the builder
355+
fn add(&mut self, batch: RecordBatch, mut filter: BooleanArray) -> Result<()> {
356+
if filter.null_count() > 0 {
357+
filter = prep_null_mask_filter(&filter);
358+
}
359+
360+
let (_schema, columns, _row_count) = batch.into_parts();
361+
362+
for (in_progress, array) in self.in_progress_arrays.iter_mut().zip(columns.into_iter()) {
363+
in_progress.append(array, &filter)?;
364+
}
365+
366+
self.filters.push(filter);
367+
Ok(())
368+
}
369+
370+
/// Return (selection, maybe_cached_predicate_result) that represents the rows
371+
/// that were selected and batches that were evaluated.
372+
fn build(
373+
self,
374+
_batch_size: usize,
375+
filter_mask: &ProjectionMask,
376+
) -> Result<(RowSelection, Option<CachedPredicateResult>)> {
377+
let Self {
378+
in_progress_arrays,
379+
filters,
380+
} = self;
381+
382+
let new_selection = RowSelection::from_filters(&filters);
383+
384+
let Some(mask) = filter_mask.mask() else {
385+
return Ok((new_selection, None));
386+
};
387+
388+
let mut arrays: Vec<Option<ArrayRef>> = vec![None; mask.len()];
389+
let mut in_progress_arrays = VecDeque::from(in_progress_arrays);
390+
391+
// Now find the location in the original parquet schema of the filter columns
392+
for i in 0..mask.len() {
393+
if mask[i] {
394+
let mut in_progress = in_progress_arrays
395+
.pop_front()
396+
.expect("insufficient inprogress arrays");
397+
arrays[i] = in_progress.try_build()?;
398+
}
399+
}
400+
401+
let cached_result = CachedPredicateResult { arrays, filters };
402+
403+
Ok((new_selection, Some(cached_result)))
404+
}
405+
}
406+
407+
/// The result of evaluating a predicate on a RowGroup with a specific
408+
/// RowSelection
409+
#[derive(Clone)]
410+
struct CachedPredicateResult {
411+
/// Map of parquet schema column index to the result of evaluating the predicate
412+
/// on that column.
413+
///
414+
/// NOTE each array already has the corresponding filters applied
415+
///
416+
/// TODO: store as Vec<Vec<ArrayRef>> to avoid having to have one large
417+
/// array for each column
418+
arrays: Vec<Option<ArrayRef>>,
419+
/// The results of evaluating the predicate (this has already been applied to the
420+
/// cached results)
421+
filters: Vec<BooleanArray>,
422+
}
423+
424+
impl CachedPredicateResult {
425+
fn empty() -> Self {
426+
Self {
427+
arrays: vec![],
428+
filters: vec![],
429+
}
430+
}
431+
432+
/// Apply the results of other to self
433+
/// Updates cached filter result and filters potentially
434+
pub fn merge(self, other: Self) -> Self {
435+
// TODO do something with self
436+
other
437+
}
438+
}
439+
440+
/// Progressively creates array from filtered values
441+
///
442+
/// TODO avoid buffering the input memory
443+
trait InProgressArray {
444+
/// Appends all values of the array to the in progress array at locations where filter[i] is true
445+
/// to the in progress array
446+
fn append(&mut self, _array: ArrayRef, filter: &BooleanArray) -> Result<()>;
447+
448+
/// Builds the final array, consuming all state from self. Returns None if the array
449+
/// cannot be created (e.g. data type not supported or out of buffer space)
450+
fn try_build(&mut self) -> Result<Option<ArrayRef>>;
451+
}
452+
453+
/// Return a new InProgressArray for the given data type
454+
///
455+
/// if `in_projection` is false then a NoOpInProgressArray is returned (will not
456+
/// actually cache arrays results)
457+
fn create_in_progress_array(
458+
in_projection: bool,
459+
_data_type: &DataType,
460+
) -> Box<dyn InProgressArray> {
461+
if in_projection {
462+
Box::new(GenericInProgressArray::new())
463+
} else {
464+
// column is not in the projection, so no need to cache
465+
Box::new(NoOpInProgressArray {})
466+
}
467+
}
468+
469+
/// Placeholder that does nothing until we support the entire set of datatypes
470+
struct NoOpInProgressArray {}
471+
472+
impl InProgressArray for NoOpInProgressArray {
473+
fn append(&mut self, _array: ArrayRef, _filter: &BooleanArray) -> Result<()> {
474+
// do nothing
475+
Ok(())
476+
}
477+
fn try_build(&mut self) -> Result<Option<ArrayRef>> {
478+
// do nothing
479+
Ok(None)
480+
}
481+
}
482+
483+
/// a generic implementation of InProgressArray that uses filter and concat kernels
484+
/// to create the final array
485+
///
486+
/// TODO: make this better with per type implementations
487+
/// <https://github.com/apache/arrow-rs/issues/6692>
488+
struct GenericInProgressArray {
489+
/// previously filtered arrays
490+
arrays: Vec<ArrayRef>,
491+
}
492+
493+
impl GenericInProgressArray {
494+
fn new() -> Self {
495+
Self { arrays: vec![] }
496+
}
497+
}
498+
impl InProgressArray for GenericInProgressArray {
499+
fn append(&mut self, array: ArrayRef, filter_array: &BooleanArray) -> Result<()> {
500+
self.arrays.push(filter(&array, filter_array)?);
501+
Ok(())
502+
}
503+
504+
fn try_build(&mut self) -> Result<Option<ArrayRef>> {
505+
if self.arrays.is_empty() {
506+
return Ok(None);
507+
}
508+
if self.arrays.len() == 1 {
509+
return Ok(Some(self.arrays.pop().unwrap()));
510+
}
511+
// Vomit: need to copy to a new Vec to get dyn array
512+
let arrays: Vec<&dyn Array> = self.arrays.iter().map(|a| a.as_ref()).collect();
513+
let array = concat(&arrays)?;
514+
Ok(Some(array))
515+
}
516+
}

parquet/src/arrow/async_reader/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,8 @@ where
618618
let array_reader =
619619
build_array_reader(self.fields.as_deref(), predicate.projection(), &row_group)?;
620620

621-
plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
621+
plan_builder =
622+
plan_builder.with_predicate(array_reader, predicate.as_mut(), &projection)?;
622623
}
623624
}
624625

0 commit comments

Comments
 (0)