Skip to content

Make it easy to add more stat types to work packet statistics #324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 28, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/scheduler/mod.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ pub(crate) use scheduler::MMTkScheduler;
pub(self) use scheduler::Scheduler;

mod stat;
mod work_counter;

mod work;
pub use work::CoordinatorWork;
198 changes: 106 additions & 92 deletions src/scheduler/stat.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
//! Statistics for work packets
use super::work_counter::{WorkCounter, WorkCounterBase, WorkDuration};
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime};

/// Merge and print the work-packet level statistics from all worker threads
#[derive(Default)]
pub struct SchedulerStat {
/// Map work packet type IDs to work packet names
work_id_name_map: HashMap<TypeId, &'static str>,
/// Count the number of work packets executed for different types
work_counts: HashMap<TypeId, usize>,
work_durations: HashMap<TypeId, Vec<Duration>>,
/// Collect work counters from work threads.
/// Two dimensional vectors are used, e.g.
/// `[[foo_0, ..., foo_n], ..., [bar_0, ..., bar_n]]`.
/// The first dimension is for different types of work counters,
/// (`foo` and `bar` in the above example).
/// The second dimension if for work counters of the same type but from
/// different threads (`foo_0` and `bar_0` are from the same thread).
/// The order of insertion is determined by when [`SchedulerStat::merge`] is
/// called for each [`WorkerLocalStat`].
/// We assume different threads have the same set of work counters
/// (in the same order).
work_counters: HashMap<TypeId, Vec<Vec<Box<dyn WorkCounter>>>>,
}

impl SchedulerStat {
@@ -22,37 +37,7 @@ impl SchedulerStat {
}
}

fn geomean(&self, values: &[f64]) -> f64 {
// Geomean(xs, N=xs.len()) = (PI(xs))^(1/N) = e^{log{PI(xs)^(1/N)}} = e^{ (1/N) * sum_{x \in xs}{ log(x) } }
let logs = values.iter().map(|v| v.ln());
let sum_logs = logs.sum::<f64>();
(sum_logs / values.len() as f64).exp()
}

fn min(&self, values: &[f64]) -> f64 {
let mut min = values[0];
for v in values {
if *v < min {
min = *v
}
}
min
}

fn max(&self, values: &[f64]) -> f64 {
let mut max = values[0];
for v in values {
if *v > max {
max = *v
}
}
max
}

fn sum(&self, values: &[f64]) -> f64 {
values.iter().sum()
}

