Commit a1860eb

Better handling of shutdown in BatchLogProcessor (#2581)
1 parent 9dfcff1 commit a1860eb

File tree

1 file changed: +139 -95 lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 139 additions & 95 deletions
@@ -270,7 +270,6 @@ pub struct BatchLogProcessor {
     handle: Mutex<Option<thread::JoinHandle<()>>>,
     forceflush_timeout: Duration,
     shutdown_timeout: Duration,
-    is_shutdown: AtomicBool,
     export_log_message_sent: Arc<AtomicBool>,
     current_batch_size: Arc<AtomicUsize>,
     max_export_batch_size: usize,
@@ -292,87 +291,112 @@ impl Debug for BatchLogProcessor {
 
 impl LogProcessor for BatchLogProcessor {
     fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
-        // noop after shutdown
-        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
-            otel_warn!(
-                name: "BatchLogProcessor.Emit.ProcessorShutdown",
-                message = "BatchLogProcessor has been shutdown. No further logs will be emitted."
-            );
-            return;
-        }
-
         let result = self
             .logs_sender
             .try_send(Box::new((record.clone(), instrumentation.clone())));
 
-        if result.is_err() {
-            // Increment dropped logs count. The first time we have to drop a log,
-            // emit a warning.
-            if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
-                otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
-                    message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
-            }
-            return;
-        }
-
-        // At this point, sending the log record to the data channel was successful.
-        // Increment the current batch size and check if it has reached the max export batch size.
-        if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
-        {
-            // Check if a control message for exporting logs is already sent to the worker thread.
-            // If not, send a control message to export logs.
-            // `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.
-
-            if !self.export_log_message_sent.load(Ordering::Relaxed) {
-                // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
-                // Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false.
-                // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
-                // We could have used compare_exchange as well here, but it's more verbose than swap.
-                if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
-                    match self.message_sender.try_send(BatchMessage::ExportLog(
-                        self.export_log_message_sent.clone(),
-                    )) {
-                        Ok(_) => {
-                            // Control message sent successfully.
-                        }
-                        Err(_err) => {
-                            // TODO: Log error
-                            // If the control message could not be sent, reset the `export_log_message_sent` flag.
-                            self.export_log_message_sent.store(false, Ordering::Relaxed);
+        // Match on the result and handle each case separately.
+        match result {
+            Ok(_) => {
+                // Successfully sent the log record to the data channel.
+                // Increment the current batch size and check if it has reached
+                // the max export batch size.
+                if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
+                    >= self.max_export_batch_size
+                {
+                    // Check if a control message for exporting logs is
+                    // already sent to the worker thread. If not, send a control
+                    // message to export logs. `export_log_message_sent` is set
+                    // to false ONLY when the worker thread has processed the
+                    // control message.
+
+                    if !self.export_log_message_sent.load(Ordering::Relaxed) {
+                        // This is a cost-efficient check as atomic load
+                        // operations do not require exclusive access to cache
+                        // line. Perform atomic swap to
+                        // `export_log_message_sent` ONLY when the atomic load
+                        // operation above returns false. Atomic
+                        // swap/compare_exchange operations require exclusive
+                        // access to cache line on most processor architectures.
+                        // We could have used compare_exchange as well here, but
+                        // it's more verbose than swap.
+                        if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
+                            match self.message_sender.try_send(BatchMessage::ExportLog(
+                                self.export_log_message_sent.clone(),
+                            )) {
+                                Ok(_) => {
+                                    // Control message sent successfully.
+                                }
+                                Err(_err) => {
+                                    // TODO: Log error. If the control message
+                                    // could not be sent, reset the
+                                    // `export_log_message_sent` flag.
+                                    self.export_log_message_sent.store(false, Ordering::Relaxed);
+                                }
+                            }
                         }
                     }
                 }
             }
+            Err(mpsc::TrySendError::Full(_)) => {
+                // Increment dropped logs count. The first time we have to drop
+                // a log, emit a warning.
+                if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
+                    otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
+                        message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
+                }
+            }
+            Err(mpsc::TrySendError::Disconnected(_)) => {
+                // The background thread is the only receiver; if it is
+                // disconnected, the processor has already been shut down.
+                otel_warn!(
+                    name: "BatchLogProcessor.Emit.AfterShutdown",
+                    message = "Logs are being emitted even after Shutdown. This indicates incorrect lifecycle management of OTelLoggerProvider in application. Logs will not be exported."
+                );
+            }
         }
     }
 
     fn force_flush(&self) -> LogResult<()> {
-        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
-            return LogResult::Err(LogError::Other(
-                "BatchLogProcessor is already shutdown".into(),
-            ));
-        }
         let (sender, receiver) = mpsc::sync_channel(1);
-        self.message_sender
+        match self
+            .message_sender
             .try_send(BatchMessage::ForceFlush(sender))
-            .map_err(|err| LogError::Other(err.into()))?;
-
-        receiver
-            .recv_timeout(self.forceflush_timeout)
-            .map_err(|err| {
-                if err == RecvTimeoutError::Timeout {
-                    LogError::ExportTimedOut(self.forceflush_timeout)
-                } else {
-                    LogError::Other(err.into())
-                }
-            })?
+        {
+            Ok(_) => receiver
+                .recv_timeout(self.forceflush_timeout)
+                .map_err(|err| {
+                    if err == RecvTimeoutError::Timeout {
+                        LogError::ExportTimedOut(self.forceflush_timeout)
+                    } else {
+                        LogError::Other(err.into())
+                    }
+                })?,
+            Err(mpsc::TrySendError::Full(_)) => {
+                // If the control message could not be sent, emit a warning.
+                otel_debug!(
+                    name: "BatchLogProcessor.ForceFlush.ControlChannelFull",
+                    message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedly calls force_flush without finishing the previous call."
+                );
+                LogResult::Err(LogError::Other("ForceFlush cannot be performed as Control channel is full. This can occur if user repeatedly calls force_flush without finishing the previous call.".into()))
+            }
+            Err(mpsc::TrySendError::Disconnected(_)) => {
+                // The background thread is the only receiver; if it is
+                // disconnected, the processor has already been shut down.
+                otel_debug!(
+                    name: "BatchLogProcessor.ForceFlush.AlreadyShutdown",
+                    message = "ForceFlush invoked after Shutdown. This will not perform Flush and indicates an incorrect lifecycle management in Application."
+                );
+
+                LogResult::Err(LogError::Other(
+                    "ForceFlush cannot be performed as BatchLogProcessor is already shutdown"
+                        .into(),
+                ))
+            }
+        }
     }
 
     fn shutdown(&self) -> LogResult<()> {
-        // Set is_shutdown to true
-        self.is_shutdown
-            .store(true, std::sync::atomic::Ordering::Relaxed);
-
         let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
         let max_queue_size = self.max_queue_size;
         if dropped_logs > 0 {
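
The central change in this hunk: `emit` and `force_flush` no longer consult a dedicated `is_shutdown` flag. Because the background worker thread is the only receiver on the channels, a `TrySendError::Disconnected` from `try_send` already proves the worker has exited, while `TrySendError::Full` keeps its separate meaning of a saturated queue. A minimal, self-contained sketch of that detection pattern (std only, with illustrative names rather than the SDK's types):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

fn main() {
    // Bounded channel; the worker owns the only Receiver.
    let (tx, rx) = sync_channel::<String>(4);

    // Hypothetical worker: drains messages until a sentinel arrives, then
    // exits, dropping the receiver.
    let worker = thread::spawn(move || {
        for msg in rx {
            if msg == "shutdown" {
                break;
            }
            println!("processing: {msg}");
        }
    });

    tx.try_send("hello".to_string()).unwrap();
    tx.try_send("shutdown".to_string()).unwrap();
    worker.join().unwrap();

    // After the worker has exited, the sender sees Disconnected directly,
    // so no extra is_shutdown flag is needed to detect this state.
    match tx.try_send("too late".to_string()) {
        Err(TrySendError::Disconnected(_)) => println!("worker already shut down"),
        Err(TrySendError::Full(_)) => println!("queue full, message dropped"),
        Ok(_) => println!("sent"),
    }
}
```
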
@@ -385,35 +409,56 @@ impl LogProcessor for BatchLogProcessor {
         }
 
         let (sender, receiver) = mpsc::sync_channel(1);
-        self.message_sender
-            .try_send(BatchMessage::Shutdown(sender))
-            .map_err(|err| LogError::Other(err.into()))?;
-
-        receiver
-            .recv_timeout(self.shutdown_timeout)
-            .map(|_| {
-                // join the background thread after receiving back the shutdown signal
-                if let Some(handle) = self.handle.lock().unwrap().take() {
-                    handle.join().unwrap();
-                }
-                LogResult::Ok(())
-            })
-            .map_err(|err| match err {
-                RecvTimeoutError::Timeout => {
-                    otel_error!(
-                        name: "BatchLogProcessor.Shutdown.Timeout",
-                        message = "BatchLogProcessor shutdown timing out."
-                    );
-                    LogError::ExportTimedOut(self.shutdown_timeout)
-                }
-                _ => {
-                    otel_error!(
-                        name: "BatchLogProcessor.Shutdown.Error",
-                        error = format!("{}", err)
-                    );
-                    LogError::Other(err.into())
-                }
-            })?
+        match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
+            Ok(_) => {
+                receiver
+                    .recv_timeout(self.shutdown_timeout)
+                    .map(|_| {
+                        // join the background thread after receiving back the
+                        // shutdown signal
+                        if let Some(handle) = self.handle.lock().unwrap().take() {
+                            handle.join().unwrap();
+                        }
+                        LogResult::Ok(())
+                    })
+                    .map_err(|err| match err {
+                        RecvTimeoutError::Timeout => {
+                            otel_error!(
+                                name: "BatchLogProcessor.Shutdown.Timeout",
+                                message = "BatchLogProcessor shutdown timing out."
+                            );
+                            LogError::ExportTimedOut(self.shutdown_timeout)
+                        }
+                        _ => {
+                            otel_error!(
+                                name: "BatchLogProcessor.Shutdown.Error",
+                                error = format!("{}", err)
+                            );
+                            LogError::Other(err.into())
+                        }
+                    })?
+            }
+            Err(mpsc::TrySendError::Full(_)) => {
+                // If the control message could not be sent, emit a warning.
+                otel_debug!(
+                    name: "BatchLogProcessor.Shutdown.ControlChannelFull",
+                    message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedly calls force_flush without finishing the previous call."
+                );
+                LogResult::Err(LogError::Other("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedly calls force_flush without finishing the previous call.".into()))
+            }
+            Err(mpsc::TrySendError::Disconnected(_)) => {
+                // The background thread is the only receiver; if it is
+                // disconnected, the processor has already been shut down.
+                otel_debug!(
+                    name: "BatchLogProcessor.Shutdown.AlreadyShutdown",
+                    message = "Shutdown is being invoked more than once. This is a noop, but indicates a potential issue in the application's lifecycle management."
+                );
+
+                LogResult::Err(LogError::Other(
+                    "BatchLogProcessor is already shutdown".into(),
+                ))
+            }
+        }
     }
 
     fn set_resource(&self, resource: &Resource) {
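
`shutdown` follows the same structure: only the `Ok` branch of `try_send` waits for the worker's acknowledgement and joins the thread, while `Full` and `Disconnected` become explicit errors instead of being masked by a flag. A rough sketch of such a shutdown handshake over a control channel, using a hypothetical `ControlMessage` and worker rather than the SDK's `BatchMessage`/`LogResult` types:

```rust
use std::sync::mpsc::{sync_channel, RecvTimeoutError, SyncSender};
use std::thread;
use std::time::Duration;

// Hypothetical control message; the real processor also carries flush/export variants.
enum ControlMessage {
    Shutdown(SyncSender<()>),
}

fn main() {
    let (control_tx, control_rx) = sync_channel::<ControlMessage>(1);

    let handle = thread::spawn(move || {
        while let Ok(msg) = control_rx.recv() {
            match msg {
                ControlMessage::Shutdown(ack) => {
                    // Flush or export any remaining work here, then acknowledge.
                    let _ = ack.send(());
                    break;
                }
            }
        }
    });

    // Caller side: request shutdown and bound the wait with a timeout.
    let (ack_tx, ack_rx) = sync_channel::<()>(1);
    control_tx
        .try_send(ControlMessage::Shutdown(ack_tx))
        .expect("control channel full or worker already gone");

    match ack_rx.recv_timeout(Duration::from_secs(5)) {
        Ok(()) => handle.join().unwrap(), // safe to join: the worker acknowledged
        Err(RecvTimeoutError::Timeout) => eprintln!("shutdown timed out"),
        Err(RecvTimeoutError::Disconnected) => eprintln!("worker exited without ack"),
    }
}
```
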
@@ -590,7 +635,6 @@ impl BatchLogProcessor {
             handle: Mutex::new(Some(handle)),
             forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
             shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
-            is_shutdown: AtomicBool::new(false),
             dropped_logs_count: AtomicUsize::new(0),
             max_queue_size,
             export_log_message_sent: Arc::new(AtomicBool::new(false)),
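
With `is_shutdown` also removed from the constructor, the only flag left is `export_log_message_sent`, whose load-then-swap usage is documented in the `emit` comments above: a cheap relaxed load first, and the more expensive swap only when the load says the flag is still clear. A small sketch of that claim/reset pattern (illustrative names, assuming the worker clears the flag after handling the export message):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Returns true for exactly one caller until the flag is reset.
fn try_claim(flag: &AtomicBool) -> bool {
    if !flag.load(Ordering::Relaxed) {
        // Only one thread wins the swap; the rest observe `true` and back off.
        !flag.swap(true, Ordering::Relaxed)
    } else {
        false
    }
}

fn main() {
    let export_pending = AtomicBool::new(false);

    assert!(try_claim(&export_pending)); // first caller claims the flag
    assert!(!try_claim(&export_pending)); // later callers see it taken

    // The worker resets the flag once the export message has been handled,
    // allowing the next batch-size threshold crossing to claim it again.
    export_pending.store(false, Ordering::Relaxed);
    assert!(try_claim(&export_pending));
}
```
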
