Skip to content

Commit a3db191

Browse files
authored
Don't store hashes in GroupOrdering (#7029)
* Don't store hashes in GroupOrdering
* Update group IDs
* Review feedback
1 parent 44008d7 commit a3db191

File tree

4 files changed

+26
-64
lines changed

4 files changed

+26
-64
lines changed

datafusion/core/src/physical_plan/aggregates/order/full.rs

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use datafusion_execution::memory_pool::proxy::VecAllocExt;
19-
2018
use crate::physical_expr::EmitTo;
2119

2220
/// Tracks grouping state when the data is ordered entirely by its
@@ -58,8 +56,6 @@ use crate::physical_expr::EmitTo;
5856
#[derive(Debug)]
5957
pub(crate) struct GroupOrderingFull {
6058
state: State,
61-
/// Hash values for groups in 0..current
62-
hashes: Vec<u64>,
6359
}
6460

6561
#[derive(Debug)]
@@ -79,7 +75,6 @@ impl GroupOrderingFull {
7975
pub fn new() -> Self {
8076
Self {
8177
state: State::Start,
82-
hashes: vec![],
8378
}
8479
}
8580

@@ -101,19 +96,17 @@ impl GroupOrderingFull {
10196
}
10297

10398
/// remove the first n groups from the internal state, shifting
104-
/// all existing indexes down by `n`. Returns stored hash values
105-
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
99+
/// all existing indexes down by `n`
100+
pub fn remove_groups(&mut self, n: usize) {
106101
match &mut self.state {
107102
State::Start => panic!("invalid state: start"),
108103
State::InProgress { current } => {
109104
// shift down by n
110105
assert!(*current >= n);
111106
*current -= n;
112-
self.hashes.drain(0..n);
113107
}
114108
State::Complete { .. } => panic!("invalid state: complete"),
115-
};
116-
&self.hashes
109+
}
117110
}
118111

119112
/// Note that the input is complete so any outstanding groups are done as well
@@ -123,20 +116,8 @@ impl GroupOrderingFull {
123116

124117
/// Called when new groups are added in a batch. See documentation
125118
/// on [`super::GroupOrdering::new_groups`]
126-
pub fn new_groups(
127-
&mut self,
128-
group_indices: &[usize],
129-
batch_hashes: &[u64],
130-
total_num_groups: usize,
131-
) {
119+
pub fn new_groups(&mut self, total_num_groups: usize) {
132120
assert_ne!(total_num_groups, 0);
133-
assert_eq!(group_indices.len(), batch_hashes.len());
134-
135-
// copy any hash values
136-
self.hashes.resize(total_num_groups, 0);
137-
for (&group_index, &hash) in group_indices.iter().zip(batch_hashes.iter()) {
138-
self.hashes[group_index] = hash;
139-
}
140121

141122
// Update state
142123
let max_group_index = total_num_groups - 1;
@@ -158,6 +139,6 @@ impl GroupOrderingFull {
158139
}
159140

160141
pub(crate) fn size(&self) -> usize {
161-
std::mem::size_of::<Self>() + self.hashes.allocated_size()
142+
std::mem::size_of::<Self>()
162143
}
163144
}

datafusion/core/src/physical_plan/aggregates/order/mod.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ impl GroupOrdering {
8484

8585
/// remove the first n groups from the internal state, shifting
8686
/// all existing indexes down by `n`
87-
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
87+
pub fn remove_groups(&mut self, n: usize) {
8888
match self {
89-
GroupOrdering::None => &[],
89+
GroupOrdering::None => {}
9090
GroupOrdering::Partial(partial) => partial.remove_groups(n),
9191
GroupOrdering::Full(full) => full.remove_groups(n),
9292
}
@@ -106,7 +106,6 @@ impl GroupOrdering {
106106
&mut self,
107107
batch_group_values: &[ArrayRef],
108108
group_indices: &[usize],
109-
batch_hashes: &[u64],
110109
total_num_groups: usize,
111110
) -> Result<()> {
112111
match self {
@@ -115,13 +114,11 @@ impl GroupOrdering {
115114
partial.new_groups(
116115
batch_group_values,
117116
group_indices,
118-
batch_hashes,
119117
total_num_groups,
120118
)?;
121119
}
122-
123120
GroupOrdering::Full(full) => {
124-
full.new_groups(group_indices, batch_hashes, total_num_groups);
121+
full.new_groups(total_num_groups);
125122
}
126123
};
127124
Ok(())

datafusion/core/src/physical_plan/aggregates/order/partial.rs

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,6 @@ pub(crate) struct GroupOrderingPartial {
7171
/// Converter for the sort key (used on the group columns
7272
/// specified in `order_indices`)
7373
row_converter: RowConverter,
74-
75-
/// Hash values for groups in 0..completed
76-
hashes: Vec<u64>,
7774
}
7875

7976
#[derive(Debug, Default)]
@@ -127,7 +124,6 @@ impl GroupOrderingPartial {
127124
state: State::Start,
128125
order_indices: order_indices.to_vec(),
129126
row_converter: RowConverter::new(fields)?,
130-
hashes: vec![],
131127
})
132128
}
133129

@@ -167,8 +163,8 @@ impl GroupOrderingPartial {
167163
}
168164

169165
/// remove the first n groups from the internal state, shifting
170-
/// all existing indexes down by `n`. Returns stored hash values
171-
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
166+
/// all existing indexes down by `n`
167+
pub fn remove_groups(&mut self, n: usize) {
172168
match &mut self.state {
173169
State::Taken => unreachable!("State previously taken"),
174170
State::Start => panic!("invalid state: start"),
@@ -182,12 +178,9 @@ impl GroupOrderingPartial {
182178
*current -= n;
183179
assert!(*current_sort >= n);
184180
*current_sort -= n;
185-
// Note sort_key stays the same, we are just translating group indexes
186-
self.hashes.drain(0..n);
187181
}
188182
State::Complete { .. } => panic!("invalid state: complete"),
189-
};
190-
&self.hashes
183+
}
191184
}
192185

193186
/// Note that the input is complete so any outstanding groups are done as well
@@ -204,18 +197,15 @@ impl GroupOrderingPartial {
204197
&mut self,
205198
batch_group_values: &[ArrayRef],
206199
group_indices: &[usize],
207-
batch_hashes: &[u64],
208200
total_num_groups: usize,
209201
) -> Result<()> {
210202
assert!(total_num_groups > 0);
211203
assert!(!batch_group_values.is_empty());
212-
assert_eq!(group_indices.len(), batch_hashes.len());
213204

214205
let max_group_index = total_num_groups - 1;
215206

216207
// compute the sort key values for each group
217208
let sort_keys = self.compute_sort_keys(batch_group_values)?;
218-
assert_eq!(sort_keys.num_rows(), batch_hashes.len());
219209

220210
let old_state = std::mem::take(&mut self.state);
221211
let (mut current_sort, mut sort_key) = match &old_state {
@@ -231,16 +221,9 @@ impl GroupOrderingPartial {
231221
}
232222
};
233223

234-
// copy any hash values, and find latest sort key
235-
self.hashes.resize(total_num_groups, 0);
236-
let iter = group_indices
237-
.iter()
238-
.zip(batch_hashes.iter())
239-
.zip(sort_keys.iter());
240-
241-
for ((&group_index, &hash), group_sort_key) in iter {
242-
self.hashes[group_index] = hash;
243-
224+
// Find latest sort key
225+
let iter = group_indices.iter().zip(sort_keys.iter());
226+
for (&group_index, group_sort_key) in iter {
244227
// Has this group seen a new sort_key?
245228
if sort_key != group_sort_key {
246229
current_sort = group_index;
@@ -262,6 +245,5 @@ impl GroupOrderingPartial {
262245
std::mem::size_of::<Self>()
263246
+ self.order_indices.allocated_size()
264247
+ self.row_converter.size()
265-
+ self.hashes.allocated_size()
266248
}
267249
}

datafusion/core/src/physical_plan/aggregates/row_hash.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,6 @@ impl GroupedHashAggregateStream {
485485
self.group_ordering.new_groups(
486486
group_values,
487487
group_indices,
488-
batch_hashes,
489488
total_num_groups,
490489
)?;
491490
}
@@ -624,15 +623,18 @@ impl GroupedHashAggregateStream {
624623
}
625624
std::mem::swap(&mut new_group_values, &mut self.group_values);
626625

627-
// rebuild hash table (maybe we should remove the
628-
// entries for each group that was emitted rather than
629-
// rebuilding the whole thing
630-
631-
let hashes = self.group_ordering.remove_groups(n);
632-
assert_eq!(hashes.len(), self.group_values.num_rows());
633-
self.map.clear();
634-
for (idx, &hash) in hashes.iter().enumerate() {
635-
self.map.insert(hash, (hash, idx), |(hash, _)| *hash);
626+
self.group_ordering.remove_groups(n);
627+
// SAFETY: self.map outlives iterator and is not modified concurrently
628+
unsafe {
629+
for bucket in self.map.iter() {
630+
// Decrement group index by n
631+
match bucket.as_ref().1.checked_sub(n) {
632+
// Group index was >= n, shift value down
633+
Some(sub) => bucket.as_mut().1 = sub,
634+
// Group index was < n, so remove from table
635+
None => self.map.erase(bucket),
636+
}
637+
}
636638
}
637639
}
638640
};

0 commit comments

Comments
 (0)