Skip to content

Commit f135737

Browse files
committed
fmt: Add support for runtime filter reloading (#45)
This branch adds support for runtime filter reloading to `tokio-trace-fmt`. Filters may be reloaded through a `Handle` type that can be constructed when the subscriber or filter are constructed. In the future, when PR #44 merges, it will also be possible to add a free function `record_current` that downcasts and reloads the current subscriber. Currently, when using a reloadable filter, `Interest` is never cached. When tokio-rs/tokio#1039 is merged, we can invalidate the callsite cache when reloading, allowing us to cache interests. Closes #42 Signed-off-by: Eliza Weisman <[email protected]>
1 parent c993cca commit f135737

File tree

4 files changed

+244
-26
lines changed

4 files changed

+244
-26
lines changed

tokio-trace-fmt/src/filter.rs renamed to tokio-trace-fmt/src/filter/env.rs

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,11 @@
1-
use span;
2-
31
use regex::Regex;
42
use tokio_trace_core::{subscriber::Interest, Level, Metadata};
3+
use {filter::Filter, span::Context};
54

65
use std::env;
76

87
pub const DEFAULT_FILTER_ENV: &'static str = "RUST_LOG";
98

10-
pub trait Filter<N> {
11-
fn callsite_enabled(&self, metadata: &Metadata, ctx: &span::Context<N>) -> Interest {
12-
if self.enabled(metadata, ctx) {
13-
Interest::always()
14-
} else {
15-
Interest::never()
16-
}
17-
}
18-
19-
fn enabled(&self, metadata: &Metadata, ctx: &span::Context<N>) -> bool;
20-
}
21-
229
#[derive(Debug)]
2310
pub struct EnvFilter {
2411
directives: Vec<Directive>,
@@ -105,7 +92,7 @@ where
10592
}
10693

10794
impl<N> Filter<N> for EnvFilter {
108-
fn callsite_enabled(&self, metadata: &Metadata, _: &span::Context<N>) -> Interest {
95+
fn callsite_enabled(&self, metadata: &Metadata, _: &Context<N>) -> Interest {
10996
if !self.includes_span_directive && metadata.level() > &self.max_level {
11097
return Interest::never();
11198
}
@@ -134,7 +121,7 @@ impl<N> Filter<N> for EnvFilter {
134121
interest
135122
}
136123

137-
fn enabled<'a>(&self, metadata: &Metadata, ctx: &span::Context<'a, N>) -> bool {
124+
fn enabled<'a>(&self, metadata: &Metadata, ctx: &Context<'a, N>) -> bool {
138125
for directive in self.directives_for(metadata) {
139126
let accepts_level = metadata.level() <= &directive.level;
140127
match directive.in_span.as_ref() {
@@ -319,16 +306,6 @@ impl Default for Directive {
319306
}
320307
}
321308

322-
impl<'a, F, N> Filter<N> for F
323-
where
324-
F: Fn(&Metadata, &span::Context<N>) -> bool,
325-
N: ::NewVisitor<'a>,
326-
{
327-
fn enabled(&self, metadata: &Metadata, ctx: &span::Context<N>) -> bool {
328-
(self)(metadata, ctx)
329-
}
330-
}
331-
332309
#[cfg(test)]
333310
mod tests {
334311
use super::*;

tokio-trace-fmt/src/filter/mod.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use span;
2+
3+
use tokio_trace_core::{subscriber::Interest, Metadata};
4+
5+
pub trait Filter<N> {
6+
fn callsite_enabled(&self, metadata: &Metadata, ctx: &span::Context<N>) -> Interest {
7+
if self.enabled(metadata, ctx) {
8+
Interest::always()
9+
} else {
10+
Interest::never()
11+
}
12+
}
13+
14+
fn enabled(&self, metadata: &Metadata, ctx: &span::Context<N>) -> bool;
15+
}
16+
17+
pub mod env;
18+
pub mod reload;
19+
20+
pub use self::{env::EnvFilter, reload::ReloadFilter};
21+
22+
impl<'a, F, N> Filter<N> for F
23+
where
24+
F: Fn(&Metadata, &span::Context<N>) -> bool,
25+
N: ::NewVisitor<'a>,
26+
{
27+
fn enabled(&self, metadata: &Metadata, ctx: &span::Context<N>) -> bool {
28+
(self)(metadata, ctx)
29+
}
30+
}

tokio-trace-fmt/src/filter/reload.rs

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
use parking_lot::RwLock;
2+
use std::{
3+
error, fmt,
4+
sync::{Arc, Weak},
5+
};
6+
use tokio_trace_core::{subscriber::Interest, Metadata};
7+
use {filter::Filter, span::Context};
8+
9+
#[derive(Debug)]
10+
pub struct ReloadFilter<F> {
11+
inner: Arc<RwLock<F>>,
12+
}
13+
14+
#[derive(Debug, Clone)]
15+
pub struct Handle<F> {
16+
inner: Weak<RwLock<F>>,
17+
}
18+
19+
#[derive(Debug)]
20+
pub struct Error {
21+
kind: ErrorKind,
22+
}
23+
24+
#[derive(Debug)]
25+
enum ErrorKind {
26+
SubscriberGone,
27+
}
28+
29+
// ===== impl ReloadFilter =====
30+
31+
impl<F, N> Filter<N> for ReloadFilter<F>
32+
where
33+
F: Filter<N>,
34+
{
35+
fn callsite_enabled(&self, _: &Metadata, _: &Context<N>) -> Interest {
36+
// TODO(eliza): When tokio-rs/tokio#1039 lands, we can allow our
37+
// interest to be cached. For now, we must always return `sometimes`.
38+
Interest::sometimes()
39+
}
40+
41+
fn enabled(&self, metadata: &Metadata, ctx: &Context<N>) -> bool {
42+
self.inner.read().enabled(metadata, ctx)
43+
}
44+
}
45+
46+
impl<F: 'static> ReloadFilter<F> {
47+
pub fn new<N>(f: F) -> Self
48+
where
49+
F: Filter<N>,
50+
{
51+
Self {
52+
inner: Arc::new(RwLock::new(f)),
53+
}
54+
}
55+
56+
pub fn handle(&self) -> Handle<F> {
57+
Handle {
58+
inner: Arc::downgrade(&self.inner),
59+
}
60+
}
61+
}
62+
63+
// ===== impl Handle =====
64+
65+
impl<F: 'static> Handle<F> {
66+
pub fn reload<N>(&self, new_filter: impl Into<F>) -> Result<(), Error>
67+
where
68+
F: Filter<N>,
69+
{
70+
self.modify(|filter| {
71+
*filter = new_filter.into();
72+
})
73+
}
74+
75+
/// Invokes a closure with a mutable reference to the current filter,
76+
/// allowing it to be modified in place.
77+
pub fn modify<N>(&self, f: impl FnOnce(&mut F)) -> Result<(), Error>
78+
where
79+
F: Filter<N>,
80+
{
81+
let inner = self.inner.upgrade().ok_or(Error {
82+
kind: ErrorKind::SubscriberGone,
83+
})?;
84+
let mut inner = inner.write();
85+
f(&mut *inner);
86+
// TODO(eliza): When tokio-rs/tokio#1039 lands, this is where we would
87+
// invalidate the callsite cache.
88+
Ok(())
89+
}
90+
91+
/// Returns a clone of the filter's current value if it still exists.
92+
/// Otherwise, if the filter has been dropped, returns `None`.
93+
pub fn clone_current(&self) -> Option<F>
94+
where
95+
F: Clone,
96+
{
97+
self.inner.upgrade().map(|inner| inner.read().clone())
98+
}
99+
}
100+
101+
// ===== impl Error =====
102+
103+
impl fmt::Display for Error {
104+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105+
error::Error::description(self).fmt(f)
106+
}
107+
}
108+
109+
impl error::Error for Error {
110+
fn description(&self) -> &str {
111+
match self.kind {
112+
ErrorKind::SubscriberGone => "subscriber no longer exists",
113+
}
114+
}
115+
}
116+
117+
#[cfg(test)]
118+
mod test {
119+
use std::sync::atomic::{AtomicUsize, Ordering};
120+
use tokio_trace_core::{
121+
dispatcher::{self, Dispatch},
122+
Metadata,
123+
};
124+
use *;
125+
126+
#[test]
127+
fn reload_handle() {
128+
static FILTER1_CALLS: AtomicUsize = AtomicUsize::new(0);
129+
static FILTER2_CALLS: AtomicUsize = AtomicUsize::new(0);
130+
131+
fn filter1<N>(_: &Metadata, _: &span::Context<N>) -> bool {
132+
FILTER1_CALLS.fetch_add(1, Ordering::Relaxed);
133+
true
134+
}
135+
136+
fn filter2<N>(_: &Metadata, _: &span::Context<N>) -> bool {
137+
FILTER2_CALLS.fetch_add(1, Ordering::Relaxed);
138+
true
139+
}
140+
141+
fn event() {
142+
trace!("my event");
143+
}
144+
145+
let subscriber = FmtSubscriber::builder()
146+
.with_filter(filter1 as fn(&Metadata, &span::Context<_>) -> bool)
147+
.with_filter_reloading();
148+
let handle = subscriber.reload_handle();
149+
let subscriber = Dispatch::new(subscriber.finish());
150+
151+
dispatcher::with_default(&subscriber, || {
152+
assert_eq!(FILTER1_CALLS.load(Ordering::Relaxed), 0);
153+
assert_eq!(FILTER2_CALLS.load(Ordering::Relaxed), 0);
154+
155+
event();
156+
157+
assert_eq!(FILTER1_CALLS.load(Ordering::Relaxed), 1);
158+
assert_eq!(FILTER2_CALLS.load(Ordering::Relaxed), 0);
159+
160+
handle
161+
.reload(filter2 as fn(&Metadata, &span::Context<_>) -> bool)
162+
.expect("should reload");
163+
164+
event();
165+
166+
assert_eq!(FILTER1_CALLS.load(Ordering::Relaxed), 1);
167+
assert_eq!(FILTER2_CALLS.load(Ordering::Relaxed), 1);
168+
})
169+
}
170+
}

tokio-trace-fmt/src/lib.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
extern crate tokio_trace_core;
2+
#[cfg(test)]
3+
#[macro_use]
4+
extern crate tokio_trace;
25

36
#[cfg(feature = "ansi")]
47
extern crate ansi_term;
@@ -76,6 +79,17 @@ where
7679
}
7780
}
7881

82+
impl<N, E, F> FmtSubscriber<N, E, filter::ReloadFilter<F>>
83+
where
84+
F: Filter<N> + 'static,
85+
{
86+
/// Returns a `Handle` that may be used to reload this subscriber's
87+
/// filter.
88+
pub fn reload_handle(&self) -> filter::reload::Handle<F> {
89+
self.filter.handle()
90+
}
91+
}
92+
7993
impl<N, E, F> tokio_trace_core::Subscriber for FmtSubscriber<N, E, F>
8094
where
8195
N: for<'a> NewVisitor<'a> + 'static,
@@ -205,6 +219,33 @@ where
205219
}
206220
}
207221

222+
impl<N, E, F> Builder<N, E, F>
223+
where
224+
F: Filter<N> + 'static,
225+
{
226+
/// Configures the subscriber being built to allow filter reloading at
227+
/// runtime.
228+
pub fn with_filter_reloading(self) -> Builder<N, E, filter::ReloadFilter<F>> {
229+
Builder {
230+
new_visitor: self.new_visitor,
231+
fmt_event: self.fmt_event,
232+
filter: filter::ReloadFilter::new(self.filter),
233+
settings: self.settings,
234+
}
235+
}
236+
}
237+
238+
impl<N, E, F> Builder<N, E, filter::ReloadFilter<F>>
239+
where
240+
F: Filter<N> + 'static,
241+
{
242+
/// Returns a `Handle` that may be used to reload the constructed subscriber's
243+
/// filter.
244+
pub fn reload_handle(&self) -> filter::reload::Handle<F> {
245+
self.filter.handle()
246+
}
247+
}
248+
208249
impl<N, E, F> Builder<N, E, F> {
209250
/// Sets the Visitor that the subscriber being built will use to record
210251
/// fields.

0 commit comments

Comments
 (0)