Skip to content

Commit 06fa8ae

Browse files
committed
fix(async-processor): concurrent exports actually serialised
1 parent d4eb35a commit 06fa8ae

File tree

2 files changed

+159
-35
lines changed

2 files changed

+159
-35
lines changed

opentelemetry-sdk/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ spec_unstable_metrics_views = ["metrics"]
5555
experimental_metrics_custom_reader = ["metrics"]
5656
experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimental_async_runtime"]
5757
experimental_logs_concurrent_log_processor = ["logs"]
58-
experimental_trace_batch_span_processor_with_async_runtime = ["trace", "experimental_async_runtime"]
58+
experimental_trace_batch_span_processor_with_async_runtime = ["rt-tokio", "trace", "experimental_async_runtime"]
5959
experimental_metrics_disable_name_validation = ["metrics"]
6060

6161
[[bench]]

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 158 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,21 @@ use crate::trace::Span;
66
use crate::trace::SpanProcessor;
77
use crate::trace::{SpanData, SpanExporter};
88
use futures_channel::oneshot;
9-
use futures_util::pin_mut;
109
use futures_util::{
1110
future::{self, BoxFuture, Either},
12-
select,
11+
pin_mut, select,
1312
stream::{self, FusedStream, FuturesUnordered},
1413
StreamExt as _,
1514
};
1615
use opentelemetry::Context;
1716
use opentelemetry::{otel_debug, otel_error, otel_warn};
1817
use std::fmt;
19-
use std::sync::atomic::{AtomicUsize, Ordering};
20-
use std::sync::Arc;
18+
use std::sync::{
19+
atomic::{AtomicUsize, Ordering},
20+
Arc,
21+
};
2122
use std::time::Duration;
23+
use tokio::sync::RwLock;
2224

2325
/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
2426
/// them at a preconfigured interval.
@@ -188,13 +190,19 @@ struct BatchSpanProcessorInternal<E, R> {
188190
spans: Vec<SpanData>,
189191
export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
190192
runtime: R,
191-
exporter: E,
193+
exporter: Arc<RwLock<E>>,
192194
config: BatchConfig,
193195
}
194196

