Skip to content

Commit ea31da9

Browse files
authored
Fix multicolumn parquet predicate pushdown (#4046) (#4048)
* Fix multicolumn parquet predicate pushdown (#4046) * Format
1 parent c92a38f commit ea31da9

File tree

1 file changed

+48
-7
lines changed

1 file changed

+48
-7
lines changed

datafusion/core/src/physical_plan/file_format/row_filter.rs

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,7 @@ impl DatafusionArrowPredicate {
9696
// on the order they appear in the file
9797
let projection = match candidate.projection.len() {
9898
0 | 1 => vec![],
99-
len => {
100-
let mut projection: Vec<_> = (0..len).collect();
101-
projection.sort_unstable_by_key(|x| candidate.projection[*x]);
102-
projection
103-
}
99+
_ => remap_projection(&candidate.projection),
104100
};
105101

106102
Ok(Self {
@@ -278,6 +274,32 @@ impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
278274
}
279275
}
280276

277+
/// Computes the projection required to go from the file's schema order to the
/// projected order expected by this filter.
///
/// Effectively this computes the rank of each element in `src`: entry `i` of the
/// result is the position `src[i]` occupies once `src` is sorted ascending.
///
/// e.g. `[5, 9, 0]` -> `[1, 2, 0]`
///
/// An empty `src` yields an empty vector. `src` is expected to hold distinct
/// column indices; equal values still receive distinct ranks, but which of
/// them ranks first is left to the unstable sort.
fn remap_projection(src: &[usize]) -> Vec<usize> {
    // Compute the column mapping from projected order to file order
    // i.e. the indices required to sort the projected schema into the file schema
    //
    // e.g. projection: [5, 9, 0] -> [2, 0, 1]
    let mut sorted_indexes: Vec<usize> = (0..src.len()).collect();
    sorted_indexes.sort_unstable_by_key(|&idx| src[idx]);

    // Compute the mapping from schema order to projected order.
    //
    // `sorted_indexes` is a permutation of `0..len`, so the mapping we want is
    // simply its inverse: the element found at sorted position `rank` came from
    // index `idx`, hence `projection[idx] = rank`. A single scatter pass does
    // this in O(n), avoiding a second sort.
    //
    // e.g. sorted_indexes [2, 0, 1] -> [1, 2, 0]
    let mut projection = vec![0; src.len()];
    for (rank, &idx) in sorted_indexes.iter().enumerate() {
        projection[idx] = rank;
    }
    projection
}
302+
281303
/// Calculate the total compressed size of all `Column`s required for
282304
/// predicate `Expr`. This should represent the total amount of file IO
283305
/// required to evaluate the predicate.
@@ -382,12 +404,13 @@ pub fn build_row_filter(
382404

383405
#[cfg(test)]
384406
mod test {
407+
use super::*;
385408
use crate::physical_plan::file_format::row_filter::FilterCandidateBuilder;
386-
use arrow::datatypes::{DataType, Field, Schema};
387-
use datafusion_common::ScalarValue;
409+
use arrow::datatypes::Field;
388410
use datafusion_expr::{cast, col, lit};
389411
use parquet::arrow::parquet_to_arrow_schema;
390412
use parquet::file::reader::{FileReader, SerializedFileReader};
413+
use rand::prelude::*;
391414

392415
// Assume a column expression for a column not in the table schema is a projected column and ignore it
393416
#[test]
@@ -471,4 +494,22 @@ mod test {
471494

472495
assert_eq!(candidate.unwrap().expr, expected_candidate_expr);
473496
}
497+
498+
#[test]
fn test_remap_projection() {
    let mut rng = thread_rng();
    for _ in 0..100 {
        // Draw 100 arbitrary column indexes, in no particular order
        let projection: Vec<usize> = (0..100).map(|_| rng.gen()).collect();

        // Sorting the projected columns gives the order they appear in the file
        let file_order = {
            let mut sorted = projection.clone();
            sorted.sort_unstable();
            sorted
        };

        // Reading the file-ordered columns through the remapped projection
        // must reproduce the projection we started from
        let remapped: Vec<usize> = remap_projection(&projection)
            .into_iter()
            .map(|rank| file_order[rank])
            .collect();
        assert_eq!(projection, remapped);
    }
}
474515
}

0 commit comments

Comments
 (0)