Skip to content

Commit 78f96d1

Browse files
committed
Sketch out cached filter result API
1 parent b233032 commit 78f96d1

File tree

4 files changed

+280
-19
lines changed

4 files changed

+280
-19
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -694,7 +694,11 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
694694
let array_reader =
695695
build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;
696696

697-
plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
697+
plan_builder = plan_builder.with_predicate(
698+
array_reader,
699+
predicate.as_mut(),
700+
&self.projection,
701+
)?;
698702
}
699703
}
700704

parquet/src/arrow/arrow_reader/read_plan.rs

Lines changed: 260 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@ use crate::arrow::array_reader::ArrayReader;
2222
use crate::arrow::arrow_reader::{
2323
ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
2424
};
25+
use crate::arrow::ProjectionMask;
2526
use crate::errors::{ParquetError, Result};
26-
use arrow_array::Array;
27-
use arrow_select::filter::prep_null_mask_filter;
27+
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, RecordBatchReader};
28+
use arrow_schema::{DataType, Schema};
29+
use arrow_select::concat::concat;
30+
use arrow_select::filter::{filter, prep_null_mask_filter};
2831
use std::collections::VecDeque;
2932

3033
/// A builder for [`ReadPlan`]
@@ -35,7 +38,8 @@ pub(crate) struct ReadPlanBuilder {
3538
batch_size: usize,
3639
/// Current selection to apply, includes all filters
3740
selection: Option<RowSelection>,
38-
// TODO: Cached result of evaluating some columns with the RowSelection
41+
/// Cached result of evaluating some columns with the RowSelection
42+
cached_predicate_result: Option<CachedPredicateResult>,
3943
}
4044

