Skip to content

Commit bb37bbe

Browse files
committed
cleanup
1 parent 515bf77 commit bb37bbe

File tree

5 files changed

+26
-18
lines changed

5 files changed

+26
-18
lines changed

opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
126126
let inner_runtime = runtime.clone();
127127

128128
// Spawn worker process via user-defined spawn function.
129-
runtime.spawn(Box::pin(async move {
129+
runtime.spawn(async move {
130130
// Timer will take a reference to the current runtime, so its important we do this within the
131131
// runtime.spawn()
132132
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
@@ -204,7 +204,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
204204
}
205205
}
206206
}
207-
}));
207+
});
208208
// Return batch processor with link to worker
209209
BatchLogProcessor {
210210
message_sender,

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ where
109109
let worker = move |reader: &PeriodicReader| {
110110
let runtime = self.runtime.clone();
111111
let reader = reader.clone();
112-
self.runtime.spawn(Box::pin(async move {
112+
self.runtime.spawn(async move {
113113
let ticker = to_interval_stream(runtime.clone(), self.interval)
114114
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
115115
.map(|_| Message::Export);
@@ -125,7 +125,7 @@ where
125125
}
126126
.run(messages)
127127
.await
128-
}));
128+
});
129129
};
130130

131131
otel_debug!(

opentelemetry-sdk/src/runtime.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66
//! [Tokio]: https://crates.io/crates/tokio
77
//! [async-std]: https://crates.io/crates/async-std
88
9-
use futures_util::{future::BoxFuture, stream::Stream};
109
use std::{fmt::Debug, future::Future, time::Duration};
11-
use futures_util::stream::unfold;
10+
use futures_util::stream::{unfold, Stream};
1211
use thiserror::Error;
1312

1413
/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
@@ -33,10 +32,10 @@ pub trait Runtime: Clone + Send + Sync + 'static {
3332
/// finish when `TracerProvider` gets shutdown. At the moment this happens by blocking the
3433
/// current thread. This means runtime implementations need to make sure they can still execute
3534
/// the given future even if the main thread is blocked.
36-
fn spawn(&self, future: BoxFuture<'static, ()>);
35+
fn spawn<F>(&self, future: F) where F: Future<Output = ()> + Send + 'static;
3736

3837
/// Return a new future, which resolves after the specified [std::time::Duration].
39-
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + Sync + 'static;
38+
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
4039
}
4140

4241
/// Uses the given runtime to produce an interval stream.
@@ -67,13 +66,16 @@ pub struct Tokio;
6766
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
6867
)]
6968
impl Runtime for Tokio {
70-
fn spawn(&self, future: BoxFuture<'static, ()>) {
69+
fn spawn<F>(&self, future: F)
70+
where
71+
F: Future<Output=()> + Send + 'static
72+
{
7173
#[allow(clippy::let_underscore_future)]
7274
// we don't have to await on the returned future to execute
7375
let _ = tokio::spawn(future);
7476
}
7577

76-
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + Sync + 'static {
78+
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + 'static {
7779
tokio::time::sleep(duration)
7880
}
7981
}
@@ -105,7 +107,10 @@ pub struct TokioCurrentThread;
105107
)))
106108
)]
107109
impl Runtime for TokioCurrentThread {
108-
fn spawn(&self, future: BoxFuture<'static, ()>) {
110+
fn spawn<F>(&self, future: F)
111+
where
112+
F: Future<Output=()> + Send + 'static
113+
{
109114
// We cannot force push tracing in current thread tokio scheduler because we rely on
110115
// BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
111116
// shutdown function so that the runtime will not finish the blocked task and kill any
@@ -121,7 +126,7 @@ impl Runtime for TokioCurrentThread {
121126
});
122127
}
123128

124-
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + Sync + 'static {
129+
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + 'static {
125130
tokio::time::sleep(duration)
126131
}
127132
}
@@ -141,12 +146,15 @@ pub struct AsyncStd;
141146
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
142147
)]
143148
impl Runtime for AsyncStd {
144-
fn spawn(&self, future: BoxFuture<'static, ()>) {
149+
fn spawn<F>(&self, future: F)
150+
where
151+
F: Future<Output=()> + Send + 'static
152+
{
145153
#[allow(clippy::let_underscore_future)]
146154
let _ = async_std::task::spawn(future);
147155
}
148156

149-
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + Sync + 'static {
157+
fn delay(&self, duration: Duration) -> impl Future<Output=()> + Send + 'static {
150158
async_std::task::sleep(duration)
151159
}
152160
}

opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl JaegerRemoteSampler {
191191
// todo: review if we need 'static here
192192
let interval = to_interval_stream(runtime.clone(), update_timeout);
193193

194-
runtime.spawn(Box::pin(async move {
194+
runtime.spawn(async move {
195195
// either update or shutdown
196196
let mut update = Box::pin(stream::select(
197197
shutdown.map(|_| false),
@@ -217,7 +217,7 @@ impl JaegerRemoteSampler {
217217
break;
218218
}
219219
}
220-
}));
220+
});
221221
}
222222

223223
async fn request_new_strategy<C>(

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
351351

352352
let inner_runtime = runtime.clone();
353353
// Spawn worker process via user-defined spawn function.
354-
runtime.spawn(Box::pin(async move {
354+
runtime.spawn(async move {
355355
// Timer will take a reference to the current runtime, so its important we do this within the
356356
// runtime.spawn()
357357
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
@@ -369,7 +369,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
369369
};
370370

371371
processor.run(messages).await
372-
}));
372+
});
373373

374374
// Return batch processor with link to worker
375375
BatchSpanProcessor {

0 commit comments

Comments
 (0)