Skip to content

Commit 671cef8

Browse files
Ted-Jiang and alamb
authored
Prune pages are all null in ParquetExec by row_counts and fix NOT NULL prune (#10051)
* Prune pages are all null in ParquetExec by row_counts and fix NOT NULL prune * fix clippy * Update datafusion/core/src/physical_optimizer/pruning.rs Co-authored-by: Andrew Lamb <[email protected]> * Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb <[email protected]> * Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb <[email protected]> * Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb <[email protected]> * Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb <[email protected]> * remove allocate vec * better way avoid allocate vec * simply expr --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent d698d9d commit 671cef8

File tree

5 files changed

+153
-30
lines changed

5 files changed

+153
-30
lines changed

datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -314,6 +314,7 @@ fn prune_pages_in_one_row_group(
314314
col_page_indexes,
315315
col_offset_indexes,
316316
target_type: &target_type,
317+
num_rows_in_row_group: group.num_rows(),
317318
};
318319

319320
match predicate.prune(&pruning_stats) {
@@ -385,6 +386,7 @@ struct PagesPruningStatistics<'a> {
385386
// target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the
386387
// real physical type in parquet file may be `INT32, INT64, FIXED_LEN_BYTE_ARRAY`
387388
target_type: &'a Option<DataType>,
389+
num_rows_in_row_group: i64,
388390
}
389391

390392
// Extract the min or max value calling `func` from page idex
@@ -548,7 +550,19 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
548550
}
549551

550552
fn row_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
551-
None
553+
// see https://github.com/apache/arrow-rs/blob/91f0b1771308609ca27db0fb1d2d49571b3980d8/parquet/src/file/metadata.rs#L979-L982
554+
555+
let row_count_per_page = self.col_offset_indexes.windows(2).map(|location| {
556+
Some(location[1].first_row_index - location[0].first_row_index)
557+
});
558+
559+
// append the last page row count
560+
let row_count_per_page = row_count_per_page.chain(std::iter::once(Some(
561+
self.num_rows_in_row_group
562+
- self.col_offset_indexes.last().unwrap().first_row_index,
563+
)));
564+
565+
Some(Arc::new(Int64Array::from_iter(row_count_per_page)))
552566
}
553567

