Skip to content

feat: Refine runtime trait #2641

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

## vNext

- **Breaking**: The `Runtime` trait has been simplified and refined. See the [#2641](https://github.com/open-telemetry/opentelemetry-rust/pull/2641)
for the changes.
- Calls to `MeterProviderBuilder::with_resource`, `TracerProviderBuilder::with_resource`,
`LoggerProviderBuilder::with_resource` are now additive ([#2677](https://github.com/open-telemetry/opentelemetry-rust/pull/2677)).
- Moved `ExportError` trait from `opentelemetry::trace::ExportError` to `opentelemetry_sdk::export::ExportError`
- Moved `TraceError` enum from `opentelemetry::trace::TraceError` to `opentelemetry_sdk::trace::TraceError`
- Moved `TraceResult` type alias from `opentelemetry::trace::TraceResult` to `opentelemetry_sdk::trace::TraceResult`
- *Breaking*: Make `force_flush()` in `PushMetricExporter` synchronous
- **Breaking Change:** Updated the `SpanExporter` trait method signature:
- **Breaking**: Make `force_flush()` in `PushMetricExporter` synchronous
- **Breaking**: Updated the `SpanExporter` trait method signature:

```rust
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult>;
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
};

use super::{BatchConfig, LogProcessor};
use crate::runtime::{RuntimeChannel, TrySend};
use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
use futures_channel::oneshot;
use futures_util::{
future::{self, Either},
Expand Down Expand Up @@ -126,13 +126,13 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
let inner_runtime = runtime.clone();

// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
runtime.spawn(async move {
// Timer will take a reference to the current runtime, so its important we do this within the
// runtime.spawn()
let ticker = inner_runtime
.interval(config.scheduled_delay)
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| BatchMessage::Flush(None));

let timeout_runtime = inner_runtime.clone();
let mut logs = Vec::new();
let mut messages = Box::pin(stream::select(message_receiver, ticker));
Expand Down Expand Up @@ -204,7 +204,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
}
}
}
}));
});
// Return batch processor with link to worker
BatchLogProcessor {
message_sender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use futures_util::{
};
use opentelemetry::{otel_debug, otel_error};

use crate::runtime::Runtime;
use crate::runtime::{to_interval_stream, Runtime};
use crate::{
error::{OTelSdkError, OTelSdkResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
Expand Down Expand Up @@ -109,9 +109,8 @@ where
let worker = move |reader: &PeriodicReader<E>| {
let runtime = self.runtime.clone();
let reader = reader.clone();
self.runtime.spawn(Box::pin(async move {
let ticker = runtime
.interval(self.interval)
self.runtime.spawn(async move {
let ticker = to_interval_stream(runtime.clone(), self.interval)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| Message::Export);
let messages = Box::pin(stream::select(message_receiver, ticker));
Expand All @@ -126,7 +125,7 @@ where
}
.run(messages)
.await
}));
});
};

otel_debug!(
Expand Down
101 changes: 52 additions & 49 deletions opentelemetry-sdk/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,57 @@
//! [Tokio]: https://crates.io/crates/tokio
//! [async-std]: https://crates.io/crates/async-std

use futures_util::{future::BoxFuture, stream::Stream};
use futures_util::stream::{unfold, Stream};
use std::{fmt::Debug, future::Future, time::Duration};
use thiserror::Error;

/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
/// OpenTelemetry to work with any current and hopefully future runtime implementation.
/// OpenTelemetry to work with any current and hopefully future runtime implementations.
///
/// [Tokio]: https://crates.io/crates/tokio
/// [async-std]: https://crates.io/crates/async-std
///
/// # Note
///
/// OpenTelemetry expects a *multithreaded* runtime because its types can move across threads.
/// For this reason, this trait requires the `Send` and `Sync` bounds. Single-threaded runtimes
/// can implement this trait in a way that spawns the tasks on the same thread as the calling code.
#[cfg(feature = "experimental_async_runtime")]
pub trait Runtime: Clone + Send + Sync + 'static {
/// A future stream, which returns items in a previously specified interval. The item type is
/// not important.
type Interval: Stream + Send;

/// A future, which resolves after a previously specified amount of time. The output type is
/// not important.
type Delay: Future + Send + Unpin;

/// Create a [futures_util::stream::Stream], which returns a new item every
/// [std::time::Duration].
fn interval(&self, duration: Duration) -> Self::Interval;

/// Spawn a new task or thread, which executes the given future.
///
/// # Note
///
/// This is mainly used to run batch span processing in the background. Note, that the function
/// does not return a handle. OpenTelemetry will use a different way to wait for the future to
/// finish when TracerProvider gets shutdown. At the moment this happens by blocking the
/// finish when the caller shuts down.
///
/// At the moment, the shutdown happens by blocking the
/// current thread. This means runtime implementations need to make sure they can still execute
/// the given future even if the main thread is blocked.
fn spawn(&self, future: BoxFuture<'static, ()>);
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static;

/// Return a future that resolves after the specified [Duration].
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
}

/// Return a new future, which resolves after the specified [std::time::Duration].
fn delay(&self, duration: Duration) -> Self::Delay;
/// Uses the given runtime to produce an interval stream.
#[cfg(feature = "experimental_async_runtime")]
#[allow(dead_code)]
pub(crate) fn to_interval_stream<T: Runtime>(
runtime: T,
interval: Duration,
) -> impl Stream<Item = ()> {
unfold((), move |_| {
let runtime_cloned = runtime.clone();

async move {
runtime_cloned.delay(interval).await;
Some(((), ()))
}
})
}

/// Runtime implementation, which works with Tokio's multi thread runtime.
Expand All @@ -59,21 +74,17 @@ pub struct Tokio;
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl Runtime for Tokio {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;

fn interval(&self, duration: Duration) -> Self::Interval {
crate::util::tokio_interval_stream(duration)
}

fn spawn(&self, future: BoxFuture<'static, ()>) {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
#[allow(clippy::let_underscore_future)]
// we don't have to await on the returned future to execute
let _ = tokio::spawn(future);
}

fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(tokio::time::sleep(duration))
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
tokio::time::sleep(duration)
}
}

Expand Down Expand Up @@ -104,14 +115,10 @@ pub struct TokioCurrentThread;
)))
)]
impl Runtime for TokioCurrentThread {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;

fn interval(&self, duration: Duration) -> Self::Interval {
crate::util::tokio_interval_stream(duration)
}

fn spawn(&self, future: BoxFuture<'static, ()>) {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
// We cannot force push tracing in current thread tokio scheduler because we rely on
// BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
// shutdown function so that the runtime will not finish the blocked task and kill any
Expand All @@ -127,8 +134,8 @@ impl Runtime for TokioCurrentThread {
});
}

fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(tokio::time::sleep(duration))
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
tokio::time::sleep(duration)
}
}

Expand All @@ -147,20 +154,16 @@ pub struct AsyncStd;
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
impl Runtime for AsyncStd {
type Interval = async_std::stream::Interval;
type Delay = BoxFuture<'static, ()>;

fn interval(&self, duration: Duration) -> Self::Interval {
async_std::stream::interval(duration)
}

fn spawn(&self, future: BoxFuture<'static, ()>) {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
#[allow(clippy::let_underscore_future)]
let _ = async_std::task::spawn(future);
}

fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(async_std::task::sleep(duration))
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
async_std::task::sleep(duration)
}
}

Expand Down Expand Up @@ -193,7 +196,7 @@ pub enum TrySendError {
/// Send failed due to the channel being closed.
#[error("cannot send message to batch processor as the channel is closed")]
ChannelClosed,
/// Any other send error that isnt covered above.
/// Any other send error that isn't covered above.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::RuntimeChannel;
use crate::runtime::{to_interval_stream, RuntimeChannel};
use crate::trace::error::TraceError;
use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyResponse;
use crate::trace::sampler::jaeger_remote::sampling_strategy::Inner;
Expand Down Expand Up @@ -190,8 +190,9 @@
C: HttpClient + 'static,
{
// todo: review if we need 'static here
let interval = runtime.interval(update_timeout);
runtime.spawn(Box::pin(async move {
let interval = to_interval_stream(runtime.clone(), update_timeout);

runtime.spawn(async move {

Check warning on line 195 in opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs#L193-L195

Added lines #L193 - L195 were not covered by tests
// either update or shutdown
let mut update = Box::pin(stream::select(
shutdown.map(|_| false),
Expand All @@ -217,7 +218,7 @@
break;
}
}
}));
});

Check warning on line 221 in opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs#L221

Added line #L221 was not covered by tests
}

async fn request_new_strategy<C>(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::{OTelSdkError, OTelSdkResult};
use crate::resource::Resource;
use crate::runtime::{RuntimeChannel, TrySend};
use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
use crate::trace::BatchConfig;
use crate::trace::Span;
use crate::trace::SpanProcessor;
Expand Down Expand Up @@ -309,6 +309,7 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
let export = self.exporter.export(self.spans.split_off(0));
let timeout = self.runtime.delay(self.config.max_export_timeout);
let time_out = self.config.max_export_timeout;

pin_mut!(export);
pin_mut!(timeout);

Expand Down Expand Up @@ -353,11 +354,10 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {

let inner_runtime = runtime.clone();
// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
runtime.spawn(async move {
// Timer will take a reference to the current runtime, so its important we do this within the
// runtime.spawn()
let ticker = inner_runtime
.interval(config.scheduled_delay)
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| BatchMessage::Flush(None));
let timeout_runtime = inner_runtime.clone();
Expand All @@ -372,7 +372,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
};

processor.run(messages).await
}));
});

// Return batch processor with link to worker
BatchSpanProcessor {
Expand Down
Loading