
Commit 46ed8f1

Add statistics for parquet page level skipping
Signed-off-by: yangjiang <[email protected]>
1 parent f61b43a commit 46ed8f1

4 files changed: 117 additions & 29 deletions

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 42 additions & 20 deletions
@@ -532,25 +532,47 @@ pub(crate) mod test_util {
 
     pub async fn store_parquet(
         batches: Vec<RecordBatch>,
+        multi_page: bool,
     ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
-        let files: Vec<_> = batches
-            .into_iter()
-            .map(|batch| {
-                let mut output = NamedTempFile::new().expect("creating temp file");
-
-                let props = WriterProperties::builder().build();
-                let mut writer =
-                    ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
-                        .expect("creating writer");
-
-                writer.write(&batch).expect("Writing batch");
-                writer.close().unwrap();
-                output
-            })
-            .collect();
-
-        let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
-        Ok((meta, files))
+        if multi_page {
+            // All batches are written into one file; each batch must have the same schema.
+            let mut output = NamedTempFile::new().expect("creating temp file");
+            let mut builder = WriterProperties::builder();
+            // TODO: once https://github.com/apache/arrow-rs/issues/2941 is released, change this to a row limit.
+            builder = builder.set_data_pagesize_limit(1);
+            builder = builder.set_write_batch_size(1);
+            let proper = builder.build();
+            let mut writer =
+                ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
+                    .expect("creating writer");
+            for b in batches {
+                writer.write(&b).expect("Writing batch");
+                // Note: flush is needed here, otherwise all batches are written into a single page.
+                writer.flush().unwrap();
+            }
+            writer.close().unwrap();
+            Ok((vec![local_unpartitioned_file(&output)], vec![output]))
+        } else {
+            // Each batch writes to its own file
+            let files: Vec<_> = batches
+                .into_iter()
+                .map(|batch| {
+                    let mut output = NamedTempFile::new().expect("creating temp file");
+
+                    let props = WriterProperties::builder().build();
+                    let mut writer =
+                        ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
+                            .expect("creating writer");
+
+                    writer.write(&batch).expect("Writing batch");
+                    writer.close().unwrap();
+                    output
+                })
+                .collect();
+
+            let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
+            Ok((meta, files))
+        }
     }
 }

@@ -599,7 +621,7 @@ mod tests {
         let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
 
         let store = Arc::new(LocalFileSystem::new()) as _;
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
         let format = ParquetFormat::default();
         let schema = format.infer_schema(&store, &meta).await.unwrap();
@@ -738,7 +760,7 @@ mod tests {
         let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
            LocalFileSystem::new(),
        )));
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
         // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
         // for the remaining metadata
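A minimal sketch of the multi-page writing trick used by the new multi_page branch above (illustrative only, not part of this commit; the function name write_multi_page and the in-memory Vec<u8> sink are made up): a 1-byte data page size limit plus a flush after every batch forces each batch into its own row group and data pages, which gives the page index something to prune.

use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

// Write all batches into a single in-memory parquet file, one page per batch.
fn write_multi_page(batches: &[RecordBatch]) -> parquet::errors::Result<Vec<u8>> {
    let props = WriterProperties::builder()
        // a 1-byte page size limit means every write starts a new data page
        .set_data_pagesize_limit(1)
        .set_write_batch_size(1)
        .build();

    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(props))?;
    for batch in batches {
        writer.write(batch)?;
        // flush closes the current row group, so the next batch gets fresh pages
        writer.flush()?;
    }
    writer.close()?;
    Ok(buffer)
}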

datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 48 additions & 9 deletions
@@ -449,7 +449,7 @@ impl FileOpener for ParquetOpener {
             // page index pruning: if all data on individual pages can
             // be ruled out using page metadata, rows from other columns
             // with that range can be skipped as well
-            if let Some(row_selection) = enable_page_index
+            if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
                 .then(|| {
                     page_filter::build_page_filter(
                         pruning_predicate.as_ref(),
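For context on the guard above: bool::then runs its closure and wraps the result in Some only when the boolean is true, so with the added !row_groups.is_empty() check the page filter is no longer built when row group pruning already removed everything. A tiny, self-contained illustration with made-up values (not from the commit):

fn main() {
    // stand-ins for enable_page_index and the row groups that survived pruning
    let enable_page_index = true;
    let row_groups: Vec<usize> = vec![];

    // Some(..) only when the page index is enabled AND at least one row group remains
    let selection = (enable_page_index && !row_groups.is_empty())
        .then(|| format!("build page filter over {} row groups", row_groups.len()));

    assert!(selection.is_none());
}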
@@ -919,7 +919,7 @@ mod tests {
         datasource::file_format::{parquet::ParquetFormat, FileFormat},
         physical_plan::collect,
     };
-    use arrow::array::Float32Array;
+    use arrow::array::{Float32Array, Int32Array};
     use arrow::datatypes::DataType::Decimal128;
     use arrow::record_batch::RecordBatch;
     use arrow::{
@@ -960,9 +960,16 @@ mod tests {
         predicate: Option<Expr>,
         pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
-        round_trip(batches, projection, schema, predicate, pushdown_predicate)
-            .await
-            .batches
+        round_trip(
+            batches,
+            projection,
+            schema,
+            predicate,
+            pushdown_predicate,
+            false,
+        )
+        .await
+        .batches
     }
 
     /// Writes each RecordBatch as an individual parquet file and then
@@ -974,6 +981,7 @@ mod tests {
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
+        page_index_predicate: bool,
     ) -> RoundTripResult {
         let file_schema = match schema {
             Some(schema) => schema,
@@ -983,7 +991,7 @@ mod tests {
             ),
         };
 
-        let (meta, _files) = store_parquet(batches).await.unwrap();
+        let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap();
         let file_groups = meta.into_iter().map(Into::into).collect();
 
         // prepare the scan
@@ -1008,6 +1016,10 @@ mod tests {
                 .with_reorder_filters(true);
         }
 
+        if page_index_predicate {
+            parquet_exec = parquet_exec.with_enable_page_index(true);
+        }
+
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let parquet_exec = Arc::new(parquet_exec);
@@ -1225,7 +1237,8 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64));
 
         // write the batches to files, then read them back:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
@@ -1374,7 +1387,8 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));
 
         // write the batches to files, then read them back:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
 
         let expected = vec![
             "+----+----+",
@@ -1714,7 +1728,7 @@ mod tests {
         let filter = col("c1").not_eq(lit("bar"));
 
         // write the batches to files, then read them back:
-        let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+        let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
 
         let metrics = rt.parquet_exec.metrics().unwrap();
 
@@ -1732,6 +1746,31 @@ mod tests {
             "no eval time in metrics: {:#?}",
             metrics
         );
+
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+        let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
+        let batch1 = create_batch(vec![("int", c1.clone())]);
+        let batch2 = create_batch(vec![("int", c2.clone())]);
+
+        let filter = col("int").eq(lit(4_i32));
+
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), false, true).await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // assert the batches and some metrics
+        let expected = vec![
+            "+-----+", "| int |", "+-----+", "| 3 |", "| 4 |", "| 5 |", "+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        // TODO: fix this once https://github.com/apache/arrow-rs/issues/2941 is released and the page limit becomes a row limit.
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 0);
+        assert!(
+            get_value(&metrics, "page_index_eval_time") > 0,
+            "no eval time in metrics: {:#?}",
+            metrics
+        );
     }
 
     /// returns the sum of all the metrics with the specified name
datafusion/core/src/physical_plan/file_format/parquet/metrics.rs

Lines changed: 13 additions & 0 deletions
@@ -35,6 +35,10 @@ pub struct ParquetFileMetrics {
     pub pushdown_rows_filtered: Count,
     /// Total time spent evaluating pushdown filters
     pub pushdown_eval_time: Time,
+    /// Total rows filtered out by parquet page index
+    pub page_index_rows_filtered: Count,
+    /// Total time spent evaluating parquet page index filters
+    pub page_index_eval_time: Time,
 }
 
 impl ParquetFileMetrics {
@@ -63,13 +67,22 @@ impl ParquetFileMetrics {
         let pushdown_eval_time = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .subset_time("pushdown_eval_time", partition);
+        let page_index_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("page_index_rows_filtered", partition);
+
+        let page_index_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("page_index_eval_time", partition);
 
         Self {
             predicate_evaluation_errors,
             row_groups_pruned,
             bytes_scanned,
             pushdown_rows_filtered,
             pushdown_eval_time,
+            page_index_rows_filtered,
+            page_index_eval_time,
         }
     }
 }
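To make the new fields concrete, here is a small, hedged sketch (not part of this commit, assuming the datafusion crate as a dependency) of the Count and Time primitives they use. The guard returned by timer() adds the elapsed time to the Time metric when it is dropped, which is how page_index_eval_time gets populated in build_page_filter below.

use datafusion::physical_plan::metrics::{Count, Time};

fn main() {
    let page_index_rows_filtered = Count::new();
    let page_index_eval_time = Time::new();

    {
        // starts timing here; the elapsed time is added to the metric on drop
        let _timer = page_index_eval_time.timer();
        page_index_rows_filtered.add(42); // pretend 42 rows were skipped
    }

    println!(
        "filtered {} rows in {} ns",
        page_index_rows_filtered.value(),
        page_index_eval_time.value()
    );
}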

datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs

Lines changed: 14 additions & 0 deletions
@@ -100,6 +100,8 @@ pub(crate) fn build_page_filter(
     file_metadata: &ParquetMetaData,
     file_metrics: &ParquetFileMetrics,
 ) -> Result<Option<RowSelection>> {
+    // scoped timer updates on drop
+    let _timer_guard = file_metrics.page_index_eval_time.timer();
     let page_index_predicates =
         extract_page_index_push_down_predicates(pruning_predicate, schema)?;
 
@@ -154,6 +156,18 @@ pub(crate) fn build_page_filter(
            row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
        }
        let final_selection = combine_multi_col_selection(row_selections);
+        let total_skip =
+            final_selection.iter().fold(
+                0,
+                |acc, x| {
+                    if x.skip {
+                        acc + x.row_count
+                    } else {
+                        acc
+                    }
+                },
+            );
+        file_metrics.page_index_rows_filtered.add(total_skip);
        Ok(Some(final_selection.into()))
    } else {
        Ok(None)
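As a final illustration, a hedged, standalone sketch (not from the commit) of the skipped-row count that feeds page_index_rows_filtered, using the parquet crate's RowSelector type that a RowSelection is built from; the filter/map/sum chain is equivalent to the fold added above.

use parquet::arrow::arrow_reader::RowSelector;

fn main() {
    // a selection alternating between kept and skipped row ranges
    let selection = vec![
        RowSelector::select(100), // rows the page index keeps
        RowSelector::skip(250),   // rows pruned via page-level statistics
        RowSelector::select(10),
        RowSelector::skip(40),
    ];

    // sum the row counts of the skipped ranges, as build_page_filter now does
    let total_skip: usize = selection
        .iter()
        .filter(|s| s.skip)
        .map(|s| s.row_count)
        .sum();

    assert_eq!(total_skip, 290);
    println!("page index would filter {} rows", total_skip);
}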
