
Commit acf16ed

Tweaks to logs integration test (#2453)
1 parent 0a7ad69 commit acf16ed

File tree: 6 files changed (+124, -108 lines)

opentelemetry-otlp/tests/integration_test/Cargo.toml

Lines changed: 2 additions & 3 deletions
@@ -8,18 +8,17 @@ publish = false
 opentelemetry = { path = "../../../opentelemetry", features = [] }
 opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "testing"] }
 opentelemetry-proto = { path = "../../../opentelemetry-proto", features = ["gen-tonic-messages", "trace", "logs", "metrics", "with-serde"] }
-log = { workspace = true }
 tokio = { version = "1.0", features = ["full"] }
 serde_json = "1"
 testcontainers = { version = "0.23.1", features = ["http_wait"]}
 once_cell.workspace = true
 anyhow = "1.0.94"
 ctor = "0.2.9"
 tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] }
-tracing = "0.1.41"
+tracing = {workspace = true}
 
 [target.'cfg(unix)'.dependencies]
-opentelemetry-appender-log = { path = "../../../opentelemetry-appender-log", default-features = false}
+opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false}
 opentelemetry-otlp = { path = "../../../opentelemetry-otlp", default-features = false }
 opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" }

opentelemetry-otlp/tests/integration_test/expected/failed_logs.json

Lines changed: 2 additions & 2 deletions
@@ -14,8 +14,8 @@
       "scopeLogs": [
         {
           "scope": {
-            "name": "opentelemetry-log-appender",
-            "version": "0.3.0"
+            "name": "my-target",
+            "version": ""
           },
           "logRecords": [
             {

opentelemetry-otlp/tests/integration_test/expected/logs.json

Lines changed: 2 additions & 2 deletions
@@ -14,8 +14,8 @@
       "scopeLogs": [
         {
           "scope": {
-            "name": "opentelemetry-log-appender",
-            "version": "0.3.0"
+            "name": "my-target",
+            "version": ""
           },
           "logRecords": [
             {

opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs

Lines changed: 2 additions & 0 deletions
@@ -43,6 +43,8 @@ impl LogsAsserter {
                 let result_scope_logs = &result_resource_logs.scope_logs[i];
                 let expected_scope_logs = &expected_resource_logs.scope_logs[i];
 
+                assert_eq!(result_scope_logs.scope, expected_scope_logs.scope);
+
                 results_logs.extend(result_scope_logs.log_records.clone());
                 expected_logs.extend(expected_scope_logs.log_records.clone());
             }
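The added assert_eq! means LogsAsserter now fails as soon as two exports disagree on the instrumentation scope, not only when individual log records differ. A minimal sketch of how the asserter is driven from a test, using the helpers this crate already exposes (the function name and file paths below are illustrative, not part of the change):

// Sketch only: compare two exported-logs JSON files with the test crate's asserter.
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use std::fs::File;

fn assert_same_logs(result_path: &str, expected_path: &str) {
    let result = read_logs_from_json(File::open(result_path).unwrap());
    let expected = read_logs_from_json(File::open(expected_path).unwrap());
    // Walks resource_logs -> scope_logs; after this change it also asserts
    // that the scopes themselves are equal before comparing log records.
    LogsAsserter::new(result, expected).assert();
}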

opentelemetry-otlp/tests/integration_test/tests/logs.rs

Lines changed: 50 additions & 38 deletions
@@ -4,14 +4,11 @@ use anyhow::Result;
 use ctor::dtor;
 use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
 use integration_test_runner::test_utils;
-use log::{info, Level};
-use opentelemetry_appender_log::OpenTelemetryLogBridge;
 use opentelemetry_otlp::LogExporter;
 use opentelemetry_sdk::logs::LoggerProvider;
 use opentelemetry_sdk::{logs as sdklogs, Resource};
 use std::fs::File;
 use std::os::unix::fs::MetadataExt;
-use std::time::Duration;
 
 fn init_logs() -> Result<sdklogs::LoggerProvider> {
     let exporter_builder = LogExporter::builder();

@@ -37,27 +34,56 @@ fn init_logs() -> Result<sdklogs::LoggerProvider> {
         .build())
 }
 
-#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-pub async fn test_logs() -> Result<()> {
-    // Make sure the container is running
-    test_utils::start_collector_container().await?;
-
-    let logger_provider = init_logs().unwrap();
-    let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider);
-    log::set_boxed_logger(Box::new(otel_log_appender))?;
-    log::set_max_level(Level::Info.to_level_filter());
-
-    info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
-
-    // TODO: remove below wait before calling logger_provider.shutdown()
-    tokio::time::sleep(Duration::from_secs(10)).await;
-    let _ = logger_provider.shutdown();
-
-    tokio::time::sleep(Duration::from_secs(10)).await;
-
-    assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json");
-
-    Ok(())
+#[cfg(test)]
+mod logtests {
+    use super::*;
+    use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
+    use std::{fs::File, time::Duration};
+    #[test]
+    #[should_panic(expected = "assertion `left == right` failed: body does not match")]
+    pub fn test_assert_logs_eq_failure() {
+        let left = read_logs_from_json(File::open("./expected/logs.json").unwrap());
+        let right = read_logs_from_json(File::open("./expected/failed_logs.json").unwrap());
+        LogsAsserter::new(right, left).assert();
+    }
+
+    #[test]
+    pub fn test_assert_logs_eq() {
+        let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap());
+        LogsAsserter::new(logs.clone(), logs).assert();
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    #[cfg(not(feature = "hyper-client"))]
+    #[cfg(not(feature = "reqwest-client"))]
+    pub async fn test_logs() -> Result<()> {
+        // Make sure the container is running
+
+        use integration_test_runner::test_utils;
+        use opentelemetry_appender_tracing::layer;
+        use tracing::info;
+        use tracing_subscriber::layer::SubscriberExt;
+
+        use crate::{assert_logs_results, init_logs};
+        test_utils::start_collector_container().await?;
+
+        let logger_provider = init_logs().unwrap();
+        let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
+        let subscriber = tracing_subscriber::registry().with(layer);
+        {
+            let _guard = tracing::subscriber::set_default(subscriber);
+            info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
+        }
+        // TODO: remove below wait before calling logger_provider.shutdown()
+        // tokio::time::sleep(Duration::from_secs(10)).await;
+        let _ = logger_provider.shutdown();
+
+        tokio::time::sleep(Duration::from_secs(10)).await;
+
+        assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json");
+
+        Ok(())
+    }
 }
 
 pub fn assert_logs_results(result: &str, expected: &str) {

@@ -69,20 +95,6 @@ pub fn assert_logs_results(result: &str, expected: &str) {
     assert!(File::open(result).unwrap().metadata().unwrap().size() > 0)
 }
 
-#[test]
-#[should_panic(expected = "assertion `left == right` failed: body does not match")]
-pub fn test_assert_logs_eq_failure() {
-    let left = read_logs_from_json(File::open("./expected/logs.json").unwrap());
-    let right = read_logs_from_json(File::open("./expected/failed_logs.json").unwrap());
-    LogsAsserter::new(right, left).assert();
-}
-
-#[test]
-pub fn test_assert_logs_eq() {
-    let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap());
-    LogsAsserter::new(logs.clone(), logs).assert();
-}
-
 ///
 /// Make sure we stop the collector container, otherwise it will sit around hogging our
 /// ports and subsequent test runs will fail.
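The rewritten test replaces the log-crate bridge with opentelemetry-appender-tracing: build the LoggerProvider, wrap it in an OpenTelemetryTracingBridge layer, attach the layer to a tracing_subscriber registry, and install that subscriber only for the scope in which events are emitted. A minimal sketch of that wiring outside the test harness (the provider setup is elided; assume it comes from a helper like init_logs above, and the function name is illustrative):

// Sketch of the tracing-bridge wiring used by the new test_logs.
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_sdk::logs::LoggerProvider;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;

fn emit_one_event(logger_provider: &LoggerProvider) {
    // Bridge tracing events into the OpenTelemetry logs pipeline.
    let layer = OpenTelemetryTracingBridge::new(logger_provider);
    let subscriber = tracing_subscriber::registry().with(layer);

    // set_default installs the subscriber only while the guard is alive, so
    // the bridge does not leak into other tests running in the same process.
    let _guard = tracing::subscriber::set_default(subscriber);
    info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
}

This is consistent with the expected JSON above, where the exported scope is now named "my-target" with an empty version instead of "opentelemetry-log-appender" 0.3.0.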

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 66 additions & 63 deletions
@@ -321,78 +321,81 @@ impl BatchLogProcessor {
         let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
         let max_queue_size = config.max_queue_size;
 
-        let handle = thread::spawn(move || {
-            let mut last_export_time = Instant::now();
-            let mut logs = Vec::new();
-            logs.reserve(config.max_export_batch_size);
-
-            loop {
-                let remaining_time_option = config
-                    .scheduled_delay
-                    .checked_sub(last_export_time.elapsed());
-                let remaining_time = match remaining_time_option {
-                    Some(remaining_time) => remaining_time,
-                    None => config.scheduled_delay,
-                };
-
-                match message_receiver.recv_timeout(remaining_time) {
-                    Ok(BatchMessage::ExportLog(log)) => {
-                        logs.push(log);
-                        if logs.len() == config.max_export_batch_size
-                            || last_export_time.elapsed() >= config.scheduled_delay
-                        {
-                            let _ = export_with_timeout_sync(
+        let handle = thread::Builder::new()
+            .name("OpenTelemetry.Logs.BatchProcessor".to_string())
+            .spawn(move || {
+                let mut last_export_time = Instant::now();
+                let mut logs = Vec::new();
+                logs.reserve(config.max_export_batch_size);
+
+                loop {
+                    let remaining_time_option = config
+                        .scheduled_delay
+                        .checked_sub(last_export_time.elapsed());
+                    let remaining_time = match remaining_time_option {
+                        Some(remaining_time) => remaining_time,
+                        None => config.scheduled_delay,
+                    };
+
+                    match message_receiver.recv_timeout(remaining_time) {
+                        Ok(BatchMessage::ExportLog(log)) => {
+                            logs.push(log);
+                            if logs.len() == config.max_export_batch_size
+                                || last_export_time.elapsed() >= config.scheduled_delay
+                            {
+                                let _ = export_with_timeout_sync(
+                                    remaining_time,
+                                    exporter.as_mut(),
+                                    logs.split_off(0),
+                                    &mut last_export_time,
+                                );
+                            }
+                        }
+                        Ok(BatchMessage::ForceFlush(sender)) => {
+                            let result = export_with_timeout_sync(
                                 remaining_time,
                                 exporter.as_mut(),
                                 logs.split_off(0),
                                 &mut last_export_time,
                             );
+                            let _ = sender.send(result);
                         }
-                    }
-                    Ok(BatchMessage::ForceFlush(sender)) => {
-                        let result = export_with_timeout_sync(
-                            remaining_time,
-                            exporter.as_mut(),
-                            logs.split_off(0),
-                            &mut last_export_time,
-                        );
-                        let _ = sender.send(result);
-                    }
-                    Ok(BatchMessage::Shutdown(sender)) => {
-                        let result = export_with_timeout_sync(
-                            remaining_time,
-                            exporter.as_mut(),
-                            logs.split_off(0),
-                            &mut last_export_time,
-                        );
-                        let _ = sender.send(result);
+                        Ok(BatchMessage::Shutdown(sender)) => {
+                            let result = export_with_timeout_sync(
+                                remaining_time,
+                                exporter.as_mut(),
+                                logs.split_off(0),
+                                &mut last_export_time,
+                            );
+                            let _ = sender.send(result);
 
-                        //
-                        // break out the loop and return from the current background thread.
-                        //
-                        break;
-                    }
-                    Ok(BatchMessage::SetResource(resource)) => {
-                        exporter.set_resource(&resource);
-                    }
-                    Err(RecvTimeoutError::Timeout) => {
-                        let _ = export_with_timeout_sync(
-                            remaining_time,
-                            exporter.as_mut(),
-                            logs.split_off(0),
-                            &mut last_export_time,
-                        );
-                    }
-                    Err(err) => {
-                        // TODO: this should not happen! Log the error and continue for now.
-                        otel_error!(
-                            name: "BatchLogProcessor.InternalError",
-                            error = format!("{}", err)
-                        );
+                            //
+                            // break out the loop and return from the current background thread.
+                            //
+                            break;
+                        }
+                        Ok(BatchMessage::SetResource(resource)) => {
+                            exporter.set_resource(&resource);
+                        }
+                        Err(RecvTimeoutError::Timeout) => {
+                            let _ = export_with_timeout_sync(
+                                remaining_time,
+                                exporter.as_mut(),
+                                logs.split_off(0),
+                                &mut last_export_time,
+                            );
+                        }
+                        Err(err) => {
+                            // TODO: this should not happen! Log the error and continue for now.
+                            otel_error!(
+                                name: "BatchLogProcessor.InternalError",
+                                error = format!("{}", err)
+                            );
+                        }
                     }
                 }
-            }
-        });
+            })
+            .expect("Thread spawn failed."); //TODO: Handle thread spawn failure
 
         // Return batch processor with link to worker
         BatchLogProcessor {
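The log_processor.rs hunk is mostly re-indentation: the worker loop is unchanged, but it now runs on a thread created through std::thread::Builder so the thread carries a recognizable name, and Builder::spawn returns a Result that the code currently unwraps with expect (the TODO notes that spawn failure should eventually be handled). A standalone sketch of that std pattern (the closure body is a placeholder, not the SDK's loop):

// Sketch: spawning a named worker thread with std::thread::Builder.
use std::thread;

fn main() {
    let handle = thread::Builder::new()
        // The name shows up in panics, debuggers, and profilers.
        .name("OpenTelemetry.Logs.BatchProcessor".to_string())
        .spawn(|| {
            // batch-export worker loop would live here
        })
        // Unlike thread::spawn, Builder::spawn returns io::Result<JoinHandle<T>>,
        // so a failed spawn must be handled explicitly.
        .expect("Thread spawn failed.");

    handle.join().expect("worker thread panicked");
}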