554568
fn contained(

datafusion/core/src/physical_optimizer/pruning.rs

Lines changed: 30 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -335,7 +335,7 @@ pub trait PruningStatistics {
335335
/// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END`
336336
/// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END`
337337
/// `x IS NULL` | `x_null_count > 0`
338-
/// `x IS NOT NULL` | `x_null_count = 0`
338+
/// `x IS NOT NULL` | `x_null_count != row_count`
339339
/// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END`
340340
///
341341
/// ## Predicate Evaluation
@@ -1240,10 +1240,10 @@ fn build_single_column_expr(
12401240
/// returns a pruning expression in terms of IsNull that will evaluate to true
12411241
/// if the column may contain null, and false if definitely does not
12421242
/// contain null.
1243-
/// If set `with_not` to true: which means is not null
1244-
/// Given an expression reference to `expr`, if `expr` is a column expression,
1245-
/// returns a pruning expression in terms of IsNotNull that will evaluate to true
1246-
/// if the column not contain any null, and false if definitely contain null.
1243+
/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count`
1244+
/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS
1245+
/// at least one NULL value. In this case we can know that `IS NOT NULL` can not be true and
1246+
/// thus can prune the row group / value
12471247
fn build_is_null_column_expr(
12481248
expr: &Arc<dyn PhysicalExpr>,
12491249
schema: &Schema,
@@ -1254,26 +1254,37 @@ fn build_is_null_column_expr(
12541254
let field = schema.field_with_name(col.name()).ok()?;
12551255

12561256
let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1257-
required_columns
1258-
.null_count_column_expr(col, expr, null_count_field)
1259-
.map(|null_count_column_expr| {
1260-
if with_not {
1261-
// IsNotNull(column) => null_count = 0
1262-
Arc::new(phys_expr::BinaryExpr::new(
1263-
null_count_column_expr,
1264-
Operator::Eq,
1265-
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1266-
)) as _
1267-
} else {
1257+
if with_not {
1258+
if let Ok(row_count_expr) =
1259+
required_columns.row_count_column_expr(col, expr, null_count_field)
1260+
{
1261+
required_columns
1262+
.null_count_column_expr(col, expr, null_count_field)
1263+
.map(|null_count_column_expr| {
1264+
// IsNotNull(column) => null_count != row_count
1265+
Arc::new(phys_expr::BinaryExpr::new(
1266+
null_count_column_expr,
1267+
Operator::NotEq,
1268+
row_count_expr,
1269+
)) as _
1270+
})
1271+
.ok()
1272+
} else {
1273+
None
1274+
}
1275+
} else {
1276+
required_columns
1277+
.null_count_column_expr(col, expr, null_count_field)
1278+
.map(|null_count_column_expr| {
12681279
// IsNull(column) => null_count > 0
12691280
Arc::new(phys_expr::BinaryExpr::new(
12701281
null_count_column_expr,
12711282
Operator::Gt,
12721283
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
12731284
)) as _
1274-
}
1275-
})
1276-
.ok()
1285+
})
1286+
.ok()
1287+
}
12771288
} else {
12781289
None
12791290
}

datafusion/core/tests/parquet/mod.rs

Lines changed: 56 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,7 @@ use arrow::{
2828
record_batch::RecordBatch,
2929
util::pretty::pretty_format_batches,
3030
};
31-
use arrow_array::new_null_array;
31+
use arrow_array::make_array;
3232
use chrono::{Datelike, Duration, TimeDelta};
3333
use datafusion::{
3434
datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
@@ -77,6 +77,7 @@ enum Scenario {
7777
ByteArray,
7878
PeriodsInColumnNames,
7979
WithNullValues,
80+
WithNullValuesPageLevel,
8081
}
8182

8283
enum Unit {
@@ -632,22 +633,60 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch {
632633
RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap()
633634
}
634635

635-
/// Return record batch with i8, i16, i32, and i64 sequences with all Null values
636-
fn make_all_null_values() -> RecordBatch {
636+
/// Return record batch with i8, i16, i32, and i64 sequences with Null values
637+
/// here 5 rows in page when using Unit::Page
638+
fn make_int_batches_with_null(
639+
null_values: usize,
640+
no_null_values_start: usize,
641+
no_null_values_end: usize,
642+
) -> RecordBatch {
637643
let schema = Arc::new(Schema::new(vec![
638644
Field::new("i8", DataType::Int8, true),
639645
Field::new("i16", DataType::Int16, true),
640646
Field::new("i32", DataType::Int32, true),
641647
Field::new("i64", DataType::Int64, true),
642648
]));
643649

650+
let v8: Vec<i8> = (no_null_values_start as _..no_null_values_end as _).collect();
651+
let v16: Vec<i16> = (no_null_values_start as _..no_null_values_end as _).collect();
652+
let v32: Vec<i32> = (no_null_values_start as _..no_null_values_end as _).collect();
653+
let v64: Vec<i64> = (no_null_values_start as _..no_null_values_end as _).collect();
654+
644655
RecordBatch::try_new(
645656
schema,
646657
vec![
647-
new_null_array(&DataType::Int8, 5),
648-
new_null_array(&DataType::Int16, 5),
649-
new_null_array(&DataType::Int32, 5),
650-
new_null_array(&DataType::Int64, 5),
658+
make_array(
659+
Int8Array::from_iter(
660+
v8.into_iter()
661+
.map(Some)
662+
.chain(std::iter::repeat(None).take(null_values)),
663+
)
664+
.to_data(),
665+
),
666+
make_array(
667+
Int16Array::from_iter(
668+
v16.into_iter()
669+
.map(Some)
670+
.chain(std::iter::repeat(None).take(null_values)),
671+
)
672+
.to_data(),
673+
),
674+
make_array(
675+
Int32Array::from_iter(
676+
v32.into_iter()
677+
.map(Some)
678+
.chain(std::iter::repeat(None).take(null_values)),
679+
)
680+
.to_data(),
681+
),
682+
make_array(
683+
Int64Array::from_iter(
684+
v64.into_iter()
685+
.map(Some)
686+
.chain(std::iter::repeat(None).take(null_values)),
687+
)
688+
.to_data(),
689+
),
651690
],
652691
)
653692
.unwrap()
@@ -824,9 +863,17 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
824863
}
825864
Scenario::WithNullValues => {
826865
vec![
827-
make_all_null_values(),
866+
make_int_batches_with_null(5, 0, 0),
828867
make_int_batches(1, 6),
829-
make_all_null_values(),
868+
make_int_batches_with_null(5, 0, 0),
869+
]
870+
}
871+
Scenario::WithNullValuesPageLevel => {
872+
vec![
873+
make_int_batches_with_null(5, 1, 6),
874+
make_int_batches(1, 11),
875+
make_int_batches_with_null(1, 1, 10),
876+
make_int_batches_with_null(5, 1, 6),
830877
]
831878
}
832879
}

datafusion/core/tests/parquet/page_pruning.rs

Lines changed: 51 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -871,6 +871,57 @@ async fn without_pushdown_filter() {
871871
assert!(bytes_scanned_with_filter > bytes_scanned_without_filter);
872872
}
873873

874+
#[tokio::test]
875+
// Data layout like this:
876+
// row_group1: page1(1~5), page2(All Null)
877+
// row_group2: page1(1~5), page2(6~10)
878+
// row_group3: page1(1~5), page2(6~9 + Null)
879+
// row_group4: page1(1~5), page2(All Null)
880+
// total 40 rows
881+
async fn test_pages_with_null_values() {
882+
test_prune(
883+
Scenario::WithNullValuesPageLevel,
884+
"SELECT * FROM t where i8 <= 6",
885+
Some(0),
886+
// expect prune pages with all null or pages that have only values greater than 6
887+
// (row_group1, page2), (row_group4, page2)
888+
Some(10),
889+
22,
890+
)
891+
.await;
892+
893+
test_prune(
894+
Scenario::WithNullValuesPageLevel,
895+
"SELECT * FROM t where \"i16\" is not null",
896+
Some(0),
897+
// expect prune (row_group1, page2) and (row_group4, page2) = 10 rows
898+
Some(10),
899+
29,
900+
)
901+
.await;
902+
903+
test_prune(
904+
Scenario::WithNullValuesPageLevel,
905+
"SELECT * FROM t where \"i32\" is null",
906+
Some(0),
907+
// expect prune (row_group1, page1), (row_group2, page1+2), (row_group3, page1), (row_group3, page1) = 25 rows
908+
Some(25),
909+
11,
910+
)
911+
.await;
912+
913+
test_prune(
914+
Scenario::WithNullValuesPageLevel,
915+
"SELECT * FROM t where \"i64\" > 6",
916+
Some(0),
917+
// expect to prune pages where i is all null, or where always <= 5
918+
// (row_group1, page1+2), (row_group2, page1), (row_group3, page1) (row_group4, page1+2) = 30 rows
919+
Some(30),
920+
7,
921+
)
922+
.await;
923+
}
924+
874925
fn cast_count_metric(metric: MetricValue) -> Option<usize> {
875926
match metric {
876927
MetricValue::Count { count, .. } => Some(count.value()),

datafusion/core/tests/parquet/row_group_pruning.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1296,7 +1296,7 @@ async fn test_row_group_with_null_values() {
12961296
.test_row_group_prune()
12971297
.await;
12981298

1299-
// After pruning, only row group 2should be selected
1299+
// After pruning, only row group 2 should be selected
13001300
RowGroupPruningTest::new()
13011301
.with_scenario(Scenario::WithNullValues)
13021302
.with_query("SELECT * FROM t WHERE \"i16\" is Not Null")

0 commit comments

Comments
 (0)