4145
impl ReadPlanBuilder {
@@ -46,6 +50,7 @@ impl ReadPlanBuilder {
4650
Self {
4751
batch_size,
4852
selection: None,
53+
cached_predicate_result: None,
4954
}
5055
}
5156

@@ -88,41 +93,60 @@ impl ReadPlanBuilder {
8893

8994
/// Evaluates an [`ArrowPredicate`], updating the read plan's selection
9095
///
96+
/// # Arguments
97+
///
98+
/// * `array_reader`: The array reader to use for evaluating the predicate.
99+
/// must be configured with the projection mask specified by
100+
/// [`ArrowPredicate::projection`] for the `predicate`.
101+
///
102+
/// * `predicate`: The predicate to evaluate
103+
///
104+
/// * `projection`: The projection mask that will be selected. This code will
105+
/// potentially cache the results of filtering columns that also appear in the
106+
/// projection mask.
107+
///
91108
/// If `this.selection` is `Some`, the resulting [`RowSelection`] will be
92-
/// the conjunction of it and the rows selected by `predicate`.
109+
/// the conjunction of it and the rows selected by `predicate` (they will be
110+
/// `AND`ed).
93111
///
94-
/// Note: A pre-existing selection may come from evaluating a previous predicate
95-
/// or if the [`ParquetRecordBatchReader`] specified an explicit
112+
/// Note: A pre-existing selection may come from evaluating a previous
113+
/// predicate or if the [`ParquetRecordBatchReader`] specifies an explicit
96114
/// [`RowSelection`] in addition to one or more predicates.
97115
pub(crate) fn with_predicate(
98116
mut self,
99117
array_reader: Box<dyn ArrayReader>,
100118
predicate: &mut dyn ArrowPredicate,
119+
projection_mask: &ProjectionMask,
101120
) -> Result<Self> {
121+
// Prepare to decode all rows in the selection to evaluate the predicate
102122
let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
103-
let mut filters = vec![];
123+
let mut cached_results_builder = CachedPredicateResultBuilder::new(
124+
&reader.schema(),
125+
predicate.projection(),
126+
projection_mask,
127+
);
104128
for maybe_batch in reader {
105-
let maybe_batch = maybe_batch?;
106-
let input_rows = maybe_batch.num_rows();
107-
let filter = predicate.evaluate(maybe_batch)?;
129+
let batch = maybe_batch?;
130+
let input_rows = batch.num_rows();
131+
let filter = predicate.evaluate(batch.clone())?;
108132
// Since user supplied predicate, check error here to catch bugs quickly
109133
if filter.len() != input_rows {
110134
return Err(arrow_err!(
111135
"ArrowPredicate predicate returned {} rows, expected {input_rows}",
112136
filter.len()
113137
));
114138
}
115-
match filter.null_count() {
116-
0 => filters.push(filter),
117-
_ => filters.push(prep_null_mask_filter(&filter)),
118-
};
139+
cached_results_builder.add(batch, filter)?;
119140
}
120141

121-
let raw = RowSelection::from_filters(&filters);
142+
let (raw, cached_predicate_result) =
143+
cached_results_builder.build(self.batch_size, predicate.projection())?;
122144
self.selection = match self.selection.take() {
123145
Some(selection) => Some(selection.and_then(&raw)),
124146
None => Some(raw),
125147
};
148+
149+
self.cached_predicate_result = cached_predicate_result;
126150
Ok(self)
127151
}
128152

@@ -135,13 +159,15 @@ impl ReadPlanBuilder {
135159
let Self {
136160
batch_size,
137161
selection,
162+
cached_predicate_result,
138163
} = self;
139164

140165
let selection = selection.map(|s| s.trim().into());
141166

142167
ReadPlan {
143168
batch_size,
144169
selection,
170+
cached_predicate_result,
145171
}
146172
}
147173
}
@@ -238,7 +264,8 @@ pub(crate) struct ReadPlan {
238264
batch_size: usize,
239265
/// Row ranges to be selected from the data source
240266
selection: Option<VecDeque<RowSelector>>,
241-
// TODO: Cached result of evaluating some columns with the RowSelection
267+
/// Cached result of evaluating some columns with the RowSelection
268+
cached_predicate_result: Option<CachedPredicateResult>,
242269
}
243270

244271
impl ReadPlan {
@@ -252,3 +279,220 @@ impl ReadPlan {
252279
self.batch_size
253280
}
254281
}
282+
283+
/// Incrementally builds the result of evaluating a ArrowPredicate on
284+
/// a RowGroup.
285+
struct CachedPredicateResultBuilder {
286+
/// The entire result of the predicate evaluation in memory
287+
///
288+
/// TODO: potentially incrementally build the result of the predicate
289+
/// evaluation without holding all the batches in memory. See
290+
/// <https://github.com/apache/arrow-rs/issues/6692>
291+
in_progress_arrays: Vec<Box<dyn InProgressArray>>,
292+
filters: Vec<BooleanArray>,
293+
}
294+
295+
impl CachedPredicateResultBuilder {
296+
/// Create a new CachedPredicateResultBuilder
297+
///
298+
/// # Arguments:
299+
/// * `schema`: The schema of the filter record batch
300+
/// * `filter_mask`: which columns of the original parquet schema did the filter columns come from?
301+
/// * `projection_mask`: which columns of the original parquet schema are in the final projection?
302+
///
303+
/// This structure does not cache filter results for the columns that are not
304+
/// in the projection mask, because those filter results are not needed later.
305+
fn new(
306+
schema: &Schema,
307+
filter_mask: &ProjectionMask,
308+
projection_mask: &ProjectionMask,
309+
) -> Self {
310+
let filter_mask_inner = filter_mask.mask().expect("Need to handle filter_mask::all");
311+
let projection_mask_inner = projection_mask
312+
.mask()
313+
.expect("Need to handle projection_mask::all");
314+
315+
let mut in_progress_arrays = Vec::with_capacity(filter_mask_inner.len());
316+
let mut field_iter = schema.fields.iter();
317+
for (&in_filter, &in_projection) in
318+
filter_mask_inner.iter().zip(projection_mask_inner.iter())
319+
{
320+
if !in_filter {
321+
continue;
322+
}
323+
// field is in the filter
324+
let field = field_iter.next().expect("mismatch in field lengths");
325+
let in_progress = if in_projection {
326+
create_in_progress_array(field.data_type())
327+
} else {
328+
// field is not in the projection, so no need to cache
329+
Box::new(NoOpInProgressArray {})
330+
};
331+
in_progress_arrays.push(in_progress);
332+
}
333+
assert_eq!(in_progress_arrays.len(), filter_mask_inner.len());
334+
335+
Self {
336+
in_progress_arrays,
337+
filters: vec![],
338+
}
339+
}
340+
341+
/// Add a new batch and filter to the builder
342+
fn add(&mut self, batch: RecordBatch, mut filter: BooleanArray) -> Result<()> {
343+
if filter.null_count() > 0 {
344+
filter = prep_null_mask_filter(&filter);
345+
}
346+
347+
let (_schema, columns, _row_count) = batch.into_parts();
348+
349+
for (in_progress, array) in self.in_progress_arrays.iter_mut().zip(columns.into_iter()) {
350+
in_progress.append(array, &filter)?;
351+
}
352+
353+
self.filters.push(filter);
354+
Ok(())
355+
}
356+
357+
/// Return (selection, maybe_cached_predicate_result) that represents the rows
358+
/// that were selected and batches that were evaluated.
359+
fn build(
360+
self,
361+
_batch_size: usize,
362+
filter_mask: &ProjectionMask,
363+
) -> Result<(RowSelection, Option<CachedPredicateResult>)> {
364+
let Self {
365+
in_progress_arrays,
366+
filters,
367+
} = self;
368+
369+
let new_selection = RowSelection::from_filters(&filters);
370+
371+
let Some(mask) = filter_mask.mask() else {
372+
return Ok((new_selection, None));
373+
};
374+
375+
let mut arrays: Vec<Option<ArrayRef>> = vec![None; mask.len()];
376+
let mut in_progress_arrays = VecDeque::from(in_progress_arrays);
377+
378+
// Now find the location in the original parquet schema of the filter columns
379+
for i in 0..mask.len() {
380+
if mask[i] {
381+
let mut in_progress = in_progress_arrays
382+
.pop_front()
383+
.expect("need to be sufficient inprogress arrays");
384+
arrays[i] = in_progress.try_build()?;
385+
assert!(arrays[i].is_some())
386+
}
387+
}
388+
389+
let cached_result = CachedPredicateResult { arrays, filters };
390+
391+
Ok((new_selection, Some(cached_result)))
392+
}
393+
}
394+
395+
/// The result of evaluating a predicate on a RowGroup with a specific
396+
/// RowSelection
397+
#[derive(Clone)]
398+
struct CachedPredicateResult {
399+
/// Map of parquet schema column index to the result of evaluating the predicate
400+
/// on that column.
401+
///
402+
/// NOTE each array already has the corresponding filters applied
403+
///
404+
/// TODO: store as Vec<Vec<ArrayRef>> to avoid having to have one large
405+
/// array for each column
406+
arrays: Vec<Option<ArrayRef>>,
407+
/// The results of evaluating the predicate (this has already been applied to the
408+
/// cached results)
409+
filters: Vec<BooleanArray>,
410+
/*
411+
/// The result of a predicate evaluation
412+
/// in memory. Has an array for each column in the schema.
413+
filtered_batches: Vec<ArrayRef>,
414+
/// The filters applied to the original RowSelection
415+
filters: Vec<BooleanArray>,
416+
*/
417+
}
418+
419+
impl CachedPredicateResult {
420+
fn empty() -> Self {
421+
Self {
422+
arrays: vec![],
423+
filters: vec![],
424+
}
425+
}
426+
427+
/// Apply the results of other to self
428+
/// Updates cached filter result and filters potentially
429+
pub fn merge(self, other: Self) -> Self {
430+
// TODO do something with self
431+
other
432+
}
433+
}
434+
435+
/// Progressively creates array from filtered values
436+
///
437+
/// TODO avoid buffering the input memory
438+
trait InProgressArray {
439+
/// Appends all values of the array at locations where filter[i] is true
440+
/// to the in progress array
441+
fn append(&mut self, _array: ArrayRef, filter: &BooleanArray) -> Result<()>;
442+
443+
/// Builds the final array, consuming all state from self. Returns None if the array
444+
/// cannot be created (e.g. data type not supported or out of buffer space)
445+
fn try_build(&mut self) -> Result<Option<ArrayRef>>;
446+
}
447+
448+
fn create_in_progress_array(_data_type: &DataType) -> Box<dyn InProgressArray> {
449+
Box::new(NoOpInProgressArray {})
450+
}
451+
452+
/// Placeholder that does nothing until we support the entire set of datatypes
453+
struct NoOpInProgressArray {}
454+
455+
impl InProgressArray for NoOpInProgressArray {
456+
fn append(&mut self, _array: ArrayRef, _filter: &BooleanArray) -> Result<()> {
457+
// do nothing
458+
Ok(())
459+
}
460+
fn try_build(&mut self) -> Result<Option<ArrayRef>> {
461+
// do nothing
462+
Ok(None)
463+
}
464+
}
465+
466+
/// a generic implementation of InProgressArray that uses filter and concat kernels
467+
/// to create the final array
468+
///
469+
/// TODO: make this better with per type implementations
470+
/// <https://github.com/apache/arrow-rs/issues/6692>
471+
struct GenericInProgressArray {
472+
/// previously filtered arrays
473+
arrays: Vec<ArrayRef>,
474+
}
475+
476+
impl GenericInProgressArray {
477+
fn new() -> Self {
478+
Self { arrays: vec![] }
479+
}
480+
481+
fn append(&mut self, array: ArrayRef, filter_array: &BooleanArray) -> Result<()> {
482+
self.arrays.push(filter(&array, filter_array)?);
483+
Ok(())
484+
}
485+
486+
fn try_build(&mut self) -> Result<Option<ArrayRef>> {
487+
if self.arrays.is_empty() {
488+
return Ok(None);
489+
}
490+
if self.arrays.len() == 1 {
491+
return Ok(Some(self.arrays.pop().unwrap()));
492+
}
493+
// Note: must copy into a temporary Vec to obtain `&dyn Array` references for `concat`
494+
let arrays: Vec<&dyn Array> = self.arrays.iter().map(|a| a.as_ref()).collect();
495+
let array = concat(&arrays)?;
496+
Ok(Some(array))
497+
}
498+
}

parquet/src/arrow/async_reader/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,8 @@ where
618618
let array_reader =
619619
build_array_reader(self.fields.as_deref(), predicate.projection(), &row_group)?;
620620

621-
plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
621+
plan_builder =
622+
plan_builder.with_predicate(array_reader, predicate.as_mut(), &projection)?;
622623
}
623624
}
624625

0 commit comments

Comments
 (0)