Skip to content

Commit 56b5903

Browse files
committed
cleanup
1 parent 515bf77 commit 56b5903

File tree

5 files changed

+31
-21
lines changed

5 files changed

+31
-21
lines changed

opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
126126
let inner_runtime = runtime.clone();
127127

128128
// Spawn worker process via user-defined spawn function.
129-
runtime.spawn(Box::pin(async move {
129+
runtime.spawn(async move {
130130
// Timer will take a reference to the current runtime, so its important we do this within the
131131
// runtime.spawn()
132132
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
@@ -204,7 +204,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
204204
}
205205
}
206206
}
207-
}));
207+
});
208208
// Return batch processor with link to worker
209209
BatchLogProcessor {
210210
message_sender,

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ where
109109
let worker = move |reader: &PeriodicReader| {
110110
let runtime = self.runtime.clone();
111111
let reader = reader.clone();
112-
self.runtime.spawn(Box::pin(async move {
112+
self.runtime.spawn(async move {
113113
let ticker = to_interval_stream(runtime.clone(), self.interval)
114114
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
115115
.map(|_| Message::Export);
@@ -125,7 +125,7 @@ where
125125
}
126126
.run(messages)
127127
.await
128-
}));
128+
});
129129
};
130130

131131
otel_debug!(

opentelemetry-sdk/src/runtime.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66
//! [Tokio]: https://crates.io/crates/tokio
77
//! [async-std]: https://crates.io/crates/async-std
88
9-
use futures_util::{future::BoxFuture, stream::Stream};
109
use std::{fmt::Debug, future::Future, time::Duration};
11-
use futures_util::stream::unfold;
10+
use futures_util::stream::{unfold, Stream};
1211
use thiserror::Error;
1312

1413
/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
@@ -19,7 +18,7 @@ use thiserror::Error;
1918
///
2019
/// # Note
2120
///
22-
/// OpenTelemetry expects a *multi-threaded* runtime because its types can move across threads.
21+
/// OpenTelemetry expects a *multithreaded* runtime because its types can move across threads.
2322
/// For this reason, this trait requires the `Send` and `Sync` bounds. Single-threaded runtimes
2423
/// can implement this trait in a way that spawns the tasks on the same thread as the calling code.
2524
#[cfg(feature = "experimental_async_runtime")]
@@ -30,13 +29,15 @@ pub trait Runtime: Clone + Send + Sync + 'static {
3029
///
3130
/// This is mainly used to run batch span processing in the background. Note, that the function
3231
/// does not return a handle. OpenTelemetry will use a different way to wait for the future to
33-
/// finish when `TracerProvider` gets shutdown. At the moment this happens by blocking the
32+
/// finish when the caller shuts down.
33+
///
34+
/// At the moment, the shutdown happens by blocking the
3435
/// current thread. This means runtime implementations need to make sure they can still execute
3536
/// the given future even if the main thread is blocked.
36-
fn spawn(&self, future: BoxFuture<'static, ()>);
37+
fn spawn<F>(&self, future: F) where F: Future<Output = ()> + Send + 'static;
3738

38-
/// Return a new future, which resolves after the specified [std::time::Duration].
39-
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + Sync + 'static;
39+
/// Return a future that resolves after the specified [Duration].
40+
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
4041
}
4142

4243
/// Uses the given runtime to produce an interval stream.
@@ -67,13 +68,16 @@ pub struct Tokio;
6768
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
6869
)]
6970
impl Runtime for Tokio {
70-
fn spawn(&self, future: BoxFuture<'static, ()>) {
71+
fn spawn<F>(&self, future: F)
72+
where
73+
F: Future<Output = ()> + Send + 'static
74+
{
7175
#[allow(clippy::let_underscore_future)]
7276
// we don't have to await on the returned future to execute
7377
let _ = tokio::spawn(future);
7478
}
7579

76-
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + Sync + 'static {
80+
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
7781
tokio::time::sleep(duration)
7882
}
7983
}
@@ -105,7 +109,10 @@ pub struct TokioCurrentThread;
105109
)))
106110
)]
107111
impl Runtime for TokioCurrentThread {
108-
fn spawn(&self, future: BoxFuture<'static, ()>) {
112+
fn spawn<F>(&self, future: F)
113+
where
114+
F: Future<Output = ()> + Send + 'static
115+
{
109116
// We cannot force push tracing in current thread tokio scheduler because we rely on
110117
// BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
111118
// shutdown function so that the runtime will not finish the blocked task and kill any
@@ -121,7 +128,7 @@ impl Runtime for TokioCurrentThread {
121128
});
122129
}
123130

124-
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + Sync + 'static {
131+
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
125132
tokio::time::sleep(duration)
126133
}
127134
}
@@ -141,12 +148,15 @@ pub struct AsyncStd;
141148
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
142149
)]
143150
impl Runtime for AsyncStd {
144-
fn spawn(&self, future: BoxFuture<'static, ()>) {
151+
fn spawn<F>(&self, future: F)
152+
where
153+
F: Future<Output = ()> + Send + 'static
154+
{
145155
#[allow(clippy::let_underscore_future)]
146156
let _ = async_std::task::spawn(future);
147157
}
148158

149-
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + Sync + 'static {
159+
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
150160
async_std::task::sleep(duration)
151161
}
152162
}

opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl JaegerRemoteSampler {
191191
// todo: review if we need 'static here
192192
let interval = to_interval_stream(runtime.clone(), update_timeout);
193193

194-
runtime.spawn(Box::pin(async move {
194+
runtime.spawn(async move {
195195
// either update or shutdown
196196
let mut update = Box::pin(stream::select(
197197
shutdown.map(|_| false),
@@ -217,7 +217,7 @@ impl JaegerRemoteSampler {
217217
break;
218218
}
219219
}
220-
}));
220+
});
221221
}
222222

223223
async fn request_new_strategy<C>(

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
351351

352352
let inner_runtime = runtime.clone();
353353
// Spawn worker process via user-defined spawn function.
354-
runtime.spawn(Box::pin(async move {
354+
runtime.spawn(async move {
355355
// Timer will take a reference to the current runtime, so its important we do this within the
356356
// runtime.spawn()
357357
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
@@ -369,7 +369,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
369369
};
370370

371371
processor.run(messages).await
372-
}));
372+
});
373373

374374
// Return batch processor with link to worker
375375
BatchSpanProcessor {

0 commit comments

Comments
 (0)