Skip to content

Commit ed27e31

Browse files
tustvold authored and alamb committed
Fix predicate pushdown bugs: project columns within DatafusionArrowPredicate (apache#4005) (apache#4006) (apache#4021)
* Project columns within DatafusionArrowPredicate (apache#4005) (apache#4006)

* Add test

* Format

* Fix merge blunder

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 9a561de commit ed27e31

File tree

2 files changed

+50
-3
lines changed

2 files changed

+50
-3
lines changed

datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1676,6 +1676,34 @@ mod tests {
         assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }
 
+    #[tokio::test]
+    async fn multi_column_predicate_pushdown() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);
+
+        // Columns in different order to schema
+        let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1], None, None, Some(filter), true)
+            .await
+            .unwrap();
+
+        let expected = vec![
+            "+-----+----+",
+            "| c1  | c2 |",
+            "+-----+----+",
+            "| Foo | 1  |",
+            "| bar |    |",
+            "+-----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
     #[tokio::test]
     async fn evolved_schema_incompatible_types() {
         let c1: ArrayRef =

datafusion/core/src/physical_plan/file_format/row_filter.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ use crate::physical_plan::metrics;
 
 #[derive(Debug)]
 pub(crate) struct DatafusionArrowPredicate {
     physical_expr: Arc<dyn PhysicalExpr>,
-    projection: ProjectionMask,
+    projection_mask: ProjectionMask,
+    projection: Vec<usize>,
     /// how many rows were filtered out by this predicate
     rows_filtered: metrics::Count,
     /// how long was spent evaluating this predicate
@@ -90,9 +91,22 @@ impl DatafusionArrowPredicate {
         let physical_expr =
             create_physical_expr(&candidate.expr, &df_schema, &schema, &props)?;
 
+        // ArrowPredicate::evaluate is passed columns in the order they appear in the file
+        // If the predicate has multiple columns, we therefore must project the columns based
+        // on the order they appear in the file
+        let projection = match candidate.projection.len() {
+            0 | 1 => vec![],
+            len => {
+                let mut projection: Vec<_> = (0..len).collect();
+                projection.sort_unstable_by_key(|x| candidate.projection[*x]);
+                projection
+            }
+        };
+
         Ok(Self {
             physical_expr,
-            projection: ProjectionMask::roots(
+            projection,
+            projection_mask: ProjectionMask::roots(
                 metadata.file_metadata().schema_descr(),
                 candidate.projection,
             ),
@@ -104,10 +118,15 @@ impl DatafusionArrowPredicate {
 
 impl ArrowPredicate for DatafusionArrowPredicate {
     fn projection(&self) -> &ProjectionMask {
-        &self.projection
+        &self.projection_mask
     }
 
     fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        let batch = match self.projection.is_empty() {
+            true => batch,
+            false => batch.project(&self.projection)?,
+        };
+
         // scoped timer updates on drop
         let mut timer = self.time.timer();
         match self

0 commit comments

Comments (0)