/// Used during statistics printing at [`crate::memory_manager::harness_end`]
pub fn harness_stat(&self) -> HashMap<String, String> {
let mut stat = HashMap::new();
// Work counts
@@ -67,109 +52,124 @@ impl SchedulerStat {
}
stat.insert("total-work.count".to_owned(), format!("{}", total_count));
// Work execution times
let mut total_durations = vec![];
for (t, durations) in &self.work_durations {
for d in durations {
total_durations.push(*d);
}
let mut duration_overall: WorkCounterBase = Default::default();
for (t, vs) in &self.work_counters {
// Name of the work packet type
let n = self.work_id_name_map[t];
let geomean = self.geomean(
&durations
.iter()
.map(|d| d.as_nanos() as f64)
.collect::<Vec<_>>(),
);
stat.insert(
format!("work.{}.time.geomean", self.work_name(n)),
format!("{:.2}", geomean),
);
let sum = self.sum(
&durations
// Iterate through different types of work counters
for v in vs.iter() {
// Aggregate work counters of the same type but from different
// worker threads
let fold = v
.iter()
.map(|d| d.as_nanos() as f64)
.collect::<Vec<_>>(),
);
stat.insert(
format!("work.{}.time.sum", self.work_name(n)),
format!("{:.2}", sum),
);
}
let durations = total_durations
.iter()
.map(|d| d.as_nanos() as f64)
.collect::<Vec<_>>();
if !durations.is_empty() {
stat.insert(
"total-work.time.geomean".to_owned(),
format!("{:.2}", self.geomean(&durations)),
);
stat.insert(
"total-work.time.min".to_owned(),
format!("{:.2}", self.min(&durations)),
);
stat.insert(
"total-work.time.max".to_owned(),
format!("{:.2}", self.max(&durations)),
);
.fold(Default::default(), |acc: WorkCounterBase, x| {
acc.merge(x.get_base())
});
// Update the overall execution time
duration_overall.merge_inplace(&fold);
let name = v.first().unwrap().name();
stat.insert(
format!("work.{}.{}.total", self.work_name(n), name),
format!("{:.2}", fold.total),
);
stat.insert(
format!("work.{}.{}.min", self.work_name(n), name),
format!("{:.2}", fold.min),
);
stat.insert(
format!("work.{}.{}.max", self.work_name(n), name),
format!("{:.2}", fold.max),
);
}
}
// Print out overall execution time
stat.insert(
"total-work.time.total".to_owned(),
format!("{:.2}", duration_overall.total),
);
stat.insert(
"total-work.time.min".to_owned(),
format!("{:.2}", duration_overall.min),
);
stat.insert(
"total-work.time.max".to_owned(),
format!("{:.2}", duration_overall.max),
);

stat
}

/// Merge work counters from different worker threads
pub fn merge(&mut self, stat: &WorkerLocalStat) {
// Merge work packet type ID to work packet name mapping
for (id, name) in &stat.work_id_name_map {
self.work_id_name_map.insert(*id, *name);
}
// Merge work count for different work packet types
for (id, count) in &stat.work_counts {
if self.work_counts.contains_key(id) {
*self.work_counts.get_mut(id).unwrap() += *count;
} else {
self.work_counts.insert(*id, *count);
}
}
for (id, durations) in &stat.work_durations {
if self.work_durations.contains_key(id) {
let work_durations = self.work_durations.get_mut(id).unwrap();
for d in durations {
work_durations.push(*d);
}
} else {
self.work_durations.insert(*id, durations.clone());
// Merge work counter for different work packet types
for (id, counters) in &stat.work_counters {
// Initialize the two dimensional vector
// [
// [], // foo counter
// [], // bar counter
// ]
let vs = self
.work_counters
.entry(*id)
.or_insert_with(|| vec![vec![]; counters.len()]);
// [
// [counters[0] of type foo],
// [counters[1] of type bar]
// ]
for (v, c) in vs.iter_mut().zip(counters.iter()) {
v.push(c.clone());
}
}
}
}

/// Describing a single work packet
pub struct WorkStat {
type_id: TypeId,
type_name: &'static str,
start_time: SystemTime,
}

impl WorkStat {
/// Stop all work counters for the work packet type of the just executed
/// work packet
#[inline(always)]
pub fn end_of_work(&self, worker_stat: &mut WorkerLocalStat) {
if !worker_stat.is_enabled() {
return;
};
// Insert type ID, name pair
worker_stat
.work_id_name_map
.insert(self.type_id, self.type_name);
// Increment work count
*worker_stat.work_counts.entry(self.type_id).or_insert(0) += 1;
let duration = self.start_time.elapsed().unwrap();
// Stop counters
worker_stat
.work_durations
.work_counters
.entry(self.type_id)
.or_insert_with(Vec::new)
.push(duration);
.and_modify(|v| {
v.iter_mut().for_each(|c| c.stop());
});
}
}

/// Worker thread local counterpart of [`SchedulerStat`]
#[derive(Default)]
pub struct WorkerLocalStat {
work_id_name_map: HashMap<TypeId, &'static str>,
work_counts: HashMap<TypeId, usize>,
work_durations: HashMap<TypeId, Vec<Duration>>,
work_counters: HashMap<TypeId, Vec<Box<dyn WorkCounter>>>,
enabled: AtomicBool,
}

@@ -182,12 +182,26 @@ impl WorkerLocalStat {
pub fn enable(&self) {
self.enabled.store(true, Ordering::SeqCst);
}
/// Measure the execution of a work packet by starting all counters for that
/// type
#[inline]
pub fn measure_work(&mut self, work_id: TypeId, work_name: &'static str) -> WorkStat {
WorkStat {
let stat = WorkStat {
type_id: work_id,
type_name: work_name,
start_time: SystemTime::now(),
};
if self.is_enabled() {
self.work_counters
.entry(work_id)
.or_insert_with(WorkerLocalStat::counter_set)
.iter_mut()
.for_each(|c| c.start());
}
stat
}

// The set of work counters for all work packet types
fn counter_set() -> Vec<Box<dyn WorkCounter>> {
vec![Box::new(WorkDuration::new())]
}
}
135 changes: 135 additions & 0 deletions src/scheduler/work_counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
//! Counter for work packets
//!
//! Provides an abstraction and implementations of counters for collecting
//! work-packet level statistics
//!
//! See [`crate::util::statistics`] for collecting statistics over a GC cycle
use std::time::SystemTime;

/// Common struct for different work counters
///
/// Stores the total, min and max of counter readings
#[derive(Copy, Clone, Debug)]
pub(super) struct WorkCounterBase {
pub(super) total: f64,
pub(super) min: f64,
pub(super) max: f64,
}

/// Make [`WorkCounter`] trait objects cloneable
pub(super) trait WorkCounterClone {
/// Clone the object
fn clone_box(&self) -> Box<dyn WorkCounter>;
}

impl<T: 'static + WorkCounter + Clone> WorkCounterClone for T {
fn clone_box(&self) -> Box<dyn WorkCounter> {
Box::new(self.clone())
}
}

/// An abstraction of work counters
///
/// Use for trait objects, as we have might have types of work counters for
/// the same work packet and the types are not statically known.
/// The overhead should be negligible compared with the cost of executing
/// a work packet.
pub(super) trait WorkCounter: WorkCounterClone + std::fmt::Debug {
// TODO: consolidate with crate::util::statistics::counter::Counter;
/// Start the counter
fn start(&mut self);
/// Stop the counter
fn stop(&mut self);
/// Name of counter
fn name(&self) -> String;
/// Return a reference to [`WorkCounterBase`]
fn get_base(&self) -> &WorkCounterBase;
/// Return a mutatable reference to [`WorkCounterBase`]
fn get_base_mut(&mut self) -> &mut WorkCounterBase;
}

impl Clone for Box<dyn WorkCounter> {
fn clone(&self) -> Box<dyn WorkCounter> {
self.clone_box()
}
}

impl Default for WorkCounterBase {
fn default() -> Self {
WorkCounterBase {
total: 0.0,
min: f64::INFINITY,
max: f64::NEG_INFINITY,
}
}
}

impl WorkCounterBase {
/// Merge two [`WorkCounterBase`], keep the semantics of the fields,
/// and return a new object
pub(super) fn merge(&self, other: &Self) -> Self {
let min = self.min.min(other.min);
let max = self.max.max(other.max);
let total = self.total + other.total;
WorkCounterBase { total, min, max }
}

/// Merge two [`WorkCounterBase`], modify the current object in place,
/// and keep the semantics of the fields
pub(super) fn merge_inplace(&mut self, other: &Self) {
self.min = self.min.min(other.min);
self.max = self.max.max(other.max);
self.total += other.total;
}

/// Update the object based on a single value
pub(super) fn merge_val(&mut self, val: f64) {
self.min = self.min.min(val);
self.max = self.max.max(val);
self.total += val;
}
}

/// Measure the durations of work packets
///
/// Timing is based on [`SystemTime`]
#[derive(Copy, Clone, Debug)]
pub(super) struct WorkDuration {
base: WorkCounterBase,
start_value: Option<SystemTime>,
running: bool,
}

impl WorkDuration {
pub(super) fn new() -> Self {
WorkDuration {
base: Default::default(),
start_value: None,
running: false,
}
}
}

impl WorkCounter for WorkDuration {
fn start(&mut self) {
self.start_value = Some(SystemTime::now());
self.running = true;
}

fn stop(&mut self) {
let duration = self.start_value.unwrap().elapsed().unwrap().as_nanos() as f64;
self.base.merge_val(duration);
}

fn name(&self) -> String {
"time".to_owned()
}

fn get_base(&self) -> &WorkCounterBase {
&self.base
}

fn get_base_mut(&mut self) -> &mut WorkCounterBase {
&mut self.base
}
}