Skip to content

Commit a1645c4

Browse files
authored
Minor: refactor probe check into function should_skip_aggregation (#11821)
1 parent eb2b5fe commit a1645c4

File tree

1 file changed

+19
-14
lines changed

1 file changed

+19
-14
lines changed

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -635,11 +635,7 @@ impl Stream for GroupedHashAggregateStream {
635635
(
636636
if self.input_done {
637637
ExecutionState::Done
638-
} else if self
639-
.skip_aggregation_probe
640-
.as_ref()
641-
.is_some_and(|probe| probe.should_skip())
642-
{
638+
} else if self.should_skip_aggregation() {
643639
ExecutionState::SkippingAggregation
644640
} else {
645641
ExecutionState::ReadingInput
@@ -955,12 +951,13 @@ impl GroupedHashAggregateStream {
955951
Ok(())
956952
}
957953

958-
// Updates skip aggregation probe state.
959-
// In case stream has any spills, the probe is forcefully set to
960-
// forbid aggregation skipping, and locked, since spilling resets
961-
// total number of unique groups.
962-
//
963-
// Note: currently spilling is not supported for Partial aggregation
954+
/// Updates skip aggregation probe state.
955+
///
956+
/// In case stream has any spills, the probe is forcefully set to
957+
/// forbid aggregation skipping, and locked, since spilling resets
958+
/// total number of unique groups.
959+
///
960+
/// Note: currently spilling is not supported for Partial aggregation
964961
fn update_skip_aggregation_probe(&mut self, input_rows: usize) {
965962
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
966963
if !self.spill_state.spills.is_empty() {
@@ -971,8 +968,8 @@ impl GroupedHashAggregateStream {
971968
};
972969
}
973970

974-
// In case the probe indicates that aggregation may be
975-
// skipped, forces stream to produce currently accumulated output.
971+
/// In case the probe indicates that aggregation may be
972+
/// skipped, forces stream to produce currently accumulated output.
976973
fn switch_to_skip_aggregation(&mut self) -> Result<()> {
977974
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
978975
if probe.should_skip() {
@@ -984,7 +981,15 @@ impl GroupedHashAggregateStream {
984981
Ok(())
985982
}
986983

987-
// Transforms input batch to intermediate aggregate state, without grouping it
984+
/// Returns true if the aggregation probe indicates that aggregation
985+
/// should be skipped.
986+
fn should_skip_aggregation(&self) -> bool {
987+
self.skip_aggregation_probe
988+
.as_ref()
989+
.is_some_and(|probe| probe.should_skip())
990+
}
991+
992+
/// Transforms input batch to intermediate aggregate state, without grouping it
988993
fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch> {
989994
let group_values = evaluate_group_by(&self.group_by, &batch)?;
990995
let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;

0 commit comments

Comments
 (0)