Skip to content

Commit 8135270

Browse files
authored
Merge branch 'main' into main
2 parents 9c46625 + 91ae096 commit 8135270

File tree

8 files changed

+159
-70
lines changed

8 files changed

+159
-70
lines changed

opentelemetry-sdk/CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22

33
## vNext
44

5+
- **Breaking**: The `Runtime` trait has been simplified and refined. See the [#2641](https://github.com/open-telemetry/opentelemetry-rust/pull/2641)
6+
for the changes.
57
- Calls to `MeterProviderBuilder::with_resource`, `TracerProviderBuilder::with_resource`,
68
`LoggerProviderBuilder::with_resource` are now additive ([#2677](https://github.com/open-telemetry/opentelemetry-rust/pull/2677)).
79
- Moved `ExportError` trait from `opentelemetry::trace::ExportError` to `opentelemetry_sdk::export::ExportError`
810
- Moved `TraceError` enum from `opentelemetry::trace::TraceError` to `opentelemetry_sdk::trace::TraceError`
911
- Moved `TraceResult` type alias from `opentelemetry::trace::TraceResult` to `opentelemetry_sdk::trace::TraceResult`
10-
- *Breaking*: Make `force_flush()` in `PushMetricExporter` synchronous
11-
- **Breaking Change:** Updated the `SpanExporter` trait method signature:
12+
- **Breaking**: Make `force_flush()` in `PushMetricExporter` synchronous
13+
- **Breaking**: Updated the `SpanExporter` trait method signature:
1214

1315
```rust
1416
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult>;

opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::{
1717
};
1818

1919
use super::{BatchConfig, LogProcessor};
20-
use crate::runtime::{RuntimeChannel, TrySend};
20+
use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
2121
use futures_channel::oneshot;
2222
use futures_util::{
2323
future::{self, Either},
@@ -126,13 +126,13 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
126126
let inner_runtime = runtime.clone();
127127

128128
// Spawn worker process via user-defined spawn function.
129-
runtime.spawn(Box::pin(async move {
129+
runtime.spawn(async move {
130130
// Timer will take a reference to the current runtime, so its important we do this within the
131131
// runtime.spawn()
132-
let ticker = inner_runtime
133-
.interval(config.scheduled_delay)
132+
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
134133
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
135134
.map(|_| BatchMessage::Flush(None));
135+
136136
let timeout_runtime = inner_runtime.clone();
137137
let mut logs = Vec::new();
138138
let mut messages = Box::pin(stream::select(message_receiver, ticker));
@@ -204,7 +204,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
204204
}
205205
}
206206
}
207-
}));
207+
});
208208
// Return batch processor with link to worker
209209
BatchLogProcessor {
210210
message_sender,

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use futures_util::{
1313
};
1414
use opentelemetry::{otel_debug, otel_error};
1515

16-
use crate::runtime::Runtime;
16+
use crate::runtime::{to_interval_stream, Runtime};
1717
use crate::{
1818
error::{OTelSdkError, OTelSdkResult},
1919
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
@@ -109,9 +109,8 @@ where
109109
let worker = move |reader: &PeriodicReader<E>| {
110110
let runtime = self.runtime.clone();
111111
let reader = reader.clone();
112-
self.runtime.spawn(Box::pin(async move {
113-
let ticker = runtime
114-
.interval(self.interval)
112+
self.runtime.spawn(async move {
113+
let ticker = to_interval_stream(runtime.clone(), self.interval)
115114
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
116115
.map(|_| Message::Export);
117116
let messages = Box::pin(stream::select(message_receiver, ticker));
@@ -126,7 +125,7 @@ where
126125
}
127126
.run(messages)
128127
.await
129-
}));
128+
});
130129
};
131130

132131
otel_debug!(

opentelemetry-sdk/src/runtime.rs

Lines changed: 52 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -6,42 +6,57 @@
66
//! [Tokio]: https://crates.io/crates/tokio
77
//! [async-std]: https://crates.io/crates/async-std
88
9-
use futures_util::{future::BoxFuture, stream::Stream};
9+
use futures_util::stream::{unfold, Stream};
1010
use std::{fmt::Debug, future::Future, time::Duration};
1111
use thiserror::Error;
1212

1313
/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
14-
/// OpenTelemetry to work with any current and hopefully future runtime implementation.
14+
/// OpenTelemetry to work with any current and hopefully future runtime implementations.
1515
///
1616
/// [Tokio]: https://crates.io/crates/tokio
1717
/// [async-std]: https://crates.io/crates/async-std
18+
///
19+
/// # Note
20+
///
21+
/// OpenTelemetry expects a *multithreaded* runtime because its types can move across threads.
22+
/// For this reason, this trait requires the `Send` and `Sync` bounds. Single-threaded runtimes
23+
/// can implement this trait in a way that spawns the tasks on the same thread as the calling code.
1824
#[cfg(feature = "experimental_async_runtime")]
1925
pub trait Runtime: Clone + Send + Sync + 'static {
20-
/// A future stream, which returns items in a previously specified interval. The item type is
21-
/// not important.
22-
type Interval: Stream + Send;
23-
24-
/// A future, which resolves after a previously specified amount of time. The output type is
25-
/// not important.
26-
type Delay: Future + Send + Unpin;
27-
28-
/// Create a [futures_util::stream::Stream], which returns a new item every
29-
/// [std::time::Duration].
30-
fn interval(&self, duration: Duration) -> Self::Interval;
31-
3226
/// Spawn a new task or thread, which executes the given future.
3327
///
3428
/// # Note
3529
///
3630
/// This is mainly used to run batch span processing in the background. Note, that the function
3731
/// does not return a handle. OpenTelemetry will use a different way to wait for the future to
38-
/// finish when TracerProvider gets shutdown. At the moment this happens by blocking the
32+
/// finish when the caller shuts down.
33+
///
34+
/// At the moment, the shutdown happens by blocking the
3935
/// current thread. This means runtime implementations need to make sure they can still execute
4036
/// the given future even if the main thread is blocked.
41-
fn spawn(&self, future: BoxFuture<'static, ()>);
37+
fn spawn<F>(&self, future: F)
38+
where
39+
F: Future<Output = ()> + Send + 'static;
40+
41+
/// Return a future that resolves after the specified [Duration].
42+
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
43+
}
4244

43-
/// Return a new future, which resolves after the specified [std::time::Duration].
44-
fn delay(&self, duration: Duration) -> Self::Delay;
45+
/// Uses the given runtime to produce an interval stream.
46+
#[cfg(feature = "experimental_async_runtime")]
47+
#[allow(dead_code)]
48+
pub(crate) fn to_interval_stream<T: Runtime>(
49+
runtime: T,
50+
interval: Duration,
51+
) -> impl Stream<Item = ()> {
52+
unfold((), move |_| {
53+
let runtime_cloned = runtime.clone();
54+
55+
async move {
56+
runtime_cloned.delay(interval).await;
57+
Some(((), ()))
58+
}
59+
})
4560
}
4661

4762
/// Runtime implementation, which works with Tokio's multi thread runtime.
@@ -59,21 +74,17 @@ pub struct Tokio;
5974
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
6075
)]
6176
impl Runtime for Tokio {
62-
type Interval = tokio_stream::wrappers::IntervalStream;
63-
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
64-
65-
fn interval(&self, duration: Duration) -> Self::Interval {
66-
crate::util::tokio_interval_stream(duration)
67-
}
68-
69-
fn spawn(&self, future: BoxFuture<'static, ()>) {
77+
fn spawn<F>(&self, future: F)
78+
where
79+
F: Future<Output = ()> + Send + 'static,
80+
{
7081
#[allow(clippy::let_underscore_future)]
7182
// we don't have to await on the returned future to execute
7283
let _ = tokio::spawn(future);
7384
}
7485

75-
fn delay(&self, duration: Duration) -> Self::Delay {
76-
Box::pin(tokio::time::sleep(duration))
86+
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
87+
tokio::time::sleep(duration)
7788
}
7889
}
7990

@@ -104,14 +115,10 @@ pub struct TokioCurrentThread;
104115
)))
105116
)]
106117
impl Runtime for TokioCurrentThread {
107-
type Interval = tokio_stream::wrappers::IntervalStream;
108-
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
109-
110-
fn interval(&self, duration: Duration) -> Self::Interval {
111-
crate::util::tokio_interval_stream(duration)
112-
}
113-
114-
fn spawn(&self, future: BoxFuture<'static, ()>) {
118+
fn spawn<F>(&self, future: F)
119+
where
120+
F: Future<Output = ()> + Send + 'static,
121+
{
115122
// We cannot force push tracing in current thread tokio scheduler because we rely on
116123
// BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
117124
// shutdown function so that the runtime will not finish the blocked task and kill any
@@ -127,8 +134,8 @@ impl Runtime for TokioCurrentThread {
127134
});
128135
}
129136

130-
fn delay(&self, duration: Duration) -> Self::Delay {
131-
Box::pin(tokio::time::sleep(duration))
137+
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
138+
tokio::time::sleep(duration)
132139
}
133140
}
134141

@@ -147,20 +154,16 @@ pub struct AsyncStd;
147154
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
148155
)]
149156
impl Runtime for AsyncStd {
150-
type Interval = async_std::stream::Interval;
151-
type Delay = BoxFuture<'static, ()>;
152-
153-
fn interval(&self, duration: Duration) -> Self::Interval {
154-
async_std::stream::interval(duration)
155-
}
156-
157-
fn spawn(&self, future: BoxFuture<'static, ()>) {
157+
fn spawn<F>(&self, future: F)
158+
where
159+
F: Future<Output = ()> + Send + 'static,
160+
{
158161
#[allow(clippy::let_underscore_future)]
159162
let _ = async_std::task::spawn(future);
160163
}
161164

162-
fn delay(&self, duration: Duration) -> Self::Delay {
163-
Box::pin(async_std::task::sleep(duration))
165+
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
166+
async_std::task::sleep(duration)
164167
}
165168
}
166169

@@ -193,7 +196,7 @@ pub enum TrySendError {
193196
/// Send failed due to the channel being closed.
194197
#[error("cannot send message to batch processor as the channel is closed")]
195198
ChannelClosed,
196-
/// Any other send error that isnt covered above.
199+
/// Any other send error that isn't covered above.
197200
#[error(transparent)]
198201
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
199202
}

opentelemetry-sdk/src/trace/mod.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,53 @@ mod tests {
7676
Context, KeyValue,
7777
};
7878

79+
#[test]
80+
fn span_modification_via_context() {
81+
let exporter = InMemorySpanExporterBuilder::new().build();
82+
let provider = SdkTracerProvider::builder()
83+
.with_span_processor(SimpleSpanProcessor::new(exporter.clone()))
84+
.build();
85+
let tracer = provider.tracer("test_tracer");
86+
87+
#[derive(Debug, PartialEq)]
88+
struct ValueA(u64);
89+
90+
let span = tracer.start("span-name");
91+
92+
// start with Current, which should have no span
93+
let cx = Context::current();
94+
assert!(!cx.has_active_span());
95+
96+
// add span to context
97+
let cx_with_span = cx.with_span(span);
98+
assert!(cx_with_span.has_active_span());
99+
assert!(!cx.has_active_span());
100+
101+
// modify the span by using span_ref from the context
102+
// this is the only way to modify the span as span
103+
// is moved to context.
104+
let span_ref = cx_with_span.span();
105+
span_ref.set_attribute(KeyValue::new("attribute1", "value1"));
106+
107+
// create a new context, which should not affect the original
108+
let cx_with_span_and_more = cx_with_span.with_value(ValueA(1));
109+
110+
// modify the span again using the new context.
111+
// this should still be using the original span itself.
112+
let span_ref_new = cx_with_span_and_more.span();
113+
span_ref_new.set_attribute(KeyValue::new("attribute2", "value2"));
114+
115+
span_ref_new.end();
116+
117+
let exported_spans = exporter
118+
.get_finished_spans()
119+
.expect("Spans are expected to be exported.");
120+
// There should be a single span, with attributes from both modifications.
121+
assert_eq!(exported_spans.len(), 1);
122+
let span = &exported_spans[0];
123+
assert_eq!(span.attributes.len(), 2);
124+
}
125+
79126
#[test]
80127
fn tracer_in_span() {
81128
// Arrange

opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::runtime::RuntimeChannel;
1+
use crate::runtime::{to_interval_stream, RuntimeChannel};
22
use crate::trace::error::TraceError;
33
use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyResponse;
44
use crate::trace::sampler::jaeger_remote::sampling_strategy::Inner;
@@ -190,8 +190,9 @@ impl JaegerRemoteSampler {
190190
C: HttpClient + 'static,
191191
{
192192
// todo: review if we need 'static here
193-
let interval = runtime.interval(update_timeout);
194-
runtime.spawn(Box::pin(async move {
193+
let interval = to_interval_stream(runtime.clone(), update_timeout);
194+
195+
runtime.spawn(async move {
195196
// either update or shutdown
196197
let mut update = Box::pin(stream::select(
197198
shutdown.map(|_| false),
@@ -217,7 +218,7 @@ impl JaegerRemoteSampler {
217218
break;
218219
}
219220
}
220-
}));
221+
});
221222
}
222223

223224
async fn request_new_strategy<C>(

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::error::{OTelSdkError, OTelSdkResult};
22
use crate::resource::Resource;
3-
use crate::runtime::{RuntimeChannel, TrySend};
3+
use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
44
use crate::trace::BatchConfig;
55
use crate::trace::Span;
66
use crate::trace::SpanProcessor;
@@ -309,6 +309,7 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
309309
let export = self.exporter.export(self.spans.split_off(0));
310310
let timeout = self.runtime.delay(self.config.max_export_timeout);
311311
let time_out = self.config.max_export_timeout;
312+
312313
pin_mut!(export);
313314
pin_mut!(timeout);
314315

@@ -353,11 +354,10 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
353354

354355
let inner_runtime = runtime.clone();
355356
// Spawn worker process via user-defined spawn function.
356-
runtime.spawn(Box::pin(async move {
357+
runtime.spawn(async move {
357358
// Timer will take a reference to the current runtime, so its important we do this within the
358359
// runtime.spawn()
359-
let ticker = inner_runtime
360-
.interval(config.scheduled_delay)
360+
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
361361
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
362362
.map(|_| BatchMessage::Flush(None));
363363
let timeout_runtime = inner_runtime.clone();
@@ -372,7 +372,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
372372
};
373373

374374
processor.run(messages).await
375-
}));
375+
});
376376

377377
// Return batch processor with link to worker
378378
BatchSpanProcessor {

0 commit comments

Comments
 (0)