Skip to content

Commit 515bf77

Browse files
committed
Refine runtime trait
1 parent de197e4 commit 515bf77

File tree

5 files changed

+41
-55
lines changed

5 files changed

+41
-55
lines changed

opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::{
1717
};
1818

1919
use super::{BatchConfig, LogProcessor};
20-
use crate::runtime::{RuntimeChannel, TrySend};
20+
use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
2121
use futures_channel::oneshot;
2222
use futures_util::{
2323
future::{self, Either},
@@ -129,10 +129,10 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
129129
runtime.spawn(Box::pin(async move {
130130
// Timer will take a reference to the current runtime, so its important we do this within the
131131
// runtime.spawn()
132-
let ticker = inner_runtime
133-
.interval(config.scheduled_delay)
132+
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
134133
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
135134
.map(|_| BatchMessage::Flush(None));
135+
136136
let timeout_runtime = inner_runtime.clone();
137137
let mut logs = Vec::new();
138138
let mut messages = Box::pin(stream::select(message_receiver, ticker));

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use futures_util::{
1313
};
1414
use opentelemetry::{otel_debug, otel_error};
1515

16-
use crate::runtime::Runtime;
16+
use crate::runtime::{to_interval_stream, Runtime};
1717
use crate::{
1818
error::{OTelSdkError, OTelSdkResult},
1919
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
@@ -110,8 +110,7 @@ where
110110
let runtime = self.runtime.clone();
111111
let reader = reader.clone();
112112
self.runtime.spawn(Box::pin(async move {
113-
let ticker = runtime
114-
.interval(self.interval)
113+
let ticker = to_interval_stream(runtime.clone(), self.interval)
115114
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
116115
.map(|_| Message::Export);
117116
let messages = Box::pin(stream::select(message_receiver, ticker));

opentelemetry-sdk/src/runtime.rs

Lines changed: 30 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,40 +8,48 @@
88
99
use futures_util::{future::BoxFuture, stream::Stream};
1010
use std::{fmt::Debug, future::Future, time::Duration};
11+
use futures_util::stream::unfold;
1112
use thiserror::Error;
1213

1314
/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
14-
/// OpenTelemetry to work with any current and hopefully future runtime implementation.
15+
/// OpenTelemetry to work with any current and hopefully future runtime implementations.
1516
///
1617
/// [Tokio]: https://crates.io/crates/tokio
1718
/// [async-std]: https://crates.io/crates/async-std
19+
///
20+
/// # Note
21+
///
22+
/// OpenTelemetry expects a *multi-threaded* runtime because its types can move across threads.
23+
/// For this reason, this trait requires the `Send` and `Sync` bounds. Single-threaded runtimes
24+
/// can implement this trait in a way that spawns the tasks on the same thread as the calling code.
1825
#[cfg(feature = "experimental_async_runtime")]
1926
pub trait Runtime: Clone + Send + Sync + 'static {
20-
/// A future stream, which returns items in a previously specified interval. The item type is
21-
/// not important.
22-
type Interval: Stream + Send;
23-
24-
/// A future, which resolves after a previously specified amount of time. The output type is
25-
/// not important.
26-
type Delay: Future + Send + Unpin;
27-
28-
/// Create a [futures_util::stream::Stream], which returns a new item every
29-
/// [std::time::Duration].
30-
fn interval(&self, duration: Duration) -> Self::Interval;
31-
3227
/// Spawn a new task or thread, which executes the given future.
3328
///
3429
/// # Note
3530
///
3631
/// This is mainly used to run batch span processing in the background. Note, that the function
3732
/// does not return a handle. OpenTelemetry will use a different way to wait for the future to
38-
/// finish when TracerProvider gets shutdown. At the moment this happens by blocking the
33+
/// finish when `TracerProvider` gets shutdown. At the moment this happens by blocking the
3934
/// current thread. This means runtime implementations need to make sure they can still execute
4035
/// the given future even if the main thread is blocked.
4136
fn spawn(&self, future: BoxFuture<'static, ()>);
4237

4338
/// Return a new future, which resolves after the specified [std::time::Duration].
44-
fn delay(&self, duration: Duration) -> Self::Delay;
39+
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + Sync + 'static;
40+
}
41+
42+
/// Uses the given runtime to produce an interval stream.
43+
#[cfg(feature = "experimental_async_runtime")]
44+
pub(crate) fn to_interval_stream<T: Runtime>(runtime: T, interval: Duration) -> impl Stream<Item = ()> {
45+
unfold((), move |_| {
46+
let runtime_cloned = runtime.clone();
47+
48+
async move {
49+
runtime_cloned.delay(interval).await;
50+
Some(((), ()))
51+
}
52+
})
4553
}
4654

4755
/// Runtime implementation, which works with Tokio's multi thread runtime.
@@ -59,21 +67,14 @@ pub struct Tokio;
5967
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
6068
)]
6169
impl Runtime for Tokio {
62-
type Interval = tokio_stream::wrappers::IntervalStream;
63-
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
64-
65-
fn interval(&self, duration: Duration) -> Self::Interval {
66-
crate::util::tokio_interval_stream(duration)
67-
}
68-
6970
fn spawn(&self, future: BoxFuture<'static, ()>) {
7071
#[allow(clippy::let_underscore_future)]
7172
// we don't have to await on the returned future to execute
7273
let _ = tokio::spawn(future);
7374
}
7475

75-
fn delay(&self, duration: Duration) -> Self::Delay {
76-
Box::pin(tokio::time::sleep(duration))
76+
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + Sync + 'static {
77+
tokio::time::sleep(duration)
7778
}
7879
}
7980

@@ -104,13 +105,6 @@ pub struct TokioCurrentThread;
104105
)))
105106
)]
106107
impl Runtime for TokioCurrentThread {
107-
type Interval = tokio_stream::wrappers::IntervalStream;
108-
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
109-
110-
fn interval(&self, duration: Duration) -> Self::Interval {
111-
crate::util::tokio_interval_stream(duration)
112-
}
113-
114108
fn spawn(&self, future: BoxFuture<'static, ()>) {
115109
// We cannot force push tracing in current thread tokio scheduler because we rely on
116110
// BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
@@ -127,8 +121,8 @@ impl Runtime for TokioCurrentThread {
127121
});
128122
}
129123

130-
fn delay(&self, duration: Duration) -> Self::Delay {
131-
Box::pin(tokio::time::sleep(duration))
124+
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + Sync + 'static {
125+
tokio::time::sleep(duration)
132126
}
133127
}
134128