195-
impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
197+
impl<E: SpanExporter + Send + Sync + 'static, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
196198
async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
197-
let export_result = self.export().await;
199+
let export_result = Self::export(
200+
self.spans.split_off(0),
201+
self.exporter.clone(),
202+
self.runtime.clone(),
203+
self.config.max_export_timeout,
204+
)
205+
.await;
198206
let task = Box::pin(async move {
199207
if let Some(channel) = res_channel {
200208
// If a response channel is provided, attempt to send the export result through it.
@@ -243,9 +251,15 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
243251
self.export_tasks.next().await;
244252
}
245253

246-
let export_result = self.export().await;
254+
let batch = self.spans.split_off(0);
255+
let exporter = self.exporter.clone();
256+
let runtime = self.runtime.clone();
257+
let max_export_timeout = self.config.max_export_timeout;
258+
247259
let task = async move {
248-
if let Err(err) = export_result {
260+
if let Err(err) =
261+
Self::export(batch, exporter, runtime, max_export_timeout).await
262+
{
249263
otel_error!(
250264
name: "BatchSpanProcessor.Export.Error",
251265
reason = format!("{}", err)
@@ -254,6 +268,7 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
254268

255269
Ok(())
256270
};
271+
257272
// Special case when not using concurrent exports
258273
if self.config.max_concurrent_exports == 1 {
259274
let _ = task.await;
@@ -288,34 +303,39 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
288303
// Stream has terminated or processor is shutdown, return to finish execution.
289304
BatchMessage::Shutdown(ch) => {
290305
self.flush(Some(ch)).await;
291-
let _ = self.exporter.shutdown();
306+
let _ = self.exporter.write().await.shutdown();
292307
return false;
293308
}
294309
// propagate the resource
295310
BatchMessage::SetResource(resource) => {
296-
self.exporter.set_resource(&resource);
311+
self.exporter.write().await.set_resource(&resource);
297312
}
298313
}
299314
true
300315
}
301316

302-
async fn export(&mut self) -> OTelSdkResult {
317+
async fn export(
318+
batch: Vec<SpanData>,
319+
exporter: Arc<RwLock<E>>,
320+
runtime: R,
321+
max_export_timeout: Duration,
322+
) -> OTelSdkResult {
303323
// Batch size check for flush / shutdown. Those methods may be called
304324
// when there's no work to do.
305-
if self.spans.is_empty() {
325+
if batch.is_empty() {
306326
return Ok(());
307327
}
308328

309-
let export = self.exporter.export(self.spans.split_off(0));
310-
let timeout = self.runtime.delay(self.config.max_export_timeout);
311-
let time_out = self.config.max_export_timeout;
329+
let exporter_guard = exporter.read().await;
330+
let export = exporter_guard.export(batch);
331+
let timeout = runtime.delay(max_export_timeout);
312332

313333
pin_mut!(export);
314334
pin_mut!(timeout);
315335

316336
match future::select(export, timeout).await {
317337
Either::Left((export_res, _)) => export_res,
318-
Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
338+
Either::Right((_, _)) => Err(OTelSdkError::Timeout(max_export_timeout)),
319339
}
320340
}
321341

@@ -368,7 +388,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
368388
export_tasks: FuturesUnordered::new(),
369389
runtime: timeout_runtime,
370390
config,
371-
exporter,
391+
exporter: Arc::new(RwLock::new(exporter)),
372392
};
373393

374394
processor.run(messages).await
@@ -435,6 +455,8 @@ mod tests {
435455
use crate::trace::{SpanData, SpanExporter};
436456
use futures_util::Future;
437457
use std::fmt::Debug;
458+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
459+
use std::sync::Arc;
438460
use std::time::Duration;
439461

440462
struct BlockingExporter<D> {
@@ -463,6 +485,39 @@ mod tests {
463485
}
464486
}
465487

488+
/// Exporter that records whether two exports overlap in time.
489+
struct TrackingExporter {
490+
/// Artificial delay to keep each export alive for a while.
491+
delay: Duration,
492+
/// Current number of in-flight exports.
493+
active: Arc<AtomicUsize>,
494+
/// Set to true the first time we see overlap.
495+
concurrent_seen: Arc<AtomicBool>,
496+
}
497+
498+
impl Debug for TrackingExporter {
499+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
500+
f.write_str("tracking exporter")
501+
}
502+
}
503+
504+
impl SpanExporter for TrackingExporter {
505+
async fn export(&self, _batch: Vec<SpanData>) -> crate::error::OTelSdkResult {
506+
// Increment in-flight counter and note any overlap.
507+
let inflight = self.active.fetch_add(1, Ordering::SeqCst) + 1;
508+
if inflight > 1 {
509+
self.concurrent_seen.store(true, Ordering::SeqCst);
510+
}
511+
512+
// Keep the export "busy" for a bit.
513+
tokio::time::sleep(self.delay).await;
514+
515+
// Decrement counter.
516+
self.active.fetch_sub(1, Ordering::SeqCst);
517+
Ok(())
518+
}
519+
}
520+
466521
#[test]
467522
fn test_build_batch_span_processor_builder() {
468523
let mut env_vars = vec![
@@ -532,8 +587,8 @@ mod tests {
532587
);
533588
}
534589

535-
// If the time_out is true, then the result suppose to ended with timeout.
536-
// otherwise the exporter should be able to export within time out duration.
590+
// If `time_out` is `true`, then the export should fail with a timeout.
591+
// Else, the exporter should be able to export within the timeout duration.
537592
async fn timeout_test_tokio(time_out: bool) {
538593
let config = BatchConfig {
539594
max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
@@ -554,27 +609,96 @@ mod tests {
554609
assert!(flush_res.is_ok());
555610
}
556611
let shutdown_res = processor.shutdown();
612+
println!("Shutdown result: {:?}", shutdown_res);
557613
assert!(shutdown_res.is_ok());
558614
}
559615

560-
#[test]
561-
fn test_timeout_tokio_timeout() {
616+
#[tokio::test(flavor = "multi_thread")]
617+
async fn test_timeout_tokio_timeout() {
562618
// If time_out is true, then we ask exporter to block for 60s and set timeout to 5s.
563619
// If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s.
564620
// Either way, the test should be finished within 5s.
565-
let runtime = tokio::runtime::Builder::new_multi_thread()
566-
.enable_all()
567-
.build()
568-
.unwrap();
569-
runtime.block_on(timeout_test_tokio(true));
621+
timeout_test_tokio(true).await;
570622
}
571623

572-
#[test]
573-
fn test_timeout_tokio_not_timeout() {
574-
let runtime = tokio::runtime::Builder::new_multi_thread()
575-
.enable_all()
576-
.build()
577-
.unwrap();
578-
runtime.block_on(timeout_test_tokio(false));
624+
#[tokio::test(flavor = "multi_thread")]
625+
async fn test_timeout_tokio_not_timeout() {
626+
timeout_test_tokio(false).await;
627+
}
628+
629+
#[tokio::test(flavor = "multi_thread")]
630+
async fn test_concurrent_exports_expected() {
631+
// Shared state for the exporter.
632+
let active = Arc::new(AtomicUsize::new(0));
633+
let concurrent_seen = Arc::new(AtomicBool::new(false));
634+
635+
let exporter = TrackingExporter {
636+
delay: Duration::from_millis(50),
637+
active: active.clone(),
638+
concurrent_seen: concurrent_seen.clone(),
639+
};
640+
641+
// Intentionally tiny batch-size so every span forces an export.
642+
let config = BatchConfig {
643+
max_export_batch_size: 1,
644+
max_queue_size: 16,
645+
scheduled_delay: Duration::from_secs(3600), // effectively disabled
646+
max_export_timeout: Duration::from_secs(5),
647+
max_concurrent_exports: 2, // what we want to verify
648+
};
649+
650+
// Spawn the processor.
651+
let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);
652+
653+
// Finish three spans in rapid succession.
654+
processor.on_end(new_test_export_span_data());
655+
processor.on_end(new_test_export_span_data());
656+
processor.on_end(new_test_export_span_data());
657+
658+
// Wait until everything has been exported.
659+
processor.force_flush().expect("force flush failed");
660+
processor.shutdown().expect("shutdown failed");
661+
662+
// Expect at least one period with >1 export in flight.
663+
assert!(
664+
concurrent_seen.load(Ordering::SeqCst),
665+
"exports never overlapped, processor is still serialising them"
666+
);
667+
}
668+
669+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
670+
async fn test_exports_serial_when_max_concurrent_exports_1() {
671+
let active = Arc::new(AtomicUsize::new(0));
672+
let concurrent_seen = Arc::new(AtomicBool::new(false));
673+
674+
let exporter = TrackingExporter {
675+
delay: Duration::from_millis(50),
676+
active: active.clone(),
677+
concurrent_seen: concurrent_seen.clone(),
678+
};
679+
680+
let config = BatchConfig {
681+
max_export_batch_size: 1,
682+
max_queue_size: 16,
683+
scheduled_delay: Duration::from_secs(3600),
684+
max_export_timeout: Duration::from_secs(5),
685+
max_concurrent_exports: 1, // what we want to verify
686+
};
687+
688+
let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);
689+
690+
// Finish several spans quickly.
691+
processor.on_end(new_test_export_span_data());
692+
processor.on_end(new_test_export_span_data());
693+
processor.on_end(new_test_export_span_data());
694+
695+
processor.force_flush().expect("force flush failed");
696+
processor.shutdown().expect("shutdown failed");
697+
698+
// There must never have been more than one export in flight.
699+
assert!(
700+
!concurrent_seen.load(Ordering::SeqCst),
701+
"exports overlapped even though max_concurrent_exports was 1"
702+
);
579703
}
580704
}

0 commit comments

Comments
 (0)