-
Notifications
You must be signed in to change notification settings - Fork 554
fix: SimpleProcessor for Logs simplified #2825
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,18 +16,17 @@ | |
//! +-----+---------------+ +-----------------------+ +-------------------+ | ||
//! ``` | ||
|
||
use crate::error::{OTelSdkError, OTelSdkResult}; | ||
use crate::error::OTelSdkResult; | ||
use crate::logs::log_processor::LogProcessor; | ||
use crate::{ | ||
logs::{LogBatch, LogExporter, SdkLogRecord}, | ||
Resource, | ||
}; | ||
|
||
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; | ||
use opentelemetry::{otel_warn, InstrumentationScope}; | ||
|
||
use std::fmt::Debug; | ||
use std::sync::atomic::AtomicBool; | ||
use std::sync::Mutex; | ||
use std::sync::atomic::{AtomicBool, Ordering}; | ||
|
||
/// A [`LogProcessor`] designed for testing and debugging purpose, that immediately | ||
/// exports log records as they are emitted. Log records are exported synchronously | ||
|
@@ -60,54 +59,47 @@ | |
/// | ||
#[derive(Debug)] | ||
pub struct SimpleLogProcessor<T: LogExporter> { | ||
exporter: Mutex<T>, | ||
is_shutdown: AtomicBool, | ||
exporter: T, | ||
is_exporting: AtomicBool, | ||
} | ||
|
||
impl<T: LogExporter> SimpleLogProcessor<T> { | ||
/// Creates a new instance of `SimpleLogProcessor`. | ||
pub fn new(exporter: T) -> Self { | ||
SimpleLogProcessor { | ||
exporter: Mutex::new(exporter), | ||
is_shutdown: AtomicBool::new(false), | ||
exporter, | ||
is_exporting: AtomicBool::new(false), | ||
} | ||
} | ||
} | ||
|
||
impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> { | ||
fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) { | ||
// noop after shutdown | ||
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { | ||
// this is a warning, as the user is trying to log after the processor has been shutdown | ||
otel_warn!( | ||
name: "SimpleLogProcessor.Emit.ProcessorShutdown", | ||
); | ||
return; | ||
// export() does not require mutable self and can be called in parallel | ||
// with other export() calls. However, OTel Spec requires that | ||
// existing export() must be completed before the next export() call. | ||
while !self | ||
.is_exporting | ||
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) | ||
.is_ok() | ||
{ | ||
// Another thread is currently exporting, yield to let other work proceed | ||
std::thread::yield_now(); | ||
} | ||
|
||
let result = self | ||
.exporter | ||
.lock() | ||
.map_err(|_| OTelSdkError::InternalFailure("SimpleLogProcessor mutex poison".into())) | ||
.and_then(|exporter| { | ||
let log_tuple = &[(record as &SdkLogRecord, instrumentation)]; | ||
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) | ||
}); | ||
// Handle errors with specific static names | ||
match result { | ||
Err(OTelSdkError::InternalFailure(_)) => { | ||
// logging as debug as this is not a user error | ||
otel_debug!( | ||
name: "SimpleLogProcessor.Emit.MutexPoisoning", | ||
); | ||
} | ||
Err(err) => { | ||
otel_error!( | ||
name: "SimpleLogProcessor.Emit.ExportError", | ||
error = format!("{}",err) | ||
); | ||
} | ||
_ => {} | ||
// We now have exclusive access to export | ||
let result = { | ||
let log_tuple = &[(record as &SdkLogRecord, instrumentation)]; | ||
futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))) | ||
}; | ||
|
||
// Release the lock | ||
self.is_exporting.store(false, Ordering::Release); | ||
if let Err(err) = result { | ||
otel_warn!( | ||
name: "SimpleLogProcessor.Emit.ExportError", | ||
error = format!("{}",err) | ||
); | ||
} | ||
} | ||
|
||
|
@@ -116,21 +108,11 @@ | |
} | ||
|
||
fn shutdown(&self) -> OTelSdkResult { | ||
self.is_shutdown | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code is not complicated. It's simple enough. I don't find the I don't see the need to go against the Rust way of doing things in this case. The current There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't disagree at all! Regd. Rust idiomatic way to prevent multiple export at the same time - I think it'd be better to model export() as requiring mutable self, but we know that will limit us from achieving higher perf when we need it! If all agree this PR is wrong direction, I can abandon it. (Its harmless with/without this PR, as simple processor is just a learning/test purpose component only) |
||
.store(true, std::sync::atomic::Ordering::Relaxed); | ||
if let Ok(exporter) = self.exporter.lock() { | ||
exporter.shutdown() | ||
} else { | ||
Err(OTelSdkError::InternalFailure( | ||
"SimpleLogProcessor mutex poison at shutdown".into(), | ||
)) | ||
} | ||
self.exporter.shutdown() | ||
} | ||
|
||
fn set_resource(&mut self, resource: &Resource) { | ||
if let Ok(mut exporter) = self.exporter.lock() { | ||
exporter.set_resource(resource); | ||
} | ||
self.exporter.set_resource(resource); | ||
} | ||
|
||
#[cfg(feature = "spec_unstable_logs_enabled")] | ||
|
@@ -140,11 +122,7 @@ | |
target: &str, | ||
name: Option<&str>, | ||
) -> bool { | ||
if let Ok(exporter) = self.exporter.lock() { | ||
exporter.event_enabled(level, target, name) | ||
} else { | ||
true | ||
} | ||
self.exporter.event_enabled(level, target, name) | ||
} | ||
} | ||
|
||
|
@@ -231,13 +209,11 @@ | |
|
||
processor.shutdown().unwrap(); | ||
|
||
let is_shutdown = processor | ||
.is_shutdown | ||
.load(std::sync::atomic::Ordering::Relaxed); | ||
assert!(is_shutdown); | ||
|
||
processor.emit(&mut record, &instrumentation); | ||
|
||
// Emit was called after shutdown. While SimpleLogProcessor | ||
// does not care, the exporter in this case does, | ||
// and it ignores the export() calls after shutdown. | ||
assert_eq!(1, exporter.get_emitted_logs().unwrap().len()); | ||
assert!(exporter.is_shutdown_called()); | ||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.