Address memory over-accounting in array_agg #16816

Open · wants to merge 1 commit into main

15 changes: 4 additions & 11 deletions datafusion/functions-aggregate/src/array_agg.rs
@@ -23,14 +23,12 @@ use std::mem::{size_of, size_of_val};
 use std::sync::Arc;
 
 use arrow::array::{
-    make_array, new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray,
-    StructArray,
+    new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray, StructArray,
 };
 use arrow::compute::{filter, SortOptions};
 use arrow::datatypes::{DataType, Field, FieldRef, Fields};
 
 use datafusion_common::cast::as_list_array;
-use datafusion_common::scalar::copy_array_data;
 use datafusion_common::utils::{get_row_at_idx, SingleRowListArrayBuilder};
 use datafusion_common::{exec_err, internal_err, Result, ScalarValue};
@@ -315,11 +313,7 @@ impl Accumulator for ArrayAggAccumulator {
         };
 
         if !val.is_empty() {
-            // The ArrayRef might be holding a reference to its original input buffer, so
-            // storing it here directly copied/compacted avoids over accounting memory
-            // not used here.
-            self.values
-                .push(make_array(copy_array_data(&val.to_data())));

Review comment on lines -319 to -322 (Member):

Was compacting (as stipulated by this comment) not happening, or not needed?

gabotechs (Contributor Author), Jul 22, 2025:

After this change (https://github.com/apache/datafusion/pull/16816/files#diff-614f73bd02b93c736f118e06e15d6ebfbb8c570b2f9f21f969659e7a04ab843aL381-R375), it is not needed.

#16519 introduced compaction in this same aggregate function in the remaining places, but it turned out to be very inefficient, so it has not been merged. This PR is the alternative solution, which does not rely on copying/compacting.

Member:

Compacting has two aspects:

  • accurate memory accounting
  • reducing memory usage

I understand the PR improves accounting. I just want to understand whether this PR increases memory usage and by how much. For example, does memory usage now depend on how the array_agg input is calculated? E.g. if it comes from highly filtered Parquet files, will we use more memory now?

gabotechs (Contributor Author):

I don't think we will be using more memory. Right now the code is in an intermediate state where only some of the accumulations in ArrayAggAccumulator are compacted (#16346, which was merged) while others are not (#16519, which was not merged due to performance concerns).

So most likely the buffers that back the accumulated data are still being retained anyway, because #16519 was not merged.

If anything, this PR should reduce memory usage, as we are no longer copying data, and improve accounting, as we are no longer counting the buffers that back the accumulated data multiple times (see the sketch after the diff hunk below).

+            self.values.push(val)

Contributor:

Love this one, as the call looks much cheaper now.

         }
 
         Ok(())
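
For context on the compaction being removed here, a minimal standalone sketch (not from the PR; the array contents and sizes are illustrative assumptions) of what the old code path guarded against: a sliced ArrayRef keeps its parent's buffers alive, and copying it through copy_array_data rebuilds it into freshly allocated, minimally sized buffers:

    use arrow::array::{make_array, Array, ArrayRef, Int64Array};
    use datafusion_common::scalar::copy_array_data;
    use std::sync::Arc;

    fn main() {
        // A 10-row slice that still references its 1_000-row parent buffer.
        let parent: ArrayRef = Arc::new(Int64Array::from_iter_values(0..1_000));
        let val = parent.slice(0, 10);

        // What the removed code did: copy/compact the slice into fresh,
        // minimally sized buffers so the parent allocation can be dropped.
        let compacted = make_array(copy_array_data(&val.to_data()));

        // The slice reports the whole shared parent buffer; the copy does not.
        println!("slice:     {} bytes", val.get_array_memory_size());
        println!("compacted: {} bytes", compacted.get_array_memory_size());
    }

The PR drops this copy and instead fixes the accounting side, as the next hunk shows.
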
@@ -378,7 +372,7 @@ impl Accumulator for ArrayAggAccumulator {
             + self
                 .values
                 .iter()
-                .map(|arr| arr.get_array_memory_size())
+                .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
                 .sum::<usize>()
             + self.datatype.size()
             - size_of_val(&self.datatype)
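
To illustrate the one-line change above, a small sketch (again with assumed array contents): get_array_memory_size counts every buffer an array references, including the full shared allocation behind a slice, while ArrayData::get_slice_memory_size counts only the bytes the slice actually uses:

    use arrow::array::{Array, Int64Array};

    fn main() {
        // A 1_000-element parent array; the accumulator effectively holds a
        // 10-element slice of it.
        let parent = Int64Array::from_iter_values(0..1_000);
        let slice = parent.slice(990, 10);

        // Old accounting: counts the whole shared buffer behind the slice.
        println!("full:  {} bytes", slice.get_array_memory_size());

        // New accounting: counts only the bytes the slice references
        // (about 80 bytes for 10 i64 values).
        let bytes = slice.to_data().get_slice_memory_size().unwrap_or_default();
        println!("slice: {} bytes", bytes);
    }

Note that unwrap_or_default() falls back to 0 if the slice size cannot be computed, preferring under-accounting over erroring inside size().
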
@@ -1008,8 +1002,7 @@ mod tests {
         acc2.update_batch(&[data(["b", "c", "a"])])?;
         acc1 = merge(acc1, acc2)?;
 
-        // without compaction, the size is 2652.
-        assert_eq!(acc1.size(), 732);
+        assert_eq!(acc1.size(), 266);

Contributor:

Nice! To convince myself this approach was still accurate enough, I wanted to compare this size to the raw data size, which I got from:

    let data1 = data(["a", "c", "b"]);
    let data2 = data(["b", "c", "a"]);
    println!("Size of data: {} {}", data1.get_array_memory_size(), data2.get_array_memory_size());

I see that the data size is 1208 for each, but that comes from excess capacity in the Array's buffers, which I would hope does not happen much for larger arrays (the ones that actually matter for memory accounting). I would also hope that unused memory pages are never actually committed by the kernel, so they take no memory in practice (unsure about that in this case).

If we update the data helper to use shrink_to_fit:

    fn data<T, const N: usize>(list: [T; N]) -> ArrayRef
    where
        ScalarValue: From<T>,
    {
        let values: Vec<_> = list.into_iter().map(ScalarValue::from).collect();
        let mut array = ScalarValue::iter_to_array(values).expect("Cannot convert to array");
        array.shrink_to_fit();
        array
    }

Then we get 139 bytes per array, which is 278 in total, so we are under-accounting by only 12 bytes in practice, which sounds good.


         Ok(())
     }