Skip to content

chore: bring back BatchLogProcessorWithAsyncRuntime unit tests #2457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1580,4 +1580,176 @@

assert_eq!(exporter.len(), 1);
}

#[test]
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
fn test_build_batch_log_processor_builder_rt() {
let mut env_vars = vec![
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
(OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
(OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder = BatchLogProcessorWithAsyncRuntime::builder(
InMemoryLogExporter::default(),
runtime::Tokio,
);

assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
builder.config.scheduled_delay,
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
);
assert_eq!(
builder.config.max_queue_size,
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
);
assert_eq!(
builder.config.max_export_timeout,
Duration::from_millis(2046)
);
});

env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));

temp_env::with_vars(env_vars, || {
let builder = BatchLogProcessorWithAsyncRuntime::builder(
InMemoryLogExporter::default(),
runtime::Tokio,
);
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
});
}

#[test]
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
fn test_build_batch_log_processor_builder_rt_with_custom_config() {
let expected = BatchConfigBuilder::default()
.with_max_export_batch_size(1)
.with_scheduled_delay(Duration::from_millis(2))
.with_max_export_timeout(Duration::from_millis(3))
.with_max_queue_size(4)
.build();

let builder = BatchLogProcessorWithAsyncRuntime::builder(
InMemoryLogExporter::default(),
runtime::Tokio,
)
.with_batch_config(expected);

let actual = &builder.config;
assert_eq!(actual.max_export_batch_size, 1);
assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
assert_eq!(actual.max_queue_size, 4);
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_set_resource_batch_processor_rt() {
let exporter = MockLogExporter {
resource: Arc::new(Mutex::new(None)),
};
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);
let provider = LoggerProvider::builder()
.with_log_processor(processor)
.with_resource(Resource::new(vec![
KeyValue::new("k1", "v1"),
KeyValue::new("k2", "v3"),
KeyValue::new("k3", "v3"),
KeyValue::new("k4", "v4"),
KeyValue::new("k5", "v5"),
]))
.build();
tokio::time::sleep(Duration::from_secs(2)).await; // set resource in batch span processor is not blocking. Should we make it blocking?
assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
let _ = provider.shutdown();
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_shutdown_rt() {
// assert we will receive an error
// setup
let exporter = InMemoryLogExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);

let mut record = LogRecord::default();
let instrumentation = InstrumentationScope::default();

processor.emit(&mut record, &instrumentation);
processor.force_flush().unwrap();
processor.shutdown().unwrap();
// todo: expect to see errors here. How should we assert this?
processor.emit(&mut record, &instrumentation);
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "current_thread")]
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);

//
// deadloack happens in shutdown with tokio current_thread runtime
//
processor.shutdown().unwrap();
}

Check warning on line 1714 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L1702-L1714

Added lines #L1702 - L1714 were not covered by tests

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "current_thread")]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread()
{
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::TokioCurrentThread,
);

processor.shutdown().unwrap();
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);

processor.shutdown().unwrap();
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::TokioCurrentThread,
);

processor.shutdown().unwrap();
}
}
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::{Arc, Mutex};
/// let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
/// //Create a LoggerProvider and register the exporter
/// let logger_provider = LoggerProvider::builder()
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build())
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build())
/// .build();
/// // Setup Log Appenders and emit logs. (Not shown here)
/// logger_provider.force_flush();
Expand Down Expand Up @@ -84,7 +84,7 @@ pub struct LogDataWithResource {
/// let exporter: InMemoryLogExporter = InMemoryLogExporterBuilder::default().build();
/// //Create a LoggerProvider and register the exporter
/// let logger_provider = LoggerProvider::builder()
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build())
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build())
/// .build();
/// // Setup Log Appenders and emit logs. (Not shown here)
/// logger_provider.force_flush();
Expand Down
Loading