Skip to content

Tweaks to logs integration test #2453

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions opentelemetry-otlp/tests/integration_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ publish = false
opentelemetry = { path = "../../../opentelemetry", features = [] }
opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "testing"] }
opentelemetry-proto = { path = "../../../opentelemetry-proto", features = ["gen-tonic-messages", "trace", "logs", "metrics", "with-serde"] }
log = { workspace = true }
tokio = { version = "1.0", features = ["full"] }
serde_json = "1"
testcontainers = { version = "0.23.1", features = ["http_wait"]}
once_cell.workspace = true
anyhow = "1.0.94"
ctor = "0.2.9"
tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] }
tracing = "0.1.41"
tracing = {workspace = true}

[target.'cfg(unix)'.dependencies]
opentelemetry-appender-log = { path = "../../../opentelemetry-appender-log", default-features = false}
opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false}
opentelemetry-otlp = { path = "../../../opentelemetry-otlp", default-features = false }
opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"scopeLogs": [
{
"scope": {
"name": "opentelemetry-log-appender",
"version": "0.3.0"
"name": "my-target",
"version": ""
},
"logRecords": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"scopeLogs": [
{
"scope": {
"name": "opentelemetry-log-appender",
"version": "0.3.0"
"name": "my-target",
"version": ""
},
"logRecords": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ impl LogsAsserter {
let result_scope_logs = &result_resource_logs.scope_logs[i];
let expected_scope_logs = &expected_resource_logs.scope_logs[i];

assert_eq!(result_scope_logs.scope, expected_scope_logs.scope);

results_logs.extend(result_scope_logs.log_records.clone());
expected_logs.extend(expected_scope_logs.log_records.clone());
}
Expand Down
88 changes: 50 additions & 38 deletions opentelemetry-otlp/tests/integration_test/tests/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ use anyhow::Result;
use ctor::dtor;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use integration_test_runner::test_utils;
use log::{info, Level};
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::{logs as sdklogs, Resource};
use std::fs::File;
use std::os::unix::fs::MetadataExt;
use std::time::Duration;

fn init_logs() -> Result<sdklogs::LoggerProvider> {
let exporter_builder = LogExporter::builder();
Expand All @@ -37,27 +34,56 @@ fn init_logs() -> Result<sdklogs::LoggerProvider> {
.build())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
pub async fn test_logs() -> Result<()> {
// Make sure the container is running
test_utils::start_collector_container().await?;

let logger_provider = init_logs().unwrap();
let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider);
log::set_boxed_logger(Box::new(otel_log_appender))?;
log::set_max_level(Level::Info.to_level_filter());

info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);

// TODO: remove below wait before calling logger_provider.shutdown()
tokio::time::sleep(Duration::from_secs(10)).await;
let _ = logger_provider.shutdown();

tokio::time::sleep(Duration::from_secs(10)).await;

assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json");

Ok(())
#[cfg(test)]
mod logtests {
use super::*;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use std::{fs::File, time::Duration};
#[test]
#[should_panic(expected = "assertion `left == right` failed: body does not match")]
pub fn test_assert_logs_eq_failure() {
let left = read_logs_from_json(File::open("./expected/logs.json").unwrap());
let right = read_logs_from_json(File::open("./expected/failed_logs.json").unwrap());
LogsAsserter::new(right, left).assert();
}

#[test]
pub fn test_assert_logs_eq() {
let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap());
LogsAsserter::new(logs.clone(), logs).assert();
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[cfg(not(feature = "hyper-client"))]
#[cfg(not(feature = "reqwest-client"))]
pub async fn test_logs() -> Result<()> {
// Make sure the container is running

use integration_test_runner::test_utils;
use opentelemetry_appender_tracing::layer;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;

use crate::{assert_logs_results, init_logs};
test_utils::start_collector_container().await?;

let logger_provider = init_logs().unwrap();
let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
}
// TODO: remove below wait before calling logger_provider.shutdown()
// tokio::time::sleep(Duration::from_secs(10)).await;
Copy link
Preview

Copilot AI Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented-out code should be removed or properly handled. Leaving it commented out with a 'TODO' is not ideal.

Suggested change
// tokio::time::sleep(Duration::from_secs(10)).await;
// let _ = logger_provider.shutdown();

Copilot uses AI. Check for mistakes.

let _ = logger_provider.shutdown();

tokio::time::sleep(Duration::from_secs(10)).await;

assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json");

Ok(())
}
}

pub fn assert_logs_results(result: &str, expected: &str) {
Expand All @@ -69,20 +95,6 @@ pub fn assert_logs_results(result: &str, expected: &str) {
assert!(File::open(result).unwrap().metadata().unwrap().size() > 0)
}

#[test]
#[should_panic(expected = "assertion `left == right` failed: body does not match")]
pub fn test_assert_logs_eq_failure() {
let left = read_logs_from_json(File::open("./expected/logs.json").unwrap());
let right = read_logs_from_json(File::open("./expected/failed_logs.json").unwrap());
LogsAsserter::new(right, left).assert();
}

#[test]
pub fn test_assert_logs_eq() {
let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap());
LogsAsserter::new(logs.clone(), logs).assert();
}

///
/// Make sure we stop the collector container, otherwise it will sit around hogging our
/// ports and subsequent test runs will fail.
Expand Down
129 changes: 66 additions & 63 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,78 +321,81 @@
let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
let max_queue_size = config.max_queue_size;

let handle = thread::spawn(move || {
let mut last_export_time = Instant::now();
let mut logs = Vec::new();
logs.reserve(config.max_export_batch_size);

loop {
let remaining_time_option = config
.scheduled_delay
.checked_sub(last_export_time.elapsed());
let remaining_time = match remaining_time_option {
Some(remaining_time) => remaining_time,
None => config.scheduled_delay,
};

match message_receiver.recv_timeout(remaining_time) {
Ok(BatchMessage::ExportLog(log)) => {
logs.push(log);
if logs.len() == config.max_export_batch_size
|| last_export_time.elapsed() >= config.scheduled_delay
{
let _ = export_with_timeout_sync(
let handle = thread::Builder::new()
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
.spawn(move || {
let mut last_export_time = Instant::now();
let mut logs = Vec::new();
logs.reserve(config.max_export_batch_size);

loop {
let remaining_time_option = config
.scheduled_delay
.checked_sub(last_export_time.elapsed());
let remaining_time = match remaining_time_option {
Some(remaining_time) => remaining_time,
None => config.scheduled_delay,

Check warning on line 337 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L337

Added line #L337 was not covered by tests
};

match message_receiver.recv_timeout(remaining_time) {
Ok(BatchMessage::ExportLog(log)) => {
logs.push(log);
if logs.len() == config.max_export_batch_size
|| last_export_time.elapsed() >= config.scheduled_delay
{
let _ = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);

Check warning on line 351 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L345-L351

Added lines #L345 - L351 were not covered by tests
}
}
Ok(BatchMessage::ForceFlush(sender)) => {
let result = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
let _ = sender.send(result);
}
}
Ok(BatchMessage::ForceFlush(sender)) => {
let result = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
let _ = sender.send(result);
}
Ok(BatchMessage::Shutdown(sender)) => {
let result = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
let _ = sender.send(result);
Ok(BatchMessage::Shutdown(sender)) => {
let result = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
let _ = sender.send(result);

//
// break out the loop and return from the current background thread.
//
break;
}
Ok(BatchMessage::SetResource(resource)) => {
exporter.set_resource(&resource);
}
Err(RecvTimeoutError::Timeout) => {
let _ = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
}
Err(err) => {
// TODO: this should not happen! Log the error and continue for now.
otel_error!(
name: "BatchLogProcessor.InternalError",
error = format!("{}", err)
);
//
// break out the loop and return from the current background thread.
//
break;
}
Ok(BatchMessage::SetResource(resource)) => {
exporter.set_resource(&resource);
}
Err(RecvTimeoutError::Timeout) => {
let _ = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
logs.split_off(0),
&mut last_export_time,
);
}
Err(err) => {
// TODO: this should not happen! Log the error and continue for now.
otel_error!(

Check warning on line 390 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L380-L390

Added lines #L380 - L390 were not covered by tests
name: "BatchLogProcessor.InternalError",
error = format!("{}", err)

Check warning on line 392 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L392

Added line #L392 was not covered by tests
);
}
}
}
}
});
})
.expect("Thread spawn failed."); //TODO: Handle thread spawn failure

// Return batch processor with link to worker
BatchLogProcessor {
Expand Down
Loading