Skip to content

Commit 0254bc1

Browse files
committed
hackety hack: add a future timeout that does not use tokio
1 parent 6ad1d5c commit 0254bc1

File tree

5 files changed

+49
-5
lines changed

5 files changed

+49
-5
lines changed

opentelemetry-http/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,4 @@ hyper-util = { workspace = true, features = ["client-legacy", "http1", "http2"],
2424
opentelemetry = { version = "0.27", path = "../opentelemetry", features = ["trace"] }
2525
reqwest = { workspace = true, features = ["blocking"], optional = true }
2626
tokio = { workspace = true, features = ["time"], optional = true }
27+
futures-timer = "3.0.3"

opentelemetry-http/src/lib.rs

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
use async_trait::async_trait;
22
use std::fmt::Debug;
3-
3+
use std::future::Future;
4+
use std::pin::Pin;
5+
use std::task::Poll;
6+
use std::time::Duration;
47
#[doc(no_inline)]
58
pub use bytes::Bytes;
69
#[doc(no_inline)]
710
pub use http::{Request, Response};
11+
use opentelemetry::Context;
812
use opentelemetry::propagation::{Extractor, Injector};
913

1014
/// Helper for injecting headers into HTTP Requests. This is used for OpenTelemetry context
@@ -102,7 +106,7 @@ mod reqwest {
102106

103107
#[cfg(feature = "hyper")]
104108
pub mod hyper {
105-
use crate::ResponseExt;
109+
use crate::{timeout, ResponseExt};
106110

107111
use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
108112
use http::HeaderValue;
@@ -116,7 +120,7 @@ pub mod hyper {
116120
use std::pin::Pin;
117121
use std::task::{self, Poll};
118122
use std::time::Duration;
119-
use tokio::time;
123+
120124

121125
#[derive(Debug, Clone)]
122126
pub struct HyperClient<C = HttpConnector>
@@ -163,7 +167,7 @@ pub mod hyper {
163167
.headers_mut()
164168
.insert(http::header::AUTHORIZATION, authorization.clone());
165169
}
166-
let mut response = time::timeout(self.timeout, self.inner.request(request)).await??;
170+
let mut response = timeout(self.timeout, self.inner.request(request)).await??;
167171
let headers = std::mem::take(response.headers_mut());
168172

169173
let mut http_response = Response::builder()
@@ -218,6 +222,40 @@ impl<T> ResponseExt for Response<T> {
218222
}
219223
}
220224

225+
struct Timeout<F> {
226+
future: F,
227+
delay: Pin<Box<dyn Future<Output = ()> + Send>>,
228+
}
229+
230+
impl<F: Future + std::marker::Unpin> Future for Timeout<F> where
231+
F::Output: Send,
232+
{
233+
type Output = Result<F::Output, &'static str>;
234+
235+
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
236+
let this = self.get_mut();
237+
238+
// Poll the delay future
239+
if this.delay.as_mut().poll(cx).is_ready() {
240+
return Poll::Ready(Err("timeout"));
241+
}
242+
243+
// Poll the main future
244+
match Pin::new(&mut this.future).poll(cx) {
245+
Poll::Ready(output) => Poll::Ready(Ok(output)),
246+
Poll::Pending => Poll::Pending,
247+
}
248+
}
249+
}
250+
251+
fn timeout<F: Future + Send>(duration: Duration, future: F) -> Timeout<F> {
252+
Timeout {
253+
future,
254+
delay: Box::pin(futures_timer::Delay::new(duration)),
255+
}
256+
}
257+
258+
221259
#[cfg(test)]
222260
mod tests {
223261
use super::*;

opentelemetry-otlp/src/exporter/http/metrics.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::sync::Arc;
22

33
use async_trait::async_trait;
44
use http::{header::CONTENT_TYPE, Method};
5+
use opentelemetry::otel_debug;
56
use opentelemetry_sdk::metrics::data::ResourceMetrics;
67
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
78

@@ -21,6 +22,8 @@ impl MetricsClient for OtlpHttpClient {
2122
_ => Err(MetricError::Other("exporter is already shut down".into())),
2223
})?;
2324

25+
otel_debug!(name: "MetricsClientExport");
26+
2427
let (body, content_type) = self.build_metrics_export_body(metrics)?;
2528
let mut request = http::Request::builder()
2629
.method(Method::POST)

opentelemetry-otlp/tests/integration_test/src/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ static INIT_TRACING: Once = Once::new();
4141
fn init_tracing() {
4242
INIT_TRACING.call_once(|| {
4343
let subscriber = FmtSubscriber::builder()
44-
.with_max_level(tracing::Level::DEBUG)
44+
.with_max_level(tracing::Level::TRACE)
4545
.finish();
4646

4747
tracing::subscriber::set_global_default(subscriber)

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,8 @@ impl PeriodicReaderInner {
367367
return Err(e);
368368
}
369369

370+
otel_debug!(name: "PeriodicReaderMetricsExported");
371+
370372
Ok(())
371373
}
372374

0 commit comments

Comments
 (0)