
Address memory over-accounting in array_agg #16816

Open

Conversation

Contributor

@gabotechs gabotechs commented Jul 18, 2025

Which issue does this PR close?

  • Closes #.

Rationale for this change

Follow up on:

The ArrayAggAccumulator, unlike DistinctArrayAggAccumulator, OrderSensitiveArrayAggAccumulator, and many other accumulators, accumulates values by directly referencing the source ArrayRefs rather than converting them into ScalarValues. This is good for performance, as we can afford to just keep references to the original buffers, but it has the drawback of making it complicated to measure the memory consumed by the ArrayAggAccumulator.

When calling ArrayRef::get_array_memory_size() we get the memory occupied by the whole underlying buffer, but it's not technically true that ArrayAggAccumulator is occupying all that space.

I found this other ArrayData::get_slice_memory_size() method with the following docs:

Returns the total number of bytes of memory occupied by the buffers by this slice of ArrayData
(See also diagram on ArrayData).

This is approximately the number of bytes if a new ArrayData was formed by creating new Buffers
with exactly the data needed.

For example, a DataType::Int64 with 100 elements, Self::get_slice_memory_size would return 100 * 8 = 800.
If the ArrayData was then Self::sliced to refer to its first 20 elements, then Self::get_slice_memory_size
on the sliced ArrayData would return 20 * 8 = 160.

This leads me to think that, since we are accumulating ArrayRefs, rather than compacting them by copying their data it might be better to not compact at all and instead report the consumed memory by calling get_slice_memory_size(). It's still not technically true that ArrayAggAccumulator occupies that space, as the memory is not owned by ArrayAggAccumulator, only referenced by it and owned by someone else, but it's the closest thing to reality that I was able to come up with.
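As an illustration (a standalone sketch, not code from this PR), slicing an Int64Array and comparing the two measurements shows how much get_array_memory_size() over-reports for a small slice of a larger buffer:

    use arrow::array::{Array, Int64Array};
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        // 100 Int64 values, then a zero-copy slice referencing only the first 20.
        let array = Int64Array::from((0..100i64).collect::<Vec<_>>());
        let slice = array.slice(0, 20);

        // Reports the whole underlying allocation, even though the slice
        // only needs 20 of the 100 values.
        println!("get_array_memory_size:  {}", slice.get_array_memory_size());

        // Reports only the bytes this slice actually covers
        // (roughly 20 * 8 = 160 for the data buffer).
        println!("get_slice_memory_size: {}", slice.to_data().get_slice_memory_size()?);
        Ok(())
    }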

What changes are included in this PR?

Stops copying ArrayRef data in ArrayAggAccumulator and starts measuring the memory it occupies with get_slice_memory_size().
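To make the accounting idea concrete, here is a minimal sketch (accumulated_size is a hypothetical helper, not the code in this PR): the accumulator keeps the incoming ArrayRefs as-is and reports the sum of their sliced-data sizes, falling back to the full buffer size for layouts where get_slice_memory_size() cannot compute a value (it returns a Result).

    use arrow::array::{Array, ArrayRef};

    /// Hypothetical helper: report how much data the retained batches cover.
    fn accumulated_size(values: &[ArrayRef]) -> usize {
        values
            .iter()
            .map(|arr| {
                arr.to_data()
                    .get_slice_memory_size()
                    // Fall back to the full buffer size if the slice size
                    // cannot be computed for this data layout.
                    .unwrap_or_else(|_| arr.get_array_memory_size())
            })
            .sum()
    }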

Are these changes tested?

Yes, by existing tests.

Are there any user-facing changes?

People should stop seeing ResourceExhausted errors in aggregations that involve ArrayAgg.

@github-actions github-actions bot added the functions (Changes to functions implementation) label on Jul 18, 2025
@gabotechs gabotechs force-pushed the fix-size-measurement-for-array-agg-accumulator branch from c90401b to ac407a1 on July 18, 2025 08:50
Contributor

@fmonjalet fmonjalet left a comment


I like the approach. It's a case where the under-accounting seems more manageable than a vast over-accounting, and performance should be better than copying too.

@@ -1008,8 +1002,7 @@ mod tests {
acc2.update_batch(&[data(["b", "c", "a"])])?;
acc1 = merge(acc1, acc2)?;

- // without compaction, the size is 2652.
- assert_eq!(acc1.size(), 732);
+ assert_eq!(acc1.size(), 266);
Contributor

Nice! To convince myself this approach was still accurate enough, I was thinking we should compare this size to the raw data size, which I got from:

        let data1 = data(["a", "c", "b"]);
        let data2 = data(["b", "c", "a"]);
        println!("Size of data: {} {}", data1.get_array_memory_size(), data2.get_array_memory_size());

I see that the data size is 1208 for each. But that is due to excess capacity in the Array, which I would hope does not happen too much for larger arrays (the ones that actually matter for memory accounting). I would also hope unused memory pages are not effectively allocated by the kernel anyway, so they don't take memory in practice (unsure about that in this case).

If we update the data helper to use shrink_to_fit:

    fn data<T, const N: usize>(list: [T; N]) -> ArrayRef
    where
        ScalarValue: From<T>,
    {
        let values: Vec<_> = list.into_iter().map(ScalarValue::from).collect();
        let mut array = ScalarValue::iter_to_array(values).expect("Cannot convert to array");
        array.shrink_to_fit();
        array
    }

Then we get 139 bytes per array, which is 278 total, so we are under-accounting by only 12 bytes in practice. Which sounds good.

- // not used here.
- self.values
-     .push(make_array(copy_array_data(&val.to_data())));
+ self.values.push(val)
Contributor

Love this one, as the call looks much cheaper now.

Comment on lines -319 to -322
// storing it here directly copied/compacted avoids over accounting memory
// not used here.
self.values
    .push(make_array(copy_array_data(&val.to_data())));
Member

Was compacting (as stipulated by this comment) not happening, or not needed?

Contributor Author

@gabotechs gabotechs Jul 22, 2025


After this change https://github.com/apache/datafusion/pull/16816/files#diff-614f73bd02b93c736f118e06e15d6ebfbb8c570b2f9f21f969659e7a04ab843aL381-R375, not needed.

#16519 introduced compacting for this same aggregate function in other places where it was missing, but we found that it was very inefficient, so it hasn't been merged. This is the alternative solution that does not rely on copying/compacting.

Member

Compacting has two aspects:

  • accurate memory accounting
  • reducing memory usage

I understand that this PR improves accounting.
I just want to understand whether this PR increases memory usage and by how much.
For example, does memory usage now depend on how the array_agg input is computed? E.g. if it comes from highly filtered Parquet files, will we use more memory now?

Contributor Author

I don't think we will be using more memory. Right now the code is in an intermediate state where only some of the accumulations in ArrayAggAccumulator are being compacted (#16346, which is merged) and others are not (#16519, which was not merged due to performance concerns).

So most likely the buffers that back the accumulated data are still being retained anyway, because #16519 was not merged.

If anything, this PR should reduce memory usage, as we are no longer copying data, and improve accounting, as we are no longer counting the buffers that back the accumulated data several times over.
