Skip to content

fix(async-processor): concurrent exports actually serialised #3028

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- TODO: Placeholder for Span processor related things
- **Fix**: `SpanProcessor::on_start` is no longer called on non-recording spans
- **Fix**: Restore true parallel exports in the async-native `BatchSpanProcessor` by honoring `OTEL_BSP_MAX_CONCURRENT_EXPORTS` ([#3028](https://github.com/open-telemetry/opentelemetry-rust/pull/3028)). A regression in [#2685](https://github.com/open-telemetry/opentelemetry-rust/pull/2685) inadvertently awaited the `export()` future directly in `opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs` instead of spawning it on the runtime, forcing all exports to run sequentially.

## 0.30.0

Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ serde = { workspace = true, features = ["derive", "rc"], optional = true }
serde_json = { workspace = true, optional = true }
thiserror = { workspace = true }
url = { workspace = true, optional = true }
tokio = { workspace = true, features = ["rt", "time"], optional = true }
tokio = { workspace = true, default-features = false, optional = true }
tokio-stream = { workspace = true, optional = true }
http = { workspace = true, optional = true }

Expand All @@ -47,15 +47,15 @@ spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"
metrics = ["opentelemetry/metrics"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
experimental_async_runtime = []
rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"]
rt-tokio-current-thread = ["tokio", "tokio-stream", "experimental_async_runtime"]
rt-tokio = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
rt-tokio-current-thread = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
internal-logs = ["opentelemetry/internal-logs"]
experimental_metrics_periodicreader_with_async_runtime = ["metrics", "experimental_async_runtime"]
spec_unstable_metrics_views = ["metrics"]
experimental_metrics_custom_reader = ["metrics"]
experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimental_async_runtime"]
experimental_logs_concurrent_log_processor = ["logs"]
experimental_trace_batch_span_processor_with_async_runtime = ["trace", "experimental_async_runtime"]
experimental_trace_batch_span_processor_with_async_runtime = ["tokio/sync", "trace", "experimental_async_runtime"]
experimental_metrics_disable_name_validation = ["metrics"]

[[bench]]
Expand Down
194 changes: 160 additions & 34 deletions opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@
use crate::trace::SpanProcessor;
use crate::trace::{SpanData, SpanExporter};
use futures_channel::oneshot;
use futures_util::pin_mut;
use futures_util::{
future::{self, BoxFuture, Either},
select,
pin_mut, select,
stream::{self, FusedStream, FuturesUnordered},
StreamExt as _,
};
use opentelemetry::Context;
use opentelemetry::{otel_debug, otel_error, otel_warn};
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::Duration;
use tokio::sync::RwLock;

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
Expand Down Expand Up @@ -188,13 +190,22 @@
spans: Vec<SpanData>,
export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
runtime: R,
exporter: E,
config: BatchConfig,
// TODO: Redesign the `SpanExporter` trait to use immutable references (`&self`)
// for all methods. This would allow us to remove the `RwLock` and just use `Arc<E>`,
// similar to how `crate::logs::LogExporter` is implemented.
exporter: Arc<RwLock<E>>,
}

impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
impl<E: SpanExporter + 'static, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
let export_result = self.export().await;
let export_result = Self::export(
self.spans.split_off(0),
self.exporter.clone(),
self.runtime.clone(),
self.config.max_export_timeout,
)
.await;
let task = Box::pin(async move {
if let Some(channel) = res_channel {
// If a response channel is provided, attempt to send the export result through it.
Expand Down Expand Up @@ -243,9 +254,15 @@
self.export_tasks.next().await;
}

let export_result = self.export().await;
let batch = self.spans.split_off(0);
let exporter = self.exporter.clone();
let runtime = self.runtime.clone();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe these clones should be cheap - Arc cloning for exporter, and ZST cloning for runtime.

let max_export_timeout = self.config.max_export_timeout;

let task = async move {
if let Err(err) = export_result {
if let Err(err) =

Check warning on line 263 in opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs#L263

Added line #L263 was not covered by tests
Self::export(batch, exporter, runtime, max_export_timeout).await
{
otel_error!(
name: "BatchSpanProcessor.Export.Error",
reason = format!("{}", err)
Expand All @@ -254,6 +271,7 @@

Ok(())
};

// Special case when not using concurrent exports
if self.config.max_concurrent_exports == 1 {
let _ = task.await;
Expand Down Expand Up @@ -288,34 +306,39 @@
// Stream has terminated or processor is shutdown, return to finish execution.
BatchMessage::Shutdown(ch) => {
self.flush(Some(ch)).await;
let _ = self.exporter.shutdown();
let _ = self.exporter.write().await.shutdown();
return false;
}
// propagate the resource
BatchMessage::SetResource(resource) => {
self.exporter.set_resource(&resource);
self.exporter.write().await.set_resource(&resource);

Check warning on line 314 in opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs#L314

Added line #L314 was not covered by tests
}
}
true
}

async fn export(&mut self) -> OTelSdkResult {
async fn export(
batch: Vec<SpanData>,
exporter: Arc<RwLock<E>>,
runtime: R,
max_export_timeout: Duration,
) -> OTelSdkResult {
// Batch size check for flush / shutdown. Those methods may be called
// when there's no work to do.
if self.spans.is_empty() {
if batch.is_empty() {
return Ok(());
}

let export = self.exporter.export(self.spans.split_off(0));
let timeout = self.runtime.delay(self.config.max_export_timeout);
let time_out = self.config.max_export_timeout;
let exporter_guard = exporter.read().await;
let export = exporter_guard.export(batch);
let timeout = runtime.delay(max_export_timeout);

pin_mut!(export);
pin_mut!(timeout);

match future::select(export, timeout).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
Either::Right((_, _)) => Err(OTelSdkError::Timeout(max_export_timeout)),
}
}

Expand Down Expand Up @@ -368,7 +391,7 @@
export_tasks: FuturesUnordered::new(),
runtime: timeout_runtime,
config,
exporter,
exporter: Arc::new(RwLock::new(exporter)),
};

processor.run(messages).await
Expand Down Expand Up @@ -435,6 +458,8 @@
use crate::trace::{SpanData, SpanExporter};
use futures_util::Future;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

struct BlockingExporter<D> {
Expand Down Expand Up @@ -463,6 +488,39 @@
}
}

/// Test-only exporter that records whether two exports overlap in time.
///
/// Both counters are `Arc`-shared so a test can keep its own handles and
/// inspect them after the exporter has been moved into the processor.
struct TrackingExporter {
/// Artificial delay to keep each export alive for a while.
delay: Duration,
/// Current number of in-flight exports.
active: Arc<AtomicUsize>,
/// Set to true the first time we see overlap; nothing in `export` resets it.
concurrent_seen: Arc<AtomicBool>,
}

impl Debug for TrackingExporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("tracking exporter")
}

Check warning on line 504 in opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs#L502-L504

Added lines #L502 - L504 were not covered by tests
}

impl SpanExporter for TrackingExporter {
async fn export(&self, _batch: Vec<SpanData>) -> crate::error::OTelSdkResult {
// Increment in-flight counter and note any overlap.
let inflight = self.active.fetch_add(1, Ordering::SeqCst) + 1;
if inflight > 1 {
self.concurrent_seen.store(true, Ordering::SeqCst);
}

// Keep the export "busy" for a bit.
tokio::time::sleep(self.delay).await;

// Decrement counter.
self.active.fetch_sub(1, Ordering::SeqCst);
Ok(())
}
}

#[test]
fn test_build_batch_span_processor_builder() {
let mut env_vars = vec![
Expand Down Expand Up @@ -532,8 +590,8 @@
);
}

// If the time_out is true, then the result suppose to ended with timeout.
// otherwise the exporter should be able to export within time out duration.
// If `time_out` is `true`, then the export should fail with a timeout.
// Else, the exporter should be able to export within the timeout duration.
async fn timeout_test_tokio(time_out: bool) {
let config = BatchConfig {
max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
Expand All @@ -557,24 +615,92 @@
assert!(shutdown_res.is_ok());
}

#[test]
fn test_timeout_tokio_timeout() {
#[tokio::test(flavor = "multi_thread")]
async fn test_timeout_tokio_timeout() {
// If time_out is true, then we ask exporter to block for 60ms and set timeout to 5ms.
// If time_out is false, then we ask the exporter to block for 5ms and set timeout to 60ms.
// Either way, the test should be finished within 5s.
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(timeout_test_tokio(true));
timeout_test_tokio(true).await;
}

#[test]
fn test_timeout_tokio_not_timeout() {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(timeout_test_tokio(false));
#[tokio::test(flavor = "multi_thread")]
async fn test_timeout_tokio_not_timeout() {
timeout_test_tokio(false).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_concurrent_exports_expected() {
    // Counters shared between this test and the exporter it hands off.
    let in_flight = Arc::new(AtomicUsize::new(0));
    let overlap_seen = Arc::new(AtomicBool::new(false));

    // A batch size of one means every finished span forces its own export;
    // the hour-long scheduled delay effectively disables the periodic tick.
    let config = BatchConfig {
        max_concurrent_exports: 2, // the setting under test
        max_export_batch_size: 1,
        max_queue_size: 16,
        max_export_timeout: Duration::from_secs(5),
        scheduled_delay: Duration::from_secs(3600),
    };
    let exporter = TrackingExporter {
        concurrent_seen: Arc::clone(&overlap_seen),
        active: Arc::clone(&in_flight),
        delay: Duration::from_millis(50),
    };

    let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);

    // End three spans back to back.
    for _ in 0..3 {
        processor.on_end(new_test_export_span_data());
    }

    // Drain everything before inspecting the flag.
    processor.force_flush().expect("force flush failed");
    processor.shutdown().expect("shutdown failed");

    // At some point more than one export must have been in flight.
    let overlapped = overlap_seen.load(Ordering::SeqCst);
    assert!(
        overlapped,
        "exports never overlapped, processor is still serialising them"
    );
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_exports_serial_when_max_concurrent_exports_1() {
    // Counters shared between this test and the exporter it hands off.
    let in_flight = Arc::new(AtomicUsize::new(0));
    let overlap_seen = Arc::new(AtomicBool::new(false));

    // One span per batch, with concurrency capped at a single export.
    let config = BatchConfig {
        max_concurrent_exports: 1, // the setting under test
        max_export_batch_size: 1,
        max_queue_size: 16,
        max_export_timeout: Duration::from_secs(5),
        scheduled_delay: Duration::from_secs(3600),
    };
    let exporter = TrackingExporter {
        concurrent_seen: Arc::clone(&overlap_seen),
        active: Arc::clone(&in_flight),
        delay: Duration::from_millis(50),
    };

    let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);

    // End several spans back to back.
    for _ in 0..3 {
        processor.on_end(new_test_export_span_data());
    }

    processor.force_flush().expect("force flush failed");
    processor.shutdown().expect("shutdown failed");

    // No two exports may ever have been in flight together.
    let overlapped = overlap_seen.load(Ordering::SeqCst);
    assert!(
        !overlapped,
        "exports overlapped even though max_concurrent_exports was 1"
    );
}
}