
Commit b7171b4

Add parquet predicate pushdown metrics
1 parent d7f2fba commit b7171b4

File tree

3 files changed, +193 -36 lines changed


datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 140 additions & 27 deletions
@@ -237,6 +237,10 @@ pub struct ParquetFileMetrics {
     pub row_groups_pruned: metrics::Count,
     /// Total number of bytes scanned
     pub bytes_scanned: metrics::Count,
+    /// Total rows filtered out by predicates pushed into parquet scan
+    pub pushdown_rows_filtered: metrics::Count,
+    /// Total time spent evaluating pushdown filters
+    pub pushdown_eval_time: metrics::Time,
 }
 
 impl ParquetFileMetrics {
@@ -258,10 +262,20 @@ impl ParquetFileMetrics {
             .with_new_label("filename", filename.to_string())
             .counter("bytes_scanned", partition);
 
+        let pushdown_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("pushdown_rows_filtered", partition);
+
+        let pushdown_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("pushdown_eval_time", partition);
+
         Self {
            predicate_evaluation_errors,
            row_groups_pruned,
            bytes_scanned,
+           pushdown_rows_filtered,
+           pushdown_eval_time,
         }
     }
 }
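
The two new fields are an ordinary metrics::Count and metrics::Time, so the code that evaluates pushed-down predicates (the row-filter changes live in one of the other files in this commit and are not shown here) only needs to bump the counter and hold a scoped timer while it runs. The following is a minimal, hypothetical sketch of how they are driven, using the constructor added above; it assumes ParquetFileMetrics is visible to the calling code (in the real crate it is private to the parquet module, so a snippet like this would live in this file's tests).

    use std::sync::Arc;
    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;

    // Hypothetical caller: record one pass of pushed-down predicate evaluation.
    fn record_filter_pass(file_metrics: &ParquetFileMetrics, rows_removed: usize) {
        // The ScopedTimerGuard returned by `timer()` adds the elapsed wall-clock
        // time to `pushdown_eval_time` when it is dropped at the end of this scope.
        let _timer = file_metrics.pushdown_eval_time.timer();
        // ... evaluate the predicate against a RecordBatch here ...
        file_metrics.pushdown_rows_filtered.add(rows_removed);
    }

    fn example() {
        let metric_set = Arc::new(ExecutionPlanMetricsSet::new());
        let file_metrics = ParquetFileMetrics::new(0, "file.parquet", &metric_set);

        record_filter_pass(&file_metrics, 5);
        assert_eq!(file_metrics.pushdown_rows_filtered.value(), 5);
    }

Because "pushdown_eval_time" is registered with subset_time, it is reported alongside (rather than added into) the operator's overall elapsed compute time.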
@@ -410,7 +424,7 @@ impl FileOpener for ParquetOpener {
     ) -> Result<FileOpenFuture> {
         let file_range = file_meta.range.clone();
 
-        let metrics = ParquetFileMetrics::new(
+        let file_metrics = ParquetFileMetrics::new(
             self.partition_index,
             file_meta.location().as_ref(),
             &self.metrics,
@@ -456,6 +470,8 @@ impl FileOpener for ParquetOpener {
                     table_schema.as_ref(),
                     builder.metadata(),
                     reorder_predicates,
+                    &file_metrics.pushdown_rows_filtered,
+                    &file_metrics.pushdown_eval_time,
                 );
 
                 match row_filter {
@@ -474,8 +490,12 @@ impl FileOpener for ParquetOpener {
 
             let file_metadata = builder.metadata();
             let groups = file_metadata.row_groups();
-            let row_groups =
-                prune_row_groups(groups, file_range, pruning_predicate.clone(), &metrics);
+            let row_groups = prune_row_groups(
+                groups,
+                file_range,
+                pruning_predicate.clone(),
+                &file_metrics,
+            );
 
             if enable_page_index && check_page_index_push_down_valid(&pruning_predicate) {
                 let file_offset_indexes = file_metadata.offset_indexes();
@@ -491,7 +511,7 @@ impl FileOpener for ParquetOpener {
                         pruning_predicate.clone(),
                         file_offset_indexes.get(*r),
                         file_page_indexes.get(*r),
-                        &metrics,
+                        &file_metrics,
                     )
                     .map_err(|e| {
                         ArrowError::ParquetError(format!(
@@ -575,7 +595,7 @@ impl DefaultParquetFileReaderFactory {
 struct ParquetFileReader {
     store: Arc<dyn ObjectStore>,
     meta: ObjectMeta,
-    metrics: ParquetFileMetrics,
+    file_metrics: ParquetFileMetrics,
     metadata_size_hint: Option<usize>,
 }
 
@@ -584,7 +604,7 @@ impl AsyncFileReader for ParquetFileReader {
         &mut self,
         range: Range<usize>,
     ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.metrics.bytes_scanned.add(range.end - range.start);
+        self.file_metrics.bytes_scanned.add(range.end - range.start);
 
         self.store
             .get_range(&self.meta.location, range)
@@ -602,7 +622,7 @@ impl AsyncFileReader for ParquetFileReader {
         Self: Send,
     {
         let total = ranges.iter().map(|r| r.end - r.start).sum();
-        self.metrics.bytes_scanned.add(total);
+        self.file_metrics.bytes_scanned.add(total);
 
         async move {
             self.store
@@ -647,7 +667,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
-        let parquet_file_metrics = ParquetFileMetrics::new(
+        let file_metrics = ParquetFileMetrics::new(
            partition_index,
            file_meta.location().as_ref(),
            metrics,
@@ -657,7 +677,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
             meta: file_meta.object_meta,
             store: Arc::clone(&self.store),
             metadata_size_hint,
-            metrics: parquet_file_metrics,
+            file_metrics,
         }))
     }
 }
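
Each file registers its counters into the shared ExecutionPlanMetricsSet with a "filename" label, so after a scan has run the totals can be read back through the plan's metrics() method and summed across files and partitions by metric name. The sketch below shows what a consumer might do; the helper name is hypothetical and it mirrors the get_value test helper added further down in this diff (assuming MetricValue::as_usize and the module paths of this era of DataFusion).

    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::execution::context::TaskContext;
    use datafusion::physical_plan::collect;
    use datafusion::physical_plan::file_format::ParquetExec;
    use datafusion::physical_plan::metrics::MetricValue;

    /// Hypothetical helper: run a ParquetExec and return the total number of
    /// rows removed by predicates pushed into the parquet scan, summed over
    /// all files read by the plan.
    async fn total_pushdown_rows_filtered(
        parquet_exec: Arc<ParquetExec>,
        task_ctx: Arc<TaskContext>,
    ) -> Result<usize> {
        let _batches = collect(parquet_exec.clone(), task_ctx).await?;
        let metrics = parquet_exec.metrics().expect("ParquetExec exposes metrics");
        Ok(metrics
            .sum(|m| {
                matches!(
                    m.value(),
                    MetricValue::Count { name, .. } if name == "pushdown_rows_filtered"
                )
            })
            .map(|v| v.as_usize())
            .unwrap_or(0))
    }

The tests below perform the same aggregation via get_value, which also handles Time metrics so that pushdown_eval_time can be asserted on.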
@@ -1178,6 +1198,7 @@ mod tests {
     use crate::datasource::listing::{FileRange, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::options::CsvReadOptions;
+    use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
     use crate::{
@@ -1210,23 +1231,46 @@ mod tests {
     use std::io::Write;
     use tempfile::TempDir;
 
-    /// writes each RecordBatch as an individual parquet file and then
-    /// reads it back in to the named location.
+    struct RoundTripResult {
+        /// Data that was read back from ParquetFiles
+        batches: Result<Vec<RecordBatch>>,
+        /// The physical plan that was created (that has statistics, etc)
+        parquet_exec: Arc<ParquetExec>,
+    }
+
+    /// writes each RecordBatch as an individual parquet file and re-reads
+    /// the data back. Returns the data as [RecordBatch]es
     async fn round_trip_to_parquet(
         batches: Vec<RecordBatch>,
         projection: Option<Vec<usize>>,
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
+        round_trip(batches, projection, schema, predicate, pushdown_predicate)
+            .await
+            .batches
+    }
+
+    /// Writes each RecordBatch as an individual parquet file and then
+    /// reads them back. Returns the parquet exec as well as the data
+    /// as [RecordBatch]es
+    async fn round_trip(
+        batches: Vec<RecordBatch>,
+        projection: Option<Vec<usize>>,
+        schema: Option<SchemaRef>,
+        predicate: Option<Expr>,
+        pushdown_predicate: bool,
+    ) -> RoundTripResult {
         let file_schema = match schema {
             Some(schema) => schema,
-            None => Arc::new(Schema::try_merge(
-                batches.iter().map(|b| b.schema().as_ref().clone()),
-            )?),
+            None => Arc::new(
+                Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))
+                    .unwrap(),
+            ),
         };
 
-        let (meta, _files) = store_parquet(batches).await?;
+        let (meta, _files) = store_parquet(batches).await.unwrap();
         let file_groups = meta.into_iter().map(Into::into).collect();
 
         // prepare the scan
@@ -1253,7 +1297,11 @@ mod tests {
 
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        collect(Arc::new(parquet_exec), task_ctx).await
+        let parquet_exec = Arc::new(parquet_exec);
+        RoundTripResult {
+            batches: collect(parquet_exec.clone(), task_ctx).await,
+            parquet_exec,
+        }
     }
 
     // Add a new column with the specified field name to the RecordBatch
@@ -1464,18 +1512,18 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64));
 
         // read/write the files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
-                .await
-                .unwrap();
+        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
             "+----+----+----+",
             "|    | 20 | 2  |",
             "+----+----+----+",
         ];
-        assert_batches_sorted_eq!(expected, &read);
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        // Note there were 6 rows in total (across all batches)
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }
 
     #[tokio::test]
@@ -1598,7 +1646,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
+    async fn evolved_schema_disjoint_schema_with_filter_pushdown() {
         let c1: ArrayRef =
             Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
 
@@ -1613,10 +1661,7 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));
 
         // read/write the files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
-                .await
-                .unwrap();
+        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
 
         let expected = vec![
             "+----+----+",
@@ -1625,7 +1670,10 @@ mod tests {
             "|    | 1  |",
             "+----+----+",
         ];
-        assert_batches_sorted_eq!(expected, &read);
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        // Note there were 6 rows in total (across all batches)
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }
 
     #[tokio::test]
@@ -1906,6 +1954,71 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_exec_metrics() {
+        let c1: ArrayRef = Arc::new(StringArray::from(vec![
+            Some("Foo"),
+            None,
+            Some("bar"),
+            Some("bar"),
+            Some("bar"),
+            Some("bar"),
+            Some("zzz"),
+        ]));
+
+        // batch1: c1(string)
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        // filter out the "bar" rows
+        let filter = col("c1").not_eq(lit("bar"));
+
+        // read/write the files:
+        let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // assert the batches and some metrics
+        let expected = vec![
+            "+-----+", "| c1 |", "+-----+", "| Foo |", "| zzz |", "+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+
+        // pushdown predicates have eliminated all 4 "bar" rows and the
+        // null row, 5 rows filtered in total
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
+        assert!(
+            get_value(&metrics, "pushdown_eval_time") > 0,
+            "no eval time in metrics: {:#?}",
+            metrics
+        );
+    }
+
+    /// Returns the sum of all metrics with the specified name
+    /// in the given metric set.
+    ///
+    /// Count: returns the count value
+    /// Time: returns the elapsed nanoseconds
+    ///
+    /// Panics if no such metric exists.
+    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
+        let sum = metrics.sum(|m| match m.value() {
+            MetricValue::Count { name, .. } if name == metric_name => true,
+            MetricValue::Time { name, .. } if name == metric_name => true,
+            _ => false,
+        });
+
+        match sum {
+            Some(MetricValue::Count { count, .. }) => count.value(),
+            Some(MetricValue::Time { time, .. }) => time.value(),
+            _ => {
+                panic!(
+                    "Expected metric not found. Looking for '{}' in\n\n{:#?}",
+                    metric_name, metrics
+                );
+            }
+        }
+    }
+
     fn parquet_file_metrics() -> ParquetFileMetrics {
         let metrics = Arc::new(ExecutionPlanMetricsSet::new());
         ParquetFileMetrics::new(0, "file.parquet", &metrics)