@@ -147,20 +141,13 @@ pub struct AsyncStd;
147141
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
148142
)]
149143
impl Runtime for AsyncStd {
150-
type Interval = async_std::stream::Interval;
151-
type Delay = BoxFuture<'static, ()>;
152-
153-
fn interval(&self, duration: Duration) -> Self::Interval {
154-
async_std::stream::interval(duration)
155-
}
156-
157144
fn spawn(&self, future: BoxFuture<'static, ()>) {
158145
#[allow(clippy::let_underscore_future)]
159146
let _ = async_std::task::spawn(future);
160147
}
161148

162-
fn delay(&self, duration: Duration) -> Self::Delay {
163-
Box::pin(async_std::task::sleep(duration))
149+
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + Sync + 'static {
150+
async_std::task::sleep(duration)
164151
}
165152
}
166153

@@ -193,7 +180,7 @@ pub enum TrySendError {
193180
/// Send failed due to the channel being closed.
194181
#[error("cannot send message to batch processor as the channel is closed")]
195182
ChannelClosed,
196-
/// Any other send error that isnt covered above.
183+
/// Any other send error that isn't covered above.
197184
#[error(transparent)]
198185
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
199186
}

opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::runtime::RuntimeChannel;
1+
use crate::runtime::{to_interval_stream, RuntimeChannel};
22
use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyResponse;
33
use crate::trace::sampler::jaeger_remote::sampling_strategy::Inner;
44
use crate::trace::{Sampler, ShouldSample};
@@ -189,7 +189,8 @@ impl JaegerRemoteSampler {
189189
C: HttpClient + 'static,
190190
{
191191
// todo: review if we need 'static here
192-
let interval = runtime.interval(update_timeout);
192+
let interval = to_interval_stream(runtime.clone(), update_timeout);
193+
193194
runtime.spawn(Box::pin(async move {
194195
// either update or shutdown
195196
let mut update = Box::pin(stream::select(

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::error::{OTelSdkError, OTelSdkResult};
22
use crate::resource::Resource;
3-
use crate::runtime::{RuntimeChannel, TrySend};
3+
use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
44
use crate::trace::BatchConfig;
55
use crate::trace::Span;
66
use crate::trace::SpanProcessor;
@@ -308,7 +308,7 @@ impl<R: RuntimeChannel> BatchSpanProcessorInternal<R> {
308308
}
309309

310310
let export = self.exporter.export(self.spans.split_off(0));
311-
let timeout = self.runtime.delay(self.config.max_export_timeout);
311+
let timeout = Box::pin(self.runtime.delay(self.config.max_export_timeout));
312312
let time_out = self.config.max_export_timeout;
313313

314314
Box::pin(async move {
@@ -354,8 +354,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
354354
runtime.spawn(Box::pin(async move {
355355
// Timer will take a reference to the current runtime, so its important we do this within the
356356
// runtime.spawn()
357-
let ticker = inner_runtime
358-
.interval(config.scheduled_delay)
357+
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
359358
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
360359
.map(|_| BatchMessage::Flush(None));
361360
let timeout_runtime = inner_runtime.clone();

0 commit comments

Comments
 (0)