
Commit ec24724

alamb and Ted-Jiang authored
Support page skipping / page_index pushdown for evolved schemas (#5268)
* Make the page index tests clearer about what they are doing
* Support page skipping / page_index pushdown for evolved schemas
* Update test
* Update datafusion/core/src/datasource/file_format/parquet.rs
  Co-authored-by: Yang Jiang <[email protected]>

---------

Co-authored-by: Yang Jiang <[email protected]>

1 parent d05647c · commit ec24724

File tree: 4 files changed (+251 -72 lines)


datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 45 additions & 32 deletions
@@ -550,50 +550,63 @@ pub(crate) mod test_util {
     use parquet::file::properties::WriterProperties;
     use tempfile::NamedTempFile;

+    /// How many rows per page should be written
+    const ROWS_PER_PAGE: usize = 2;
+
     /// Writes `batches` to a temporary parquet file
     ///
-    /// If multi_page is set to `true`, all batches are written into
-    /// one temporary parquet file and the parquet file is written
+    /// If multi_page is set to `true`, the parquet file(s) are written
     /// with 2 rows per data page (used to test page filtering and
     /// boundaries).
     pub async fn store_parquet(
         batches: Vec<RecordBatch>,
         multi_page: bool,
     ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
-        if multi_page {
-            // All batches write in to one file, each batch must have same schema.
-            let mut output = NamedTempFile::new().expect("creating temp file");
-            let mut builder = WriterProperties::builder();
-            builder = builder.set_data_page_row_count_limit(2);
-            let proper = builder.build();
-            let mut writer =
-                ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
-                    .expect("creating writer");
-            for b in batches {
-                writer.write(&b).expect("Writing batch");
-            }
-            writer.close().unwrap();
-            Ok((vec![local_unpartitioned_file(&output)], vec![output]))
-        } else {
-            // Each batch writes to their own file
-            let files: Vec<_> = batches
-                .into_iter()
-                .map(|batch| {
-                    let mut output = NamedTempFile::new().expect("creating temp file");
+        // Each batch writes to their own file
+        let files: Vec<_> = batches
+            .into_iter()
+            .map(|batch| {
+                let mut output = NamedTempFile::new().expect("creating temp file");
+
+                let builder = WriterProperties::builder();
+                let props = if multi_page {
+                    builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
+                } else {
+                    builder
+                }
+                .build();

-                    let props = WriterProperties::builder().build();
-                    let mut writer =
-                        ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
-                            .expect("creating writer");
+                let mut writer =
+                    ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
+                        .expect("creating writer");

+                if multi_page {
+                    // write in smaller batches as the parquet writer
+                    // only checks datapage size limits on the boundaries of each batch
+                    write_in_chunks(&mut writer, &batch, ROWS_PER_PAGE);
+                } else {
                     writer.write(&batch).expect("Writing batch");
-                    writer.close().unwrap();
-                    output
-                })
-                .collect();
+                };
+                writer.close().unwrap();
+                output
+            })
+            .collect();

-            let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
-            Ok((meta, files))
+        let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
+        Ok((meta, files))
+    }
+
+    //// write batches chunk_size rows at a time
+    fn write_in_chunks<W: std::io::Write>(
+        writer: &mut ArrowWriter<W>,
+        batch: &RecordBatch,
+        chunk_size: usize,
+    ) {
+        let mut i = 0;
+        while i < batch.num_rows() {
+            let num = chunk_size.min(batch.num_rows() - i);
+            writer.write(&batch.slice(i, num)).unwrap();
+            i += num;
         }
     }
 }
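A note on the approach: per the comment in the diff, the parquet ArrowWriter only checks the data-page row-count limit on the boundaries of each batch, which is why store_parquet now slices each batch into ROWS_PER_PAGE-row chunks instead of relying on WriterProperties alone. Below is a minimal standalone sketch of the same idea using the arrow and parquet crates directly; the file path, array contents, and main function are illustrative and not part of this commit.

use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    const ROWS_PER_PAGE: usize = 2;

    // A single 6-row batch; written in one call it would land in one data page.
    let ints: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
    let batch = RecordBatch::try_from_iter(vec![("int", ints)])?;

    // Ask the writer to close a data page every ROWS_PER_PAGE rows...
    let props = WriterProperties::builder()
        .set_data_page_row_count_limit(ROWS_PER_PAGE)
        .build();

    let file = File::create("/tmp/paged.parquet")?;
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;

    // ...but also feed it ROWS_PER_PAGE rows at a time, since the limit is
    // only enforced at batch boundaries; this yields 3 pages instead of 1.
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = ROWS_PER_PAGE.min(batch.num_rows() - offset);
        writer.write(&batch.slice(offset, len))?;
        offset += len;
    }
    writer.close()?;
    Ok(())
}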

datafusion/core/src/physical_optimizer/pruning.rs

Lines changed: 9 additions & 16 deletions
@@ -29,7 +29,7 @@
 //! other source (e.g. a catalog)

 use std::convert::TryFrom;
-use std::{collections::HashSet, sync::Arc};
+use std::sync::Arc;

 use crate::execution::context::ExecutionProps;
 use crate::prelude::lit;
@@ -233,25 +233,18 @@ impl PruningPredicate {
             .unwrap_or_default()
     }

-    /// Returns all need column indexes to evaluate this pruning predicate
-    pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
-        let mut set = HashSet::new();
-        self.required_columns.columns.iter().for_each(|x| {
-            match self.schema().column_with_name(x.0.name.as_str()) {
-                None => {}
-                Some(y) => {
-                    set.insert(y.0);
-                }
-            }
-        });
-        set
+    pub(crate) fn required_columns(&self) -> &RequiredStatColumns {
+        &self.required_columns
     }
 }

+/// Records for which columns statistics are necessary to evaluate a
+/// pruning predicate.
+///
 /// Handles creating references to the min/max statistics
 /// for columns as well as recording which statistics are needed
 #[derive(Debug, Default, Clone)]
-struct RequiredStatColumns {
+pub(crate) struct RequiredStatColumns {
     /// The statistics required to evaluate this predicate:
     /// * The unqualified column in the input schema
     /// * Statistics type (e.g. Min or Max or Null_Count)
@@ -267,7 +260,7 @@ impl RequiredStatColumns {

     /// Returns an iterator over items in columns (see doc on
     /// `self.columns` for details)
-    fn iter(&self) -> impl Iterator<Item = &(Column, StatisticsType, Field)> {
+    pub(crate) fn iter(&self) -> impl Iterator<Item = &(Column, StatisticsType, Field)> {
         self.columns.iter()
     }

@@ -852,7 +845,7 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result<
 }

 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
-enum StatisticsType {
+pub(crate) enum StatisticsType {
     Min,
     Max,
     NullCount,
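For context on this API change: the removed need_input_columns_ids() resolved the required columns to indices in the predicate's own (table) schema, and those indices need not line up with an individual parquet file once schemas have evolved. Exposing required_columns() lets the caller resolve the column names against whatever file schema it is pruning. A hypothetical crate-internal sketch of that resolution follows; required_column_indices is an illustrative name, not a function added by this commit, and it assumes access to the pub(crate) items above.

use std::collections::HashSet;

use arrow::datatypes::Schema;

use crate::physical_optimizer::pruning::PruningPredicate;

fn required_column_indices(
    predicate: &PruningPredicate,
    file_schema: &Schema,
) -> HashSet<usize> {
    predicate
        .required_columns() // &RequiredStatColumns
        .iter() // yields &(Column, StatisticsType, Field)
        .filter_map(|(column, _stat_type, _stat_field)| {
            // Resolve the column by name against the file's schema; columns
            // missing from an evolved file simply resolve to None and are skipped.
            file_schema
                .column_with_name(column.name.as_str())
                .map(|(idx, _field)| idx)
        })
        .collect()
}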

datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 124 additions & 8 deletions
@@ -836,8 +836,7 @@ mod tests {

     /// round-trip record batches by writing each individual RecordBatch to
     /// a parquet file and then reading that parquet file with the specified
-    /// options. If page_index_predicate is set to `true`, all RecordBatches
-    /// are written into a parquet file instead.
+    /// options.
     #[derive(Debug, Default)]
     struct RoundTrip {
         projection: Option<Vec<usize>>,
@@ -1331,6 +1330,80 @@
         assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }

+    #[tokio::test]
+    async fn evolved_schema_disjoint_schema_with_page_index_pushdown() {
+        let c1: ArrayRef = Arc::new(StringArray::from(vec![
+            // Page 1
+            Some("Foo"),
+            Some("Bar"),
+            // Page 2
+            Some("Foo2"),
+            Some("Bar2"),
+            // Page 3
+            Some("Foo3"),
+            Some("Bar3"),
+        ]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![
+            // Page 1:
+            Some(1),
+            Some(2),
+            // Page 2: (pruned)
+            Some(3),
+            Some(4),
+            // Page 3: (pruned)
+            Some(5),
+            None,
+        ]));
+
+        // batch1: c1(string)
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        // batch2: c2(int64)
+        let batch2 = create_batch(vec![("c2", c2.clone())]);
+
+        // batch3 (has c2, c1) -- both columns, should still prune
+        let batch3 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);
+
+        // batch4 (has c2, c1) -- different column order, should still prune
+        let batch4 = create_batch(vec![("c2", c2), ("c1", c1)]);
+
+        let filter = col("c2").eq(lit(1_i64));
+
+        // read/write them files:
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_page_index_predicate()
+            .round_trip(vec![batch1, batch2, batch3, batch4])
+            .await;
+
+        let expected = vec![
+            "+------+----+",
+            "| c1   | c2 |",
+            "+------+----+",
+            "|      | 1  |",
+            "|      | 2  |",
+            "| Bar  |    |",
+            "| Bar  | 2  |",
+            "| Bar  | 2  |",
+            "| Bar2 |    |",
+            "| Bar3 |    |",
+            "| Foo  |    |",
+            "| Foo  | 1  |",
+            "| Foo  | 1  |",
+            "| Foo2 |    |",
+            "| Foo3 |    |",
+            "+------+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // There are 4 rows pruned in each of batch2, batch3, and
+        // batch4 for a total of 12. batch1 had no pruning as c2 was
+        // filled in as null
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 12);
+    }
+
     #[tokio::test]
     async fn multi_column_predicate_pushdown() {
         let c1: ArrayRef =
@@ -1362,6 +1435,38 @@
         assert_batches_sorted_eq!(expected, &read);
     }

+    #[tokio::test]
+    async fn multi_column_predicate_pushdown_page_index_pushdown() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);
+
+        // Columns in different order to schema
+        let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));
+
+        // read/write them files:
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .with_page_index_predicate()
+            .round_trip_to_batches(vec![batch1])
+            .await
+            .unwrap();
+
+        let expected = vec![
+            "+-----+----+",
+            "| c1  | c2 |",
+            "+-----+----+",
+            "|     | 2  |",
+            "| Foo | 1  |",
+            "| bar |    |",
+            "+-----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
     #[tokio::test]
     async fn evolved_schema_incompatible_types() {
         let c1: ArrayRef =
@@ -1635,27 +1740,38 @@

     #[tokio::test]
     async fn parquet_page_index_exec_metrics() {
-        let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
-        let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(1),
+            None,
+            Some(2),
+            Some(3),
+            Some(4),
+            Some(5),
+        ]));
         let batch1 = create_batch(vec![("int", c1.clone())]);
-        let batch2 = create_batch(vec![("int", c2.clone())]);

         let filter = col("int").eq(lit(4_i32));

         let rt = RoundTrip::new()
             .with_predicate(filter)
             .with_page_index_predicate()
-            .round_trip(vec![batch1, batch2])
+            .round_trip(vec![batch1])
             .await;

         let metrics = rt.parquet_exec.metrics().unwrap();

         // assert the batches and some metrics
+        #[rustfmt::skip]
         let expected = vec![
-            "+-----+", "| int |", "+-----+", "| 3   |", "| 4   |", "| 5   |", "+-----+",
+            "+-----+",
+            "| int |",
+            "+-----+",
+            "| 4   |",
+            "| 5   |",
+            "+-----+",
         ];
         assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
-        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3);
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4);
         assert!(
             get_value(&metrics, "page_index_eval_time") > 0,
             "no eval time in metrics: {metrics:#?}"