Skip to content

Commit 7534891

Browse files
authored
Support concurrent exports (open-telemetry#781)
* Add support for concurrent exports Applications generating significant span volume can end up dropping data due to the synchronous export step. According to the opentelemetry spec, This function will never be called concurrently for the same exporter instance. It can be called again only after the current call returns. However, it does not place a restriction on concurrent I/O or anything of that nature. There is an [ongoing discussion] about tweaking the language to make this more clear. With that in mind, this commit makes the exporters return a future that can be spawned concurrently. Unfortunately, this means that the `export()` method can no longer be async while taking &mut self. The latter is desirable to enforce the no concurrent calls line of the spec, so the choice is made here to return a future instead with the lifetime decoupled from self. This resulted in a bit of additional verbosity, but for the most part the async code can still be shoved into an async fn for the ergonomics. The main exception to this is the `jaeger` exporter which internally requires a bunch of mutable references. I plan to discuss with the opentelemetry team the overall goal of this PR and get buy-in before making more invasive changes to support this in the jaeger exporter. [ongoing discussion]: open-telemetry/opentelemetry-specification#2434 * SpanProcessor directly manages concurrent exports Prior, export tasks were run in "fire and forget" mode with runtime::spawn. SpanProcessor now manages tasks directly using FuturesUnordered. This enables limiting overall concurrency (and thus memory footprint). Additionally, flush and shutdown logic now spawn an additional task for any unexported spans and wait on _all_ outstanding tasks to complete before returning. * Add configuration for BSP max_concurrent_exports Users may desire to control the level of export concurrency in the batch span processor. There are two special values: max_concurrent_exports = 0: no bound on concurrency max_concurrent_exports = 1: no concurrency, makes everything synchronous on the messaging task. * Implement new SpanExporter API for Jaeger Key points - decouple exporter from uploaders via channel and spawned task - some uploaders are a shared I/O resource and cannot be multiplexed - necessitates a task queue - eg, HttpClient will spawn many I/O tasks internally, AgentUploader is a single I/O resource. Different level of abstraction. - Synchronous API not supported without a Runtime argument. I updated the API to thread one through, but maybe this is undesirable. I'm also exploiting the fact in the Actix examples that it uses Tokio under the hood to pass through the Tokio runtime token. - Tests pass save for a couple of flakey environment ones which is likely a race condition. * Reduce dependencies on futures The minimal necessary futures library (core, util, futures proper) is now used in all packages touched by the concurrent exporters work. * Remove runtime from Jaeger's install_simple To keep the API _actually_ simple, we now leverage a thread to run the jaeger exporter internals. * Add Arc lost in a rebase * Fix OTEL_BSP_MAX_CONCURRENT_EXPORTS name and value Per PR feedback, the default should match the previous behavior of 1 batch at a time. * Fix remaining TODOs This finishes the remaining TODOs on the concurrent-exports branch. The major change included here adds shutdown functionality to the jaeger exporter which ensures the exporter has finished its tasks before exiting. * Restore lint.sh script This was erroneously committed. * Make max concurrent exports env configurable OTEL_BSP_MAX_CONCURRENT_EXPORTS may now be specified in the environment to configure the number of max concurrent exports. This configurable now has parity with the other options of the span_processor.
1 parent 02e15b2 commit 7534891

File tree

20 files changed

+523
-258
lines changed

20 files changed

+523
-258
lines changed

opentelemetry-datadog/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ thiserror = "1.0"
3636
itertools = "0.10"
3737
http = "0.2"
3838
lazy_static = "1.4"
39+
futures-core = "0.3"
3940

4041
[dev-dependencies]
4142
base64 = "0.13"

opentelemetry-datadog/src/exporter/mod.rs

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::borrow::Cow;
99
use std::fmt::{Debug, Formatter};
1010

1111
use crate::exporter::model::FieldMapping;
12-
use async_trait::async_trait;
12+
use futures_core::future::BoxFuture;
1313
use http::{Method, Request, Uri};
1414
use itertools::Itertools;
1515
use opentelemetry::sdk::export::trace;
@@ -34,7 +34,7 @@ const DATADOG_TRACE_COUNT_HEADER: &str = "X-Datadog-Trace-Count";
3434

3535
/// Datadog span exporter
3636
pub struct DatadogExporter {
37-
client: Box<dyn HttpClient>,
37+
client: Arc<dyn HttpClient>,
3838
request_url: Uri,
3939
model_config: ModelConfig,
4040
version: ApiVersion,
@@ -49,7 +49,7 @@ impl DatadogExporter {
4949
model_config: ModelConfig,
5050
request_url: Uri,
5151
version: ApiVersion,
52-
client: Box<dyn HttpClient>,
52+
client: Arc<dyn HttpClient>,
5353
resource_mapping: Option<FieldMapping>,
5454
name_mapping: Option<FieldMapping>,
5555
service_name_mapping: Option<FieldMapping>,
@@ -64,6 +64,27 @@ impl DatadogExporter {
6464
service_name_mapping,
6565
}
6666
}
67+
68+
fn build_request(&self, batch: Vec<SpanData>) -> Result<http::Request<Vec<u8>>, TraceError> {
69+
let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
70+
let trace_count = traces.len();
71+
let data = self.version.encode(
72+
&self.model_config,
73+
traces,
74+
self.service_name_mapping.clone(),
75+
self.name_mapping.clone(),
76+
self.resource_mapping.clone(),
77+
)?;
78+
let req = Request::builder()
79+
.method(Method::POST)
80+
.uri(self.request_url.clone())
81+
.header(http::header::CONTENT_TYPE, self.version.content_type())
82+
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
83+
.body(data)
84+
.map_err::<Error, _>(Into::into)?;
85+
86+
Ok(req)
87+
}
6788
}
6889

6990
impl Debug for DatadogExporter {
@@ -94,8 +115,7 @@ pub struct DatadogPipelineBuilder {
94115
agent_endpoint: String,
95116
trace_config: Option<sdk::trace::Config>,
96117
version: ApiVersion,
97-
client: Option<Box<dyn HttpClient>>,
98-
118+
client: Option<Arc<dyn HttpClient>>,
99119
resource_mapping: Option<FieldMapping>,
100120
name_mapping: Option<FieldMapping>,
101121
service_name_mapping: Option<FieldMapping>,
@@ -122,15 +142,15 @@ impl Default for DatadogPipelineBuilder {
122142
not(feature = "reqwest-blocking-client"),
123143
feature = "surf-client"
124144
))]
125-
client: Some(Box::new(surf::Client::new())),
145+
client: Some(Arc::new(surf::Client::new())),
126146
#[cfg(all(
127147
not(feature = "surf-client"),
128148
not(feature = "reqwest-blocking-client"),
129149
feature = "reqwest-client"
130150
))]
131-
client: Some(Box::new(reqwest::Client::new())),
151+
client: Some(Arc::new(reqwest::Client::new())),
132152
#[cfg(feature = "reqwest-blocking-client")]
133-
client: Some(Box::new(reqwest::blocking::Client::new())),
153+
client: Some(Arc::new(reqwest::blocking::Client::new())),
134154
}
135155
}
136156
}
@@ -296,7 +316,7 @@ impl DatadogPipelineBuilder {
296316
/// Choose the http client used by uploader
297317
pub fn with_http_client<T: HttpClient + 'static>(
298318
mut self,
299-
client: Box<dyn HttpClient>,
319+
client: Arc<dyn HttpClient>,
300320
) -> Self {
301321
self.client = Some(client);
302322
self
@@ -354,28 +374,24 @@ fn group_into_traces(spans: Vec<SpanData>) -> Vec<Vec<SpanData>> {
354374
.collect()
355375
}
356376

357-
#[async_trait]
377+
async fn send_request(
378+
client: Arc<dyn HttpClient>,
379+
request: http::Request<Vec<u8>>,
380+
) -> trace::ExportResult {
381+
let _ = client.send(request).await?.error_for_status()?;
382+
Ok(())
383+
}
384+
358385
impl trace::SpanExporter for DatadogExporter {
359386
/// Export spans to datadog-agent
360-
async fn export(&mut self, batch: Vec<SpanData>) -> trace::ExportResult {
361-
let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
362-
let trace_count = traces.len();
363-
let data = self.version.encode(
364-
&self.model_config,
365-
traces,
366-
self.service_name_mapping.clone(),
367-
self.name_mapping.clone(),
368-
self.resource_mapping.clone(),
369-
)?;
370-
let req = Request::builder()
371-
.method(Method::POST)
372-
.uri(self.request_url.clone())
373-
.header(http::header::CONTENT_TYPE, self.version.content_type())
374-
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
375-
.body(data)
376-
.map_err::<Error, _>(Into::into)?;
377-
let _ = self.client.send(req).await?.error_for_status()?;
378-
Ok(())
387+
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, trace::ExportResult> {
388+
let request = match self.build_request(batch) {
389+
Ok(req) => req,
390+
Err(err) => return Box::pin(std::future::ready(Err(err))),
391+
};
392+
393+
let client = self.client.clone();
394+
Box::pin(send_request(client, request))
379395
}
380396
}
381397

opentelemetry-jaeger/Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ rustdoc-args = ["--cfg", "docsrs"]
2222
async-std = { version = "1.6", optional = true }
2323
async-trait = "0.1"
2424
base64 = { version = "0.13", optional = true }
25+
futures = "0.3"
2526
futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true }
27+
futures-executor = "0.3"
2628
headers = { version = "0.3.2", optional = true }
2729
http = { version = "0.2", optional = true }
2830
isahc = { version = "1.4", default-features = false, optional = true }
@@ -45,6 +47,7 @@ prost = { version = "0.9.0", optional = true }
4547
prost-types = { version = "0.9.0", optional = true }
4648

4749
[dev-dependencies]
50+
tokio = { version = "1.0", features = ["net", "sync"] }
4851
bytes = "1"
4952
futures-executor = "0.3"
5053
opentelemetry = { default-features = false, features = ["trace", "testing"], path = "../opentelemetry" }
@@ -63,6 +66,18 @@ features = [
6366
optional = true
6467

6568
[features]
69+
full = [
70+
"collector_client",
71+
"isahc_collector_client",
72+
"reqwest_collector_client",
73+
"reqwest_blocking_collector_client",
74+
"surf_collector_client",
75+
"wasm_collector_client",
76+
"rt-tokio",
77+
"rt-tokio-current-thread",
78+
"rt-async-std",
79+
"integration_test"
80+
]
6681
default = []
6782
collector_client = ["http", "opentelemetry-http"]
6883
isahc_collector_client = ["isahc", "opentelemetry-http/isahc"]

opentelemetry-jaeger/src/exporter/config/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ mod tests {
104104
assert_eq!(process.tags.len(), 2);
105105
}
106106

107-
#[test]
108-
fn test_read_from_env() {
107+
#[tokio::test]
108+
async fn test_read_from_env() {
109109
// OTEL_SERVICE_NAME env var also works
110110
env::set_var("OTEL_SERVICE_NAME", "test service");
111111
let builder = new_agent_pipeline();

opentelemetry-jaeger/src/exporter/mod.rs

Lines changed: 88 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ use std::convert::TryFrom;
1818

1919
use self::runtime::JaegerTraceRuntime;
2020
use self::thrift::jaeger;
21-
use async_trait::async_trait;
21+
use futures::channel::{mpsc, oneshot};
22+
use futures::future::BoxFuture;
23+
use futures::StreamExt;
2224
use std::convert::TryInto;
2325

2426
#[cfg(feature = "isahc_collector_client")]
@@ -42,13 +44,27 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name";
4244
/// Instrument Library version MUST be reported in Jaeger Span tags with the following key
4345
const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version";
4446

47+
#[derive(Debug)]
48+
enum ExportMessage {
49+
Export {
50+
batch: Vec<trace::SpanData>,
51+
tx: oneshot::Sender<trace::ExportResult>,
52+
},
53+
Shutdown,
54+
}
55+
4556
/// Jaeger span exporter
4657
#[derive(Debug)]
4758
pub struct Exporter {
59+
tx: mpsc::Sender<ExportMessage>,
60+
61+
// In the switch to concurrent exports, the non-test code which used this
62+
// value was moved into the ExporterTask implementation. However, there's
63+
// still a test that relies on this value being here, thus the
64+
// allow(dead_code).
65+
#[allow(dead_code)]
4866
process: jaeger::Process,
49-
/// Whether or not to export instrumentation information.
50-
export_instrumentation_lib: bool,
51-
uploader: Box<dyn Uploader>,
67+
join_handle: Option<std::thread::JoinHandle<()>>,
5268
}
5369

5470
impl Exporter {
@@ -57,10 +73,61 @@ impl Exporter {
5773
export_instrumentation_lib: bool,
5874
uploader: Box<dyn Uploader>,
5975
) -> Exporter {
60-
Exporter {
61-
process,
76+
let (tx, rx) = futures::channel::mpsc::channel(64);
77+
78+
let exporter_task = ExporterTask {
79+
rx,
6280
export_instrumentation_lib,
6381
uploader,
82+
process: process.clone(),
83+
};
84+
85+
let join_handle = Some(std::thread::spawn(move || {
86+
futures_executor::block_on(exporter_task.run());
87+
}));
88+
89+
Exporter {
90+
tx,
91+
process,
92+
join_handle,
93+
}
94+
}
95+
}
96+
97+
struct ExporterTask {
98+
rx: mpsc::Receiver<ExportMessage>,
99+
process: jaeger::Process,
100+
/// Whether or not to export instrumentation information.
101+
export_instrumentation_lib: bool,
102+
uploader: Box<dyn Uploader>,
103+
}
104+
105+
impl ExporterTask {
106+
async fn run(mut self) {
107+
while let Some(message) = self.rx.next().await {
108+
match message {
109+
ExportMessage::Export { batch, tx } => {
110+
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
111+
let process = self.process.clone();
112+
113+
for span in batch.into_iter() {
114+
jaeger_spans.push(convert_otel_span_into_jaeger_span(
115+
span,
116+
self.export_instrumentation_lib,
117+
));
118+
}
119+
120+
let res = self
121+
.uploader
122+
.upload(jaeger::Batch::new(process, jaeger_spans))
123+
.await;
124+
125+
// Errors here might be completely expected if the receiver didn't
126+
// care about the result.
127+
let _ = tx.send(res);
128+
}
129+
ExportMessage::Shutdown => break,
130+
}
64131
}
65132
}
66133
}
@@ -74,23 +141,25 @@ pub struct Process {
74141
pub tags: Vec<KeyValue>,
75142
}
76143

77-
#[async_trait]
78144
impl trace::SpanExporter for Exporter {
79145
/// Export spans to Jaeger
80-
async fn export(&mut self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
81-
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
82-
let process = self.process.clone();
83-
84-
for span in batch.into_iter() {
85-
jaeger_spans.push(convert_otel_span_into_jaeger_span(
86-
span,
87-
self.export_instrumentation_lib,
88-
));
146+
fn export(&mut self, batch: Vec<trace::SpanData>) -> BoxFuture<'static, trace::ExportResult> {
147+
let (tx, rx) = oneshot::channel();
148+
149+
if let Err(err) = self.tx.try_send(ExportMessage::Export { batch, tx }) {
150+
return Box::pin(futures::future::ready(Err(Into::into(err))));
89151
}
90152

91-
self.uploader
92-
.upload(jaeger::Batch::new(process, jaeger_spans))
93-
.await
153+
Box::pin(async move { rx.await? })
154+
}
155+
156+
fn shutdown(&mut self) {
157+
let _ = self.tx.try_send(ExportMessage::Shutdown);
158+
159+
// This has the potential to block indefinitely, but as long as all of
160+
// the tasks processed by ExportTask have a timeout, this should join
161+
// eventually.
162+
self.join_handle.take().map(|handle| handle.join());
94163
}
95164
}
96165

opentelemetry-jaeger/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
//! use opentelemetry::trace::Tracer;
2626
//! use opentelemetry::global;
2727
//!
28-
//! fn main() -> Result<(), opentelemetry::trace::TraceError> {
28+
//! #[tokio::main]
29+
//! async fn main() -> Result<(), opentelemetry::trace::TraceError> {
2930
//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
3031
//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?;
3132
//!

opentelemetry-otlp/src/exporter/http.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{ExportConfig, Protocol};
22
use opentelemetry_http::HttpClient;
33
use std::collections::HashMap;
4+
use std::sync::Arc;
45

56
/// Configuration of the http transport
67
#[cfg(feature = "http-proto")]
@@ -15,7 +16,7 @@ use std::collections::HashMap;
1516
)]
1617
pub struct HttpConfig {
1718
/// Select the HTTP client
18-
pub client: Option<Box<dyn HttpClient>>,
19+
pub client: Option<Arc<dyn HttpClient>>,
1920

2021
/// Additional headers to send to the collector.
2122
pub headers: Option<HashMap<String, String>>,
@@ -30,19 +31,19 @@ impl Default for HttpConfig {
3031
fn default() -> Self {
3132
HttpConfig {
3233
#[cfg(feature = "reqwest-blocking-client")]
33-
client: Some(Box::new(reqwest::blocking::Client::new())),
34+
client: Some(Arc::new(reqwest::blocking::Client::new())),
3435
#[cfg(all(
3536
not(feature = "reqwest-blocking-client"),
3637
not(feature = "surf-client"),
3738
feature = "reqwest-client"
3839
))]
39-
client: Some(Box::new(reqwest::Client::new())),
40+
client: Some(Arc::new(reqwest::Client::new())),
4041
#[cfg(all(
4142
not(feature = "reqwest-client"),
4243
not(feature = "reqwest-blocking-client"),
4344
feature = "surf-client"
4445
))]
45-
client: Some(Box::new(surf::Client::new())),
46+
client: Some(Arc::new(surf::Client::new())),
4647
#[cfg(all(
4748
not(feature = "reqwest-client"),
4849
not(feature = "surf-client"),
@@ -78,7 +79,7 @@ impl Default for HttpExporterBuilder {
7879
impl HttpExporterBuilder {
7980
/// Assign client implementation
8081
pub fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
81-
self.http_config.client = Some(Box::new(client));
82+
self.http_config.client = Some(Arc::new(client));
8283
self
8384
}
8485

0 commit comments

Comments
 (0)