Skip to content

Commit 5a60eb3

Browse files
committed
Initial commit
0 parents  commit 5a60eb3

File tree

5 files changed

+157
-0
lines changed

5 files changed

+157
-0
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/target
2+
Cargo.lock

Cargo.toml

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
[package]
2+
name = "tokio-metrics"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
futures-util = "0.3.19"
10+
pin-project-lite = "0.2.7"

src/lib.rs

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod runtime;
2+
pub use runtime::Runtime;
3+
4+
mod task;
5+
pub use task::InstrumentedTask;

src/runtime.rs

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub struct Runtime {
2+
3+
}

src/task.rs

+137
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
use futures_util::task::{AtomicWaker, ArcWake};
2+
use pin_project_lite::pin_project;
3+
use std::future::Future;
4+
use std::pin::Pin;
5+
use std::sync::Arc;
6+
use std::sync::atomic::{AtomicU64, Ordering::{Relaxed, SeqCst}};
7+
use std::task::{Context, Poll};
8+
use std::time::{Instant, Duration};
9+
10+
pin_project! {
11+
pub struct InstrumentedTask<T> {
12+
#[pin]
13+
future: T,
14+
state: Arc<State>,
15+
}
16+
}
17+
18+
struct State {
19+
/// Instant at which the `InstrumentedTask` is created. This instant is used
20+
/// as the reference point for duration measurements.
21+
created_at: Instant,
22+
23+
/// The instant, tracked as duration since `created_at`, at which the future
24+
/// was last woken. Tracked as microseconds.
25+
woke_at: AtomicU64,
26+
27+
/// Total number of times the task was scheduled.
28+
num_scheduled: AtomicU64,
29+
30+
/// Total amount of time the task has spent in the waking state.
31+
total_scheduled: AtomicU64,
32+
33+
waker: AtomicWaker,
34+
}
35+
36+
impl<T: Future> InstrumentedTask<T> {
37+
pub fn new(future: T) -> InstrumentedTask<T> {
38+
let state = Arc::new(State {
39+
created_at: Instant::now(),
40+
woke_at: AtomicU64::new(0),
41+
num_scheduled: AtomicU64::new(0),
42+
total_scheduled: AtomicU64::new(0),
43+
waker: AtomicWaker::new(),
44+
});
45+
46+
// HAX
47+
let s = state.clone();
48+
std::thread::spawn(move || {
49+
let mut last_num_scheduled = 0;
50+
let mut last_total_scheduled = 0;
51+
loop {
52+
std::thread::sleep(Duration::from_secs(1));
53+
54+
let num_scheduled = s.num_scheduled.load(Relaxed);
55+
let total_scheduled = s.total_scheduled.load(Relaxed);
56+
57+
println!("num_scheduled = {}; total_scheduled = {:?}", num_scheduled - last_num_scheduled, Duration::from_micros(total_scheduled - last_total_scheduled));
58+
59+
last_num_scheduled = num_scheduled;
60+
last_total_scheduled = total_scheduled;
61+
}
62+
});
63+
64+
InstrumentedTask {
65+
future,
66+
state,
67+
}
68+
}
69+
}
70+
71+
impl<T: Future> Future for InstrumentedTask<T> {
72+
type Output = T::Output;
73+
74+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75+
let this = self.project();
76+
77+
this.state.measure_poll();
78+
79+
// Register the waker
80+
this.state.waker.register(cx.waker());
81+
82+
// Get the instrumented waker
83+
let waker_ref = futures_util::task::waker_ref(&this.state);
84+
let mut cx = Context::from_waker(&*waker_ref);
85+
86+
// Store the waker
87+
Future::poll(this.future, &mut cx)
88+
}
89+
}
90+
91+
impl State {
92+
fn measure_wake(&self) {
93+
let woke_at: u64 = match self.created_at.elapsed().as_micros().try_into() {
94+
Ok(woke_at) => woke_at,
95+
// This is highly unlikely as it would mean the task ran for over
96+
// 500,000 years. If you ran your service for 500,000 years. If you
97+
// are reading this 500,000 years in the future, I'm sorry.
98+
Err(_) => return,
99+
};
100+
101+
// We don't actually care about the result
102+
let _ = self.woke_at.compare_exchange(0, woke_at, SeqCst, SeqCst);
103+
}
104+
105+
fn measure_poll(&self) {
106+
let woke_at = self.woke_at.swap(0, SeqCst);
107+
108+
if woke_at == 0 {
109+
// Either this is the first poll or it is a false-positive (polled
110+
// without scheduled).
111+
return;
112+
}
113+
114+
let scheduled_dur = (self.created_at + Duration::from_micros(woke_at)).elapsed();
115+
let scheduled_dur: u64 = match scheduled_dur.as_micros().try_into() {
116+
Ok(scheduled_dur) => scheduled_dur,
117+
Err(_) => return,
118+
};
119+
let total_scheduled = self.total_scheduled.load(Relaxed) + scheduled_dur;
120+
self.total_scheduled.store(total_scheduled, Relaxed);
121+
122+
let num_scheduled = self.num_scheduled.load(Relaxed) + 1;
123+
self.num_scheduled.store(num_scheduled, Relaxed);
124+
}
125+
}
126+
127+
impl ArcWake for State {
128+
fn wake_by_ref(arc_self: &Arc<State>) {
129+
arc_self.measure_wake();
130+
arc_self.waker.wake();
131+
}
132+
133+
fn wake(self: Arc<State>) {
134+
self.measure_wake();
135+
self.waker.wake();
136+
}
137+
}

0 commit comments

Comments
 (0)