Skip to content

Commit 263222e

Browse files
authored
feat: Add unique id for every memory consumer (#15613)
* feat: Add unique id for every memory consumer * imports * feat: Make MemoryConsumer non-clonable, update tracked pool to use ids * Add hash impl and fmt * use a single cfg instead wrapping the if * Address review, add documentation
1 parent 6380556 commit 263222e

File tree

2 files changed

+153
-69
lines changed

2 files changed

+153
-69
lines changed

datafusion/execution/src/memory_pool/mod.rs

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
//! help with allocation accounting.
2020
2121
use datafusion_common::{internal_err, Result};
22-
use std::{cmp::Ordering, sync::Arc};
22+
use std::hash::{Hash, Hasher};
23+
use std::{cmp::Ordering, sync::atomic, sync::Arc};
2324

2425
mod pool;
2526
pub mod proxy {
@@ -146,24 +147,76 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
146147
/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
147148
/// a particular `MemoryConsumer`;
148149
///
150+
/// Each `MemoryConsumer` is identifiable by a process-unique id, and is therefor not cloneable,
151+
/// If you want a clone of a `MemoryConsumer`, you should look into [`MemoryConsumer::clone_with_new_id`],
152+
/// but note that this `MemoryConsumer` may be treated as a separate entity based on the used pool,
153+
/// and is only guaranteed to share the name and inner properties.
154+
///
149155
/// For help with allocation accounting, see the [`proxy`] module.
150156
///
151157
/// [proxy]: datafusion_common::utils::proxy
152-
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
158+
#[derive(Debug)]
153159
pub struct MemoryConsumer {
154160
name: String,
155161
can_spill: bool,
162+
id: usize,
163+
}
164+
165+
impl PartialEq for MemoryConsumer {
166+
fn eq(&self, other: &Self) -> bool {
167+
let is_same_id = self.id == other.id;
168+
169+
#[cfg(debug_assertions)]
170+
if is_same_id {
171+
assert_eq!(self.name, other.name);
172+
assert_eq!(self.can_spill, other.can_spill);
173+
}
174+
175+
is_same_id
176+
}
177+
}
178+
179+
impl Eq for MemoryConsumer {}
180+
181+
impl Hash for MemoryConsumer {
182+
fn hash<H: Hasher>(&self, state: &mut H) {
183+
self.id.hash(state);
184+
self.name.hash(state);
185+
self.can_spill.hash(state);
186+
}
156187
}
157188

158189
impl MemoryConsumer {
190+
fn new_unique_id() -> usize {
191+
static ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
192+
ID.fetch_add(1, atomic::Ordering::Relaxed)
193+
}
194+
159195
/// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
160196
pub fn new(name: impl Into<String>) -> Self {
161197
Self {
162198
name: name.into(),
163199
can_spill: false,
200+
id: Self::new_unique_id(),
201+
}
202+
}
203+
204+
/// Returns a clone of this [`MemoryConsumer`] with a new unique id,
205+
/// which can be registered with a [`MemoryPool`],
206+
/// This new consumer is separate from the original.
207+
pub fn clone_with_new_id(&self) -> Self {
208+
Self {
209+
name: self.name.clone(),
210+
can_spill: self.can_spill,
211+
id: Self::new_unique_id(),
164212
}
165213
}
166214

215+
/// Return the unique id of this [`MemoryConsumer`]
216+
pub fn id(&self) -> usize {
217+
self.id
218+
}
219+
167220
/// Set whether this allocation can be spilled to disk
168221
pub fn with_can_spill(self, can_spill: bool) -> Self {
169222
Self { can_spill, ..self }
@@ -349,7 +402,7 @@ pub mod units {
349402
pub const KB: u64 = 1 << 10;
350403
}
351404

352-
/// Present size in human readable form
405+
/// Present size in human-readable form
353406
pub fn human_readable_size(size: usize) -> String {
354407
use units::*;
355408

@@ -374,6 +427,15 @@ pub fn human_readable_size(size: usize) -> String {
374427
mod tests {
375428
use super::*;
376429

430+
#[test]
431+
fn test_id_uniqueness() {
432+
let mut ids = std::collections::HashSet::new();
433+
for _ in 0..100 {
434+
let consumer = MemoryConsumer::new("test");
435+
assert!(ids.insert(consumer.id())); // Ensures unique insertion
436+
}
437+
}
438+
377439
#[test]
378440
fn test_memory_pool_underflow() {
379441
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;

0 commit comments

Comments
 (0)