Clamp early aggregation emit to the sort boundary when using partial group ordering#20446

Open
jackkleeman wants to merge 2 commits into apache:main from restatedev:issue-20445

Conversation

@jackkleeman
Contributor

Which issue does this PR close?

What changes are included in this PR?

Fix a panic on early emit with partial sort aggregations by clamping the emit point to the sort boundary.

Are these changes tested?

Yes

Are there any user-facing changes?

No
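The clamping idea can be illustrated with a minimal standalone sketch. The type and field names below are illustrative, not DataFusion's actual API: when memory pressure forces an early emit of `n` groups, `n` must be clamped to the number of groups the partial ordering can safely release, otherwise removing groups past the sort boundary panics.

```rust
#[derive(Debug, PartialEq)]
enum EmitTo {
    First(usize),
    All,
}

// Simplified stand-in for a group-ordering mode (hypothetical shape).
enum GroupOrdering {
    None,
    // `emittable` is the count of groups before the sort boundary,
    // or None if no group can be emitted yet.
    Partial { emittable: Option<usize> },
}

// Clamp a requested early emit of `n` groups to what the ordering allows.
fn clamp_emit(n: usize, ordering: &GroupOrdering) -> Option<EmitTo> {
    match ordering {
        // No ordering constraint: any prefix of groups can be emitted.
        GroupOrdering::None => Some(EmitTo::First(n)),
        // Partial ordering: only groups before the sort boundary are safe.
        GroupOrdering::Partial { emittable } => match emittable {
            Some(max) => Some(EmitTo::First(n.min(*max))),
            None => None, // nothing can be emitted yet
        },
    }
}

fn main() {
    // 100 groups under memory pressure, but only 40 precede the boundary.
    let clamped = clamp_emit(100, &GroupOrdering::Partial { emittable: Some(40) });
    assert_eq!(clamped, Some(EmitTo::First(40)));
}
```

This is only a model of the control flow; the real code operates on `GroupOrdering`/`EmitTo` inside `row_hash.rs`.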

github-actions bot added the physical-plan (Changes to the physical-plan crate) label on Feb 20, 2026
@alamb (Contributor) left a comment


Thank you @jackkleeman

I spent quite a while with this PR this afternoon, and I think it looks correct to me.

However, I found the current formulation somewhat confusing, and I have offered a potential way to make it clearer (refactoring into a method). I don't think this is required before merge; I can make a follow-on PR if you prefer.

if let Some(batch) = self.emit(EmitTo::First(n), false)? {
// Clamp to the sort boundary when using partial group ordering,
// otherwise remove_groups panics (#20445).
let n = match &self.group_ordering {

I found this logic quite confusing, and the use of 0 as a sentinel likewise. Could we maybe encapsulate it into a method? That way we could at least explain what it is doing better.

I tried a bunch of different formulations, and the best I could come up with was

impl GroupedHashAggregateStream {
    ...
                // Clamp to the sort boundary when using partial group ordering,
                // otherwise remove_groups panics (#20445).
                if let Some(emit_to) = self.emit_target_for_oom() {
                    if let Some(batch) = self.emit(emit_to, false)? {
                        return Ok(Some(ExecutionState::ProducingOutput(batch)));
                    }
                }
    ...

    /// Returns how many groups to try and emit in order to avoid an out-of-memory
    /// condition.
    ///
    /// Returns `None` if emitting is not possible.
    ///
    /// Returns `Some(EmitTo)` with the number of groups to emit if it is possible
    /// to emit some groups to free memory.
    fn emit_target_for_oom(&self) -> Option<EmitTo> {
        let n = if self.group_values.len() >= self.batch_size {
            // Try to emit an integer multiple of batch size if possible
            self.group_values.len() / self.batch_size * self.batch_size
        } else {
            // Otherwise emit whatever we can
            self.group_values.len()
        };

        // Special case for GroupOrdering::None since emit_to() returns None in
        // that case, but we can still emit some groups to try to resolve the OOM
        if matches!(&self.group_ordering, GroupOrdering::None) {
            return Some(EmitTo::First(n));
        }

        self.group_ordering.emit_to().map(|emit_to| match emit_to {
            // If the ordering allows emitting some groups,
            // emit as many as we can to try to resolve the OOM
            EmitTo::First(max) => EmitTo::First(n.min(max)),
            // If the ordering allows emitting all groups, we can emit n
            // groups to try to resolve the OOM
            EmitTo::All => EmitTo::First(n),
        })
    }

    ...
}
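The batch-size rounding at the top of `emit_target_for_oom` can be shown in isolation. `emit_count` below is a hypothetical free-function version of that arithmetic, not a name from the codebase: with at least one full batch of groups, emit a whole number of batches; otherwise emit every group on hand.

```rust
// Hypothetical helper mirroring the rounding logic in emit_target_for_oom.
fn emit_count(num_groups: usize, batch_size: usize) -> usize {
    if num_groups >= batch_size {
        // Round down to an integer multiple of batch_size
        num_groups / batch_size * batch_size
    } else {
        // Fewer groups than one batch: emit whatever we can
        num_groups
    }
}

fn main() {
    assert_eq!(emit_count(2500, 1024), 2048); // two full batches
    assert_eq!(emit_count(700, 1024), 700);   // less than one batch: emit all
}
```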

Here is the entire proposed diff

index 35f32ac7a..5bd33aab5 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -1038,26 +1038,50 @@ impl GroupedHashAggregateStream {

                 // Clamp to the sort boundary when using partial group ordering,
                 // otherwise remove_groups panics (#20445).
-                let n = match &self.group_ordering {
-                    GroupOrdering::None => n,
-                    _ => match self.group_ordering.emit_to() {
-                        Some(EmitTo::First(max)) => n.min(max),
-                        _ => 0,
-                    },
-                };
-
-                if n > 0
-                    && let Some(batch) = self.emit(EmitTo::First(n), false)?
-                {
-                    Ok(Some(ExecutionState::ProducingOutput(batch)))
-                } else {
-                    Err(oom)
+                if let Some(emit_to) = self.emit_target_for_oom() {
+                    if let Some(batch) = self.emit(emit_to, false)? {
+                        return Ok(Some(ExecutionState::ProducingOutput(batch)));
+                    }
                 }
+
+                Err(oom)
             }
             _ => Err(oom),
         }
     }

+    /// Returns how many groups to try and emit in order to avoid an out-of-memory
+    /// condition.
+    ///
+    /// Returns `None` if emitting is not possible.
+    ///
+    /// Returns Some(EmitTo) with the number of groups to emit if it is possible
+    /// to emit some groups to free memory
+    fn emit_target_for_oom(&self) -> Option<EmitTo> {
+        let n = if self.group_values.len() >= self.batch_size {
+            // Try to emit an integer multiple of batch size if possible
+            self.group_values.len() / self.batch_size * self.batch_size
+        } else {
+            // Otherwise emit whatever we can
+            self.group_values.len()
+        };
+
+        // Special case for GroupOrdering::None since emit_to() returns None for
+        // that case, but we can still emit some groups to try to resolve the OOM
+        if matches!(&self.group_ordering, GroupOrdering::None) {
+            return Some(EmitTo::First(n));
+        };
+
+        self.group_ordering.emit_to().map(|emit_to| match emit_to {
+            // If the ordering allows emitting some groups,
+            // emit as many as we can to try to resolve the OOM,
+            EmitTo::First(max) => EmitTo::First(n.min(max)),
+            // if the ordering allows emitting all groups, we can emit n
+            // groups to try to resolve the OOM
+            EmitTo::All => EmitTo::First(n),
+        })
+    }
+

GroupOrdering::None => n,
_ => match self.group_ordering.emit_to() {
Some(EmitTo::First(max)) => n.min(max),
_ => 0,

Why not change this to emit n groups on Some(EmitTo::All)?

             _ => match self.group_ordering.emit_to() {
                        Some(EmitTo::First(max)) => n.min(max),
                        Some(EmitTo::All) => n,
                        _ => 0,

As I understand what it is doing, we are trying to ensure at most n groups are emitted
(this is incorporated above as well)

@alamb
Contributor

alamb commented Feb 23, 2026

FYI @2010YOUY01, as I think you have also worked on this part of the code.


Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] [v52 regression] Panic in GroupOrderingPartial::remove_groups when Partial aggregate with PartiallySorted hits memory pressure

2 participants