Skip to content

Commit 679a85f

Browse files
authored
Add metrics for skipped rows (#11706)
1 parent 1ecdf90 commit 679a85f

File tree

1 file changed

+34
-13
lines changed

1 file changed

+34
-13
lines changed

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ use crate::aggregates::{
2828
PhysicalGroupBy,
2929
};
3030
use crate::common::IPCWriter;
31-
use crate::metrics::{BaselineMetrics, RecordOutput};
31+
use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput};
3232
use crate::sorts::sort::sort_batch;
3333
use crate::sorts::streaming_merge;
3434
use crate::spill::read_spill_as_stream;
3535
use crate::stream::RecordBatchStreamAdapter;
36-
use crate::{aggregates, ExecutionPlan, PhysicalExpr};
36+
use crate::{aggregates, metrics, ExecutionPlan, PhysicalExpr};
3737
use crate::{RecordBatchStream, SendableRecordBatchStream};
3838

3939
use arrow::array::*;
@@ -117,17 +117,30 @@ struct SkipAggregationProbe {
117117
/// Flag indicating that further updates of `SkipAggregationProbe`
118118
/// state won't make any effect
119119
is_locked: bool,
120+
121+
/// Number of rows where state was output without aggregation.
122+
///
123+
/// * If 0, all input rows were aggregated (should_skip was always false)
124+
///
125+
/// * if greater than zero, the number of rows which were output directly
126+
/// without aggregation
127+
skipped_aggregation_rows: metrics::Count,
120128
}
121129

122130
impl SkipAggregationProbe {
123-
fn new(probe_rows_threshold: usize, probe_ratio_threshold: f64) -> Self {
131+
fn new(
132+
probe_rows_threshold: usize,
133+
probe_ratio_threshold: f64,
134+
skipped_aggregation_rows: metrics::Count,
135+
) -> Self {
124136
Self {
125137
input_rows: 0,
126138
num_groups: 0,
127139
probe_rows_threshold,
128140
probe_ratio_threshold,
129141
should_skip: false,
130142
is_locked: false,
143+
skipped_aggregation_rows,
131144
}
132145
}
133146

@@ -160,6 +173,11 @@ impl SkipAggregationProbe {
160173
self.should_skip = false;
161174
self.is_locked = true;
162175
}
176+
177+
/// Record the number of rows that were output directly without aggregation
178+
fn record_skipped(&mut self, batch: &RecordBatch) {
179+
self.skipped_aggregation_rows.add(batch.num_rows());
180+
}
163181
}
164182

165183
/// HashTable based Grouping Aggregator
@@ -473,17 +491,17 @@ impl GroupedHashAggregateStream {
473491
.all(|acc| acc.supports_convert_to_state())
474492
&& agg_group_by.is_single()
475493
{
494+
let options = &context.session_config().options().execution;
495+
let probe_rows_threshold =
496+
options.skip_partial_aggregation_probe_rows_threshold;
497+
let probe_ratio_threshold =
498+
options.skip_partial_aggregation_probe_ratio_threshold;
499+
let skipped_aggregation_rows = MetricBuilder::new(&agg.metrics)
500+
.counter("skipped_aggregation_rows", partition);
476501
Some(SkipAggregationProbe::new(
477-
context
478-
.session_config()
479-
.options()
480-
.execution
481-
.skip_partial_aggregation_probe_rows_threshold,
482-
context
483-
.session_config()
484-
.options()
485-
.execution
486-
.skip_partial_aggregation_probe_ratio_threshold,
502+
probe_rows_threshold,
503+
probe_ratio_threshold,
504+
skipped_aggregation_rows,
487505
))
488506
} else {
489507
None
@@ -611,6 +629,9 @@ impl Stream for GroupedHashAggregateStream {
611629
match ready!(self.input.poll_next_unpin(cx)) {
612630
Some(Ok(batch)) => {
613631
let _timer = elapsed_compute.timer();
632+
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
633+
probe.record_skipped(&batch);
634+
}
614635
let states = self.transform_to_states(batch)?;
615636
return Poll::Ready(Some(Ok(
616637
states.record_output(&self.baseline_metrics)

0 commit comments

Comments
 (0)