Skip to content

Commit 3054ad9

Browse files
committed
updated tests to test about shutdown behavior, fixed formatting issues.
1 parent bfca546 commit 3054ad9

File tree

2 files changed

+236
-7
lines changed

2 files changed

+236
-7
lines changed

tracing-appender/src/non_blocking.rs

Lines changed: 174 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ pub struct WorkerGuard {
106106
_guard: Option<JoinHandle<()>>,
107107
sender: Sender<Msg>,
108108
shutdown: Sender<()>,
109+
shutdown_timeout: Duration,
109110
}
110111

111112
/// A non-blocking writer.
@@ -144,7 +145,7 @@ impl NonBlocking {
144145
/// The returned `NonBlocking` writer will have the [default configuration][default] values.
145146
/// Other configurations can be specified using the [builder] interface.
146147
///
147-
/// [default]: NonBlockingBuilder::default
148+
/// [default]: NonBlockingBuilder::default()
148149
/// [builder]: NonBlockingBuilder
149150
pub fn new<T: Write + Send + 'static>(writer: T) -> (NonBlocking, WorkerGuard) {
150151
NonBlockingBuilder::default().finish(writer)
@@ -155,6 +156,7 @@ impl NonBlocking {
155156
buffered_lines_limit: usize,
156157
is_lossy: bool,
157158
thread_name: String,
159+
shutdown_timeout: Duration,
158160
) -> (NonBlocking, WorkerGuard) {
159161
let (sender, receiver) = bounded(buffered_lines_limit);
160162

@@ -165,6 +167,7 @@ impl NonBlocking {
165167
worker.worker_thread(thread_name),
166168
sender.clone(),
167169
shutdown_sender,
170+
shutdown_timeout,
168171
);
169172

170173
(
@@ -192,6 +195,7 @@ pub struct NonBlockingBuilder {
192195
buffered_lines_limit: usize,
193196
is_lossy: bool,
194197
thread_name: String,
198+
shutdown_timeout: Duration,
195199
}
196200

197201
impl NonBlockingBuilder {
@@ -227,8 +231,20 @@ impl NonBlockingBuilder {
227231
self.buffered_lines_limit,
228232
self.is_lossy,
229233
self.thread_name,
234+
self.shutdown_timeout,
230235
)
231236
}
237+
238+
/// Sets the timeout for shutdown of the worker thread.
239+
///
240+
/// This is the maximum amount of time the main thread will wait
241+
/// for the worker thread to finish proccessing pending logs during shutdown
242+
///
243+
/// The default timeout is 1 second.
244+
pub fn shutdown_timeout(mut self, timeout: Duration) -> NonBlockingBuilder {
245+
self.shutdown_timeout = timeout;
246+
self
247+
}
232248
}
233249

234250
impl Default for NonBlockingBuilder {
@@ -237,6 +253,7 @@ impl Default for NonBlockingBuilder {
237253
buffered_lines_limit: DEFAULT_BUFFERED_LINES_LIMIT,
238254
is_lossy: true,
239255
thread_name: "tracing-appender".to_string(),
256+
shutdown_timeout: Duration::from_secs(1),
240257
}
241258
}
242259
}
@@ -276,11 +293,17 @@ impl<'a> MakeWriter<'a> for NonBlocking {
276293
}
277294

278295
impl WorkerGuard {
279-
fn new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self {
296+
fn new(
297+
handle: JoinHandle<()>,
298+
sender: Sender<Msg>,
299+
shutdown: Sender<()>,
300+
shutdown_timeout: Duration,
301+
) -> Self {
280302
WorkerGuard {
281303
_guard: Some(handle),
282304
sender,
283305
shutdown,
306+
shutdown_timeout,
284307
}
285308
}
286309
}
@@ -295,14 +318,27 @@ impl Drop for WorkerGuard {
295318
// Attempt to wait for `Worker` to flush all messages before dropping. This happens
296319
// when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout`
297320
// so that drop is not blocked indefinitely.
298-
// TODO: Make timeout configurable.
299-
let _ = self.shutdown.send_timeout((), Duration::from_millis(1000));
321+
// The shutdown timeout now is configurable
322+
match self.shutdown.send_timeout((), self.shutdown_timeout) {
323+
Ok(_) => (),
324+
Err(SendTimeoutError::Timeout(_)) => {
325+
eprintln!(
326+
"Shutting down logging worker timed out after {:?}.",
327+
self.shutdown_timeout
328+
);
329+
}
330+
Err(SendTimeoutError::Disconnected(_)) => {
331+
eprintln!("Shutdown failed because logging worker was disconnected");
332+
}
333+
}
300334
}
301-
Err(SendTimeoutError::Disconnected(_)) => (),
302-
Err(SendTimeoutError::Timeout(e)) => println!(
335+
Err(SendTimeoutError::Timeout(e)) => eprintln!(
303336
"Failed to send shutdown signal to logging worker. Error: {:?}",
304337
e
305338
),
339+
Err(SendTimeoutError::Disconnected(_)) => {
340+
eprintln!("Logging worker disconnected before shutdown signal");
341+
}
306342
}
307343
}
308344
}
@@ -347,7 +383,7 @@ impl ErrorCounter {
347383
#[cfg(test)]
348384
mod test {
349385
use super::*;
350-
use std::sync::mpsc;
386+
use std::sync::{mpsc, Mutex};
351387
use std::thread;
352388
use std::time::Duration;
353389

@@ -493,4 +529,135 @@ mod test {
493529
assert_eq!(10, hello_count);
494530
assert_eq!(0, error_count.dropped_lines());
495531
}
532+
533+
use std::{
534+
io::{self, Write},
535+
sync::atomic::{AtomicUsize, Ordering},
536+
sync::Arc,
537+
};
538+
539+
struct ControlledWriter {
540+
counter: Arc<AtomicUsize>,
541+
ready_tx: mpsc::Sender<()>,
542+
proceed_rx: mpsc::Receiver<()>,
543+
}
544+
545+
impl ControlledWriter {
546+
fn new() -> (Self, mpsc::Sender<()>, mpsc::Receiver<()>) {
547+
let (ready_tx, ready_rx) = mpsc::channel();
548+
let (proceed_tx, proceed_rx) = mpsc::channel();
549+
let writer = ControlledWriter {
550+
counter: Arc::new(AtomicUsize::new(0)),
551+
ready_tx,
552+
proceed_rx,
553+
};
554+
(writer, proceed_tx, ready_rx)
555+
}
556+
}
557+
558+
impl Write for ControlledWriter {
559+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
560+
self.ready_tx.send(()).unwrap();
561+
self.proceed_rx.recv().unwrap();
562+
self.counter.fetch_add(1, Ordering::SeqCst);
563+
Ok(buf.len())
564+
}
565+
566+
fn flush(&mut self) -> io::Result<()> {
567+
Ok(())
568+
}
569+
}
570+
571+
#[test]
572+
fn test_complete_message_processing() {
573+
let (writer, proceed_tx, ready_rx) = ControlledWriter::new();
574+
let counter = writer.counter.clone();
575+
576+
let (mut non_blocking, guard) = NonBlockingBuilder::default().finish(writer);
577+
578+
for i in 0..3 {
579+
non_blocking
580+
.write_all(format!("msg{}\n", i).as_bytes())
581+
.unwrap();
582+
}
583+
584+
for _ in 0..3 {
585+
ready_rx.recv().unwrap();
586+
proceed_tx.send(()).unwrap();
587+
}
588+
589+
drop(guard);
590+
591+
assert_eq!(
592+
counter.load(Ordering::SeqCst),
593+
3,
594+
"All messages should be processed"
595+
);
596+
}
597+
598+
#[test]
599+
fn test_partial_message_processing() {
600+
let (writer, proceed_tx, ready_rx) = ControlledWriter::new();
601+
let counter = writer.counter.clone();
602+
603+
let (mut non_blocking, guard) = NonBlockingBuilder::default().finish(writer);
604+
605+
for i in 0..3 {
606+
non_blocking
607+
.write_all(format!("msg{}\n", i).as_bytes())
608+
.unwrap();
609+
}
610+
611+
ready_rx.recv().unwrap();
612+
proceed_tx.send(()).unwrap();
613+
614+
drop(guard);
615+
616+
let processed = counter.load(Ordering::SeqCst);
617+
assert!(processed >= 1, "At least one message should be processed");
618+
assert!(processed < 3, "Not all messages should be processed");
619+
}
620+
621+
#[test]
622+
fn test_no_message_processing() {
623+
let (writer, _proceed_tx, _ready_rx) = ControlledWriter::new();
624+
let counter = writer.counter.clone();
625+
626+
let (mut non_blocking, guard) = NonBlockingBuilder::default().finish(writer);
627+
628+
for i in 0..3 {
629+
non_blocking
630+
.write_all(format!("msg{}\n", i).as_bytes())
631+
.unwrap();
632+
}
633+
634+
drop(guard);
635+
636+
assert_eq!(
637+
counter.load(Ordering::SeqCst),
638+
0,
639+
"No messages should be processed"
640+
);
641+
}
642+
643+
#[test]
644+
fn test_single_message_processing() {
645+
let (writer, proceed_tx, ready_rx) = ControlledWriter::new();
646+
let counter = writer.counter.clone();
647+
648+
let (mut non_blocking, guard) = NonBlockingBuilder::default().finish(writer);
649+
650+
non_blocking.write_all(b"single message\n").unwrap();
651+
652+
ready_rx.recv().unwrap();
653+
proceed_tx.send(()).unwrap();
654+
655+
drop(guard);
656+
657+
assert_eq!(
658+
counter.load(Ordering::SeqCst),
659+
1,
660+
"Single message should be processed"
661+
);
662+
}
496663
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use std::{
2+
io::{self, Write},
3+
sync::atomic::{AtomicBool, AtomicU64, Ordering},
4+
thread,
5+
time::{Duration, Instant},
6+
};
7+
use tracing_appender::non_blocking::NonBlockingBuilder;
8+
9+
static BLOCK_IN_WORKER: AtomicBool = AtomicBool::new(false);
10+
static BLOCK_DURATION_SECS: AtomicU64 = AtomicU64::new(3);
11+
12+
struct BlockingMemoryWriter {
13+
buffer: Vec<u8>,
14+
}
15+
16+
impl BlockingMemoryWriter {
17+
fn new() -> Self {
18+
Self { buffer: Vec::new() }
19+
}
20+
}
21+
22+
impl Write for BlockingMemoryWriter {
23+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
24+
if BLOCK_IN_WORKER.load(Ordering::Relaxed) {
25+
let block_secs = BLOCK_DURATION_SECS.load(Ordering::Relaxed);
26+
thread::sleep(Duration::from_secs(block_secs));
27+
}
28+
self.buffer.extend_from_slice(buf);
29+
Ok(buf.len())
30+
}
31+
32+
fn flush(&mut self) -> io::Result<()> {
33+
Ok(())
34+
}
35+
}
36+
37+
#[test]
38+
fn test_shutdown_timeout_behavior() {
39+
let timeout = Duration::from_millis(300);
40+
let blocking_writer = BlockingMemoryWriter::new();
41+
42+
let (mut non_blocking, guard) = NonBlockingBuilder::default()
43+
.shutdown_timeout(timeout)
44+
.finish(blocking_writer);
45+
46+
non_blocking.write_all(b"test data\n").unwrap();
47+
48+
thread::sleep(Duration::from_millis(50));
49+
BLOCK_IN_WORKER.store(true, Ordering::Relaxed);
50+
non_blocking.write_all(b"blocking data\n").unwrap();
51+
52+
let start = Instant::now();
53+
drop(guard);
54+
let elapsed = start.elapsed();
55+
56+
assert!(
57+
elapsed >= timeout,
58+
"Shutdown completed before timeout: {:?}, expected at least {:?}",
59+
elapsed,
60+
timeout
61+
);
62+
}

0 commit comments

Comments
 (0)