
Commit 964a00a

alamb authored and Dandandan committed
Add parquet predicate pushdown metrics (apache#3989)
* Log error building row filters, inspired by @liukun4515 at https://github.com/apache/arrow-datafusion/pull/3380/files#r970198755
* Add parquet predicate pushdown metrics
* More efficient bit counting
1 parent c783b9e commit 964a00a
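
The two new counters are registered through the same per-file MetricBuilder machinery as the existing bytes_scanned counter, so once a ParquetExec has run they can be read back from its MetricsSet, which is exactly what the new get_value helper in the test diff below does. The following is a minimal sketch of reading the summed counter from an already-executed plan; the helper name pushdown_rows_filtered_total is illustrative and not part of this commit:

use datafusion::physical_plan::file_format::ParquetExec;
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::ExecutionPlan;

/// Sum the `pushdown_rows_filtered` counters recorded by an already-executed
/// ParquetExec (one counter is registered per file, labeled with its filename).
fn pushdown_rows_filtered_total(exec: &ParquetExec) -> Option<usize> {
    // `metrics()` returns None until the plan has actually been executed.
    let metrics = exec.metrics()?;
    match metrics.sum(|m| {
        matches!(m.value(), MetricValue::Count { name, .. } if name == "pushdown_rows_filtered")
    }) {
        Some(MetricValue::Count { count, .. }) => Some(count.value()),
        _ => None,
    }
}

Because these are ordinary operator metrics, they should also appear next to bytes_scanned and row_groups_pruned in EXPLAIN ANALYZE output.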

3 files changed: 227 additions and 40 deletions


datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 155 additions & 31 deletions
@@ -237,6 +237,10 @@ pub struct ParquetFileMetrics {
     pub row_groups_pruned: metrics::Count,
     /// Total number of bytes scanned
     pub bytes_scanned: metrics::Count,
+    /// Total rows filtered out by predicates pushed into parquet scan
+    pub pushdown_rows_filtered: metrics::Count,
+    /// Total time spent evaluating pushdown filters
+    pub pushdown_eval_time: metrics::Time,
 }

 impl ParquetFileMetrics {
@@ -258,10 +262,20 @@ impl ParquetFileMetrics {
             .with_new_label("filename", filename.to_string())
             .counter("bytes_scanned", partition);

+        let pushdown_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("pushdown_rows_filtered", partition);
+
+        let pushdown_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("pushdown_eval_time", partition);
+
         Self {
             predicate_evaluation_errors,
             row_groups_pruned,
             bytes_scanned,
+            pushdown_rows_filtered,
+            pushdown_eval_time,
         }
     }
 }
@@ -410,7 +424,7 @@ impl FileOpener for ParquetOpener {
     ) -> Result<FileOpenFuture> {
         let file_range = file_meta.range.clone();

-        let metrics = ParquetFileMetrics::new(
+        let file_metrics = ParquetFileMetrics::new(
             self.partition_index,
             file_meta.location().as_ref(),
             &self.metrics,
@@ -450,21 +464,38 @@ impl FileOpener for ParquetOpener {
                 .then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
                 .flatten()
             {
-                if let Ok(Some(filter)) = build_row_filter(
+                let row_filter = build_row_filter(
                     predicate.clone(),
                     builder.schema().as_ref(),
                     table_schema.as_ref(),
                     builder.metadata(),
                     reorder_predicates,
-                ) {
-                    builder = builder.with_row_filter(filter);
-                }
+                    &file_metrics.pushdown_rows_filtered,
+                    &file_metrics.pushdown_eval_time,
+                );
+
+                match row_filter {
+                    Ok(Some(filter)) => {
+                        builder = builder.with_row_filter(filter);
+                    }
+                    Ok(None) => {}
+                    Err(e) => {
+                        debug!(
+                            "Ignoring error building row filter for '{:?}': {}",
+                            predicate, e
+                        );
+                    }
+                };
             };

             let file_metadata = builder.metadata();
             let groups = file_metadata.row_groups();
-            let row_groups =
-                prune_row_groups(groups, file_range, pruning_predicate.clone(), &metrics);
+            let row_groups = prune_row_groups(
+                groups,
+                file_range,
+                pruning_predicate.clone(),
+                &file_metrics,
+            );

             if enable_page_index && check_page_index_push_down_valid(&pruning_predicate) {
                 let file_offset_indexes = file_metadata.offset_indexes();
@@ -480,7 +511,7 @@ impl FileOpener for ParquetOpener {
                             pruning_predicate.clone(),
                             file_offset_indexes.get(*r),
                             file_page_indexes.get(*r),
-                            &metrics,
+                            &file_metrics,
                         )
                         .map_err(|e| {
                             ArrowError::ParquetError(format!(
@@ -564,7 +595,7 @@ impl DefaultParquetFileReaderFactory {
 struct ParquetFileReader {
     store: Arc<dyn ObjectStore>,
     meta: ObjectMeta,
-    metrics: ParquetFileMetrics,
+    file_metrics: ParquetFileMetrics,
     metadata_size_hint: Option<usize>,
 }

@@ -573,7 +604,7 @@ impl AsyncFileReader for ParquetFileReader {
         &mut self,
         range: Range<usize>,
     ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.metrics.bytes_scanned.add(range.end - range.start);
+        self.file_metrics.bytes_scanned.add(range.end - range.start);

         self.store
             .get_range(&self.meta.location, range)
@@ -591,7 +622,7 @@ impl AsyncFileReader for ParquetFileReader {
         Self: Send,
     {
         let total = ranges.iter().map(|r| r.end - r.start).sum();
-        self.metrics.bytes_scanned.add(total);
+        self.file_metrics.bytes_scanned.add(total);

         async move {
             self.store
@@ -636,7 +667,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
-        let parquet_file_metrics = ParquetFileMetrics::new(
+        let file_metrics = ParquetFileMetrics::new(
             partition_index,
             file_meta.location().as_ref(),
             metrics,
@@ -646,7 +677,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
             meta: file_meta.object_meta,
             store: Arc::clone(&self.store),
             metadata_size_hint,
-            metrics: parquet_file_metrics,
+            file_metrics,
         }))
     }
 }
@@ -1167,6 +1198,7 @@ mod tests {
     use crate::datasource::listing::{FileRange, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::options::CsvReadOptions;
+    use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
     use crate::{
@@ -1199,23 +1231,46 @@ mod tests {
     use std::io::Write;
     use tempfile::TempDir;

-    /// writes each RecordBatch as an individual parquet file and then
-    /// reads it back in to the named location.
+    struct RoundTripResult {
+        /// Data that was read back from ParquetFiles
+        batches: Result<Vec<RecordBatch>>,
+        /// The physical plan that was created (that has statistics, etc)
+        parquet_exec: Arc<ParquetExec>,
+    }
+
+    /// writes each RecordBatch as an individual parquet file and re-reads
+    /// the data back. Returns the data as [RecordBatch]es
     async fn round_trip_to_parquet(
         batches: Vec<RecordBatch>,
         projection: Option<Vec<usize>>,
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
+        round_trip(batches, projection, schema, predicate, pushdown_predicate)
+            .await
+            .batches
+    }
+
+    /// Writes each RecordBatch as an individual parquet file and then
+    /// reads them back. Returns the parquet exec as well as the data
+    /// as [RecordBatch]es
+    async fn round_trip(
+        batches: Vec<RecordBatch>,
+        projection: Option<Vec<usize>>,
+        schema: Option<SchemaRef>,
+        predicate: Option<Expr>,
+        pushdown_predicate: bool,
+    ) -> RoundTripResult {
         let file_schema = match schema {
             Some(schema) => schema,
-            None => Arc::new(Schema::try_merge(
-                batches.iter().map(|b| b.schema().as_ref().clone()),
-            )?),
+            None => Arc::new(
+                Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))
+                    .unwrap(),
+            ),
         };

-        let (meta, _files) = store_parquet(batches).await?;
+        let (meta, _files) = store_parquet(batches).await.unwrap();
         let file_groups = meta.into_iter().map(Into::into).collect();

         // prepare the scan
@@ -1242,7 +1297,11 @@ mod tests {

         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        collect(Arc::new(parquet_exec), task_ctx).await
+        let parquet_exec = Arc::new(parquet_exec);
+        RoundTripResult {
+            batches: collect(parquet_exec.clone(), task_ctx).await,
+            parquet_exec,
+        }
     }

     // Add a new column with the specified field name to the RecordBatch
@@ -1453,18 +1512,18 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64));

         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
-                .await
-                .unwrap();
+        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
             "+----+----+----+",
             "|    | 20 | 2  |",
             "+----+----+----+",
         ];
-        assert_batches_sorted_eq!(expected, &read);
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        // Note there were 6 rows in total (across three batches)
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }

     #[tokio::test]
@@ -1587,7 +1646,7 @@ mod tests {
     }

     #[tokio::test]
-    async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
+    async fn evolved_schema_disjoint_schema_with_filter_pushdown() {
         let c1: ArrayRef =
             Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

@@ -1602,10 +1661,7 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));

         // read/write them files:
-        let read =
-            round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
-                .await
-                .unwrap();
+        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;

         let expected = vec![
             "+----+----+",
@@ -1614,7 +1670,10 @@ mod tests {
             "|    | 1  |",
             "+----+----+",
         ];
-        assert_batches_sorted_eq!(expected, &read);
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        // Note there were 6 rows in total (across three batches)
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }

     #[tokio::test]
@@ -1895,6 +1954,71 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn parquet_exec_metrics() {
+        let c1: ArrayRef = Arc::new(StringArray::from(vec![
+            Some("Foo"),
+            None,
+            Some("bar"),
+            Some("bar"),
+            Some("bar"),
+            Some("bar"),
+            Some("zzz"),
+        ]));
+
+        // batch1: c1(string)
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        // on
+        let filter = col("c1").not_eq(lit("bar"));
+
+        // read/write them files:
+        let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // assert the batches and some metrics
+        let expected = vec![
+            "+-----+", "| c1  |", "+-----+", "| Foo |", "| zzz |", "+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+
+        // pushdown predicates have eliminated all 4 bar rows and the
+        // null row for 5 rows total
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
+        assert!(
+            get_value(&metrics, "pushdown_eval_time") > 0,
+            "no eval time in metrics: {:#?}",
+            metrics
+        );
+    }
+
+    /// returns the sum of all the metrics with the specified name in
+    /// the returned set.
+    ///
+    /// Count: returns value
+    /// Time: returns elapsed nanoseconds
+    ///
+    /// Panics if no such metric.
+    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
+        let sum = metrics.sum(|m| match m.value() {
+            MetricValue::Count { name, .. } if name == metric_name => true,
+            MetricValue::Time { name, .. } if name == metric_name => true,
+            _ => false,
+        });
+
+        match sum {
+            Some(MetricValue::Count { count, .. }) => count.value(),
+            Some(MetricValue::Time { time, .. }) => time.value(),
+            _ => {
+                panic!(
+                    "Expected metric not found. Looking for '{}' in\n\n{:#?}",
+                    metric_name, metrics
+                );
+            }
+        }
+    }
+
     fn parquet_file_metrics() -> ParquetFileMetrics {
         let metrics = Arc::new(ExecutionPlanMetricsSet::new());
         ParquetFileMetrics::new(0, "file.parquet", &metrics)
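
The commit message also mentions "more efficient bit counting"; that change lives in the row-filter code, one of the three changed files but not shown in this diff. As an illustration of the bookkeeping that the two new build_row_filter arguments enable (not the commit's actual row-filter implementation), a predicate evaluation can be timed with the Time metric and the rows its mask drops added to the Count metric. The function name evaluate_with_metrics and the closure-based predicate below are assumptions made for the sketch:

use datafusion::arrow::array::BooleanArray;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::metrics::{Count, Time};

/// Evaluate a row-filter predicate over one batch, accumulating the time spent
/// in `eval_time` and the number of rows removed in `rows_filtered`.
fn evaluate_with_metrics(
    batch: &RecordBatch,
    predicate: impl Fn(&RecordBatch) -> ArrowResult<BooleanArray>,
    rows_filtered: &Count,
    eval_time: &Time,
) -> ArrowResult<BooleanArray> {
    // The timer guard adds the elapsed wall-clock time to `eval_time` when dropped.
    let mask = {
        let _timer = eval_time.timer();
        predicate(batch)?
    };

    // Rows that are false or NULL in the mask are the ones the filter removes.
    // The committed code counts set bits on the underlying bitmap rather than
    // iterating ("more efficient bit counting"); a plain iterator keeps this
    // sketch simple.
    let kept = mask.iter().filter(|v| *v == Some(true)).count();
    rows_filtered.add(batch.num_rows() - kept);

    Ok(mask)
}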
