Skip to content

Add tokio runtime memory tracker #2356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(thread_local)]

#[cfg(test)]
mod runtime_test;

Expand All @@ -24,9 +26,11 @@ mod stoppable_test;
mod profiling;
mod progress;
mod runtime;
mod runtime_tracker;
mod shutdown_signal;
mod stop_handle;
mod stoppable;
mod thread;
mod uniq_id;

pub use profiling::Profiling;
Expand All @@ -37,11 +41,14 @@ pub use runtime::BlockingWait;
pub use runtime::Dropper;
pub use runtime::Runtime;
pub use runtime::TrySpawn;
pub use runtime_tracker::RuntimeTracker;
pub use runtime_tracker::ThreadTracker;
pub use shutdown_signal::signal_stream;
pub use shutdown_signal::SignalStream;
pub use shutdown_signal::SignalType;
pub use stop_handle::StopHandle;
pub use stoppable::Stoppable;
pub use thread::Thread;
pub use tokio;
pub use uniq_id::GlobalSequence;
pub use uniq_id::GlobalUniqName;
Expand Down
33 changes: 26 additions & 7 deletions common/base/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

use crate::runtime_tracker::RuntimeTracker;

/// Methods to spawn tasks.
pub trait TrySpawn {
/// Tries to spawn a new asynchronous task, returning a tokio::JoinHandle for it.
Expand Down Expand Up @@ -143,12 +145,14 @@ where
pub struct Runtime {
// Handle to runtime.
handle: Handle,
// Runtime tracker
tracker: Arc<RuntimeTracker>,
// Use to receive a drop signal when dropper is dropped.
_dropper: Dropper,
}

impl Runtime {
fn create(builder: &mut tokio::runtime::Builder) -> Result<Self> {
fn create(tracker: Arc<RuntimeTracker>, builder: &mut tokio::runtime::Builder) -> Result<Self> {
let runtime = builder
.build()
.map_err(|tokio_error| ErrorCode::TokioError(format!("{}", tokio_error)))?;
Expand All @@ -162,25 +166,40 @@ impl Runtime {

Ok(Runtime {
handle,
tracker,
_dropper: Dropper {
close: Some(send_stop),
},
})
}

fn tracker_builder(rt_tracker: Arc<RuntimeTracker>) -> tokio::runtime::Builder {
let mut builder = tokio::runtime::Builder::new_multi_thread();
builder
.enable_all()
.on_thread_stop(rt_tracker.on_stop_thread())
.on_thread_start(rt_tracker.on_start_thread());

builder
}

pub fn get_tracker(&self) -> Arc<RuntimeTracker> {
self.tracker.clone()
}

/// Spawns a new tokio runtime with a default thread count on a background
/// thread and returns a `Handle` which can be used to spawn tasks via
/// its executor.
pub fn with_default_worker_threads() -> Result<Self> {
let mut runtime = tokio::runtime::Builder::new_multi_thread();
let builder = runtime.enable_all();
Self::create(builder)
let tracker = RuntimeTracker::create();
let mut runtime_builder = Self::tracker_builder(tracker.clone());
Self::create(tracker, &mut runtime_builder)
}

pub fn with_worker_threads(workers: usize) -> Result<Self> {
let mut runtime = tokio::runtime::Builder::new_multi_thread();
let builder = runtime.enable_all().worker_threads(workers);
Self::create(builder)
let tracker = RuntimeTracker::create();
let mut runtime_builder = Self::tracker_builder(tracker.clone());
Self::create(tracker, runtime_builder.worker_threads(workers))
}
}

Expand Down
139 changes: 139 additions & 0 deletions common/base/src/runtime_tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2020 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::alloc::Layout;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;

#[thread_local]
static mut TRACKER: *const ThreadTracker = std::ptr::null();

pub struct ThreadTracker {
rt_tracker: Arc<RuntimeTracker>,
}

impl ThreadTracker {
pub fn create(rt_tracker: Arc<RuntimeTracker>) -> *mut ThreadTracker {
Box::into_raw(Box::new(ThreadTracker { rt_tracker }))
}

#[inline]
pub fn current() -> *const ThreadTracker {
unsafe { TRACKER }
}

#[inline]
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn set_current(value: *const ThreadTracker) {
unsafe { TRACKER = value }
}

#[inline]
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn get_memory_tracker(pointer: *const Self) -> Arc<MemoryTracker> {
debug_assert!(!pointer.is_null());
unsafe { (*pointer).rt_tracker.get_memory_tracker() }
}
}

pub struct MemoryTracker {
memory_usage: AtomicUsize,
parent_memory_tracker: Option<Arc<MemoryTracker>>,
}

impl MemoryTracker {
pub fn create(parent_memory_tracker: Option<Arc<MemoryTracker>>) -> Arc<MemoryTracker> {
Arc::new(MemoryTracker {
parent_memory_tracker,
memory_usage: AtomicUsize::new(0),
})
}

#[inline]
pub fn alloc_memory(&self, size: usize) {
self.memory_usage.fetch_add(size, Ordering::Relaxed);

if let Some(parent_memory_tracker) = &self.parent_memory_tracker {
parent_memory_tracker.alloc_memory(size);
}
}

#[inline]
pub fn dealloc_memory(&self, size: usize) {
self.memory_usage.fetch_sub(size, Ordering::Relaxed);

if let Some(parent_memory_tracker) = &self.parent_memory_tracker {
parent_memory_tracker.dealloc_memory(size);
}
}

#[inline]
pub fn realloc_memory(&self, old_size: usize, new_size: usize) {
self.memory_usage.fetch_sub(old_size, Ordering::Relaxed);
self.memory_usage.fetch_add(new_size, Ordering::Relaxed);

if let Some(parent_memory_tracker) = &self.parent_memory_tracker {
parent_memory_tracker.realloc_memory(old_size, new_size);
}
}

pub fn current() -> Option<Arc<MemoryTracker>> {
let thread_trckcer = ThreadTracker::current();
match thread_trckcer.is_null() {
true => None,
false => Some(ThreadTracker::get_memory_tracker(thread_trckcer)),
}
}

pub fn get_memory_usage(&self) -> usize {
self.memory_usage.load(Ordering::Relaxed)
}
}

pub struct RuntimeTracker {
memory_tracker: Arc<MemoryTracker>,
}

impl RuntimeTracker {
pub fn create() -> Arc<RuntimeTracker> {
let parent_memory_tracker = MemoryTracker::current();
Arc::new(RuntimeTracker {
memory_tracker: MemoryTracker::create(parent_memory_tracker),
})
}

pub fn get_memory_tracker(&self) -> Arc<MemoryTracker> {
self.memory_tracker.clone()
}

pub fn on_stop_thread(self: &Arc<Self>) -> impl Fn() {
move || unsafe {
let tracker = std::mem::replace(&mut TRACKER, std::ptr::null());

std::ptr::drop_in_place(tracker as usize as *mut ThreadTracker);
std::alloc::dealloc(tracker as *mut u8, Layout::new::<ThreadTracker>());
}
}

pub fn on_start_thread(self: &Arc<Self>) -> impl Fn() {
// TODO: log::info("thread {}-{} started", thread_id, thread_name);
let rt_tracker = self.clone();

move || {
let thread_tracker = ThreadTracker::create(rt_tracker.clone());
ThreadTracker::set_current(thread_tracker);
}
}
}
2 changes: 1 addition & 1 deletion common/base/src/shutdown_signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub enum SignalType {
Sigterm,
}

pub type SignalStream = Pin<Box<dyn Stream<Item = SignalType>>>;
pub type SignalStream = Pin<Box<dyn Stream<Item = SignalType> + Send>>;

#[cfg(not(target_os = "windows"))]
pub fn signal_stream() -> Result<SignalStream> {
Expand Down
40 changes: 40 additions & 0 deletions common/base/src/thread.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2020 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::thread::JoinHandle;

use crate::runtime_tracker::ThreadTracker;

pub struct Thread;

impl Thread {
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
let outer_tracker = ThreadTracker::current() as usize;
std::thread::spawn(move || {
let outer_tracker = outer_tracker as *const ThreadTracker;

if !outer_tracker.is_null() {
// We use the same tracker for std::thread
ThreadTracker::set_current(outer_tracker);
}

f()
})
}
}
1 change: 1 addition & 0 deletions common/mem/mem-allocator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
common-base = {path = "../../base" }
tikv-jemalloc-sys = "0.4.2"
common-infallible = { path = "../../infallible" }
parking_lot = "0.11"
Expand Down
25 changes: 25 additions & 0 deletions common/mem/mem-allocator/src/allocators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod platform {
use std::os::raw::c_int;
use std::os::raw::c_void;

use common_base::ThreadTracker;
use tikv_jemalloc_sys as ffi;

use crate::malloc_size::VoidPtrToSizeFn;
Expand Down Expand Up @@ -94,12 +95,24 @@ mod platform {
unsafe impl GlobalAlloc for Allocator {
#[inline]
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let thread_tracker = ThreadTracker::current();
if !thread_tracker.is_null() {
let memory_tracker = ThreadTracker::get_memory_tracker(thread_tracker);
memory_tracker.alloc_memory(layout.size());
}

let flags = layout_to_flags(layout.align(), layout.size());
ffi::mallocx(layout.size(), flags) as *mut u8
}

#[inline]
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
let thread_tracker = ThreadTracker::current();
if !thread_tracker.is_null() {
let memory_tracker = ThreadTracker::get_memory_tracker(thread_tracker);
memory_tracker.alloc_memory(layout.size());
}

if layout.align() <= MIN_ALIGN && layout.align() <= layout.size() {
ffi::calloc(1, layout.size()) as *mut u8
} else {
Expand All @@ -110,12 +123,24 @@ mod platform {

#[inline]
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
let thread_tracker = ThreadTracker::current();
if !thread_tracker.is_null() {
let memory_tracker = ThreadTracker::get_memory_tracker(thread_tracker);
memory_tracker.dealloc_memory(layout.size());
}

let flags = layout_to_flags(layout.align(), layout.size());
ffi::sdallocx(ptr as *mut _, layout.size(), flags)
}

#[inline]
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
let thread_tracker = ThreadTracker::current();
if !thread_tracker.is_null() {
let memory_tracker = ThreadTracker::get_memory_tracker(thread_tracker);
memory_tracker.realloc_memory(layout.size(), new_size);
}

let flags = layout_to_flags(layout.align(), new_size);
ffi::rallocx(ptr as *mut _, new_size, flags) as *mut u8
}
Expand Down
1 change: 1 addition & 0 deletions common/mem/mem-allocator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod allocators;
mod malloc_size;
// mod sizeof;

pub use allocators::Allocator;
pub use allocators::MallocSizeOfExt;
pub use malloc_size::MallocShallowSizeOf;
pub use malloc_size::MallocSizeOf;
Expand Down
Loading