Skip to content

Commit 1bce83f

Browse files
chore: Move FutureExt into context (#2776)
Co-authored-by: Cijo Thomas <[email protected]>
1 parent e48e6f4 commit 1bce83f

File tree

5 files changed

+132
-119
lines changed

5 files changed

+132
-119
lines changed

opentelemetry/src/context.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
//! Execution-scoped context propagation.
2+
//!
3+
//! The `context` module provides mechanisms for propagating values across API boundaries and between
4+
//! logically associated execution units. It enables cross-cutting concerns to access their data in-process
5+
//! using a shared context object.
6+
//!
7+
//! # Main Types
8+
//!
9+
//! - [`Context`]: An immutable, execution-scoped collection of values.
10+
//!
11+
112
use crate::otel_warn;
213
#[cfg(feature = "trace")]
314
use crate::trace::context::SynchronizedSpan;
@@ -9,6 +20,10 @@ use std::hash::{BuildHasherDefault, Hasher};
920
use std::marker::PhantomData;
1021
use std::sync::Arc;
1122

23+
mod future_ext;
24+
25+
pub use future_ext::FutureExt;
26+
1227
thread_local! {
1328
static CURRENT_CONTEXT: RefCell<ContextStack> = RefCell::new(ContextStack::default());
1429
}
@@ -78,7 +93,7 @@ thread_local! {
7893
#[derive(Clone, Default)]
7994
pub struct Context {
8095
#[cfg(feature = "trace")]
81-
pub(super) span: Option<Arc<SynchronizedSpan>>,
96+
pub(crate) span: Option<Arc<SynchronizedSpan>>,
8297
entries: Option<Arc<EntryMap>>,
8398
}
8499

@@ -314,15 +329,15 @@ impl Context {
314329
}
315330

316331
#[cfg(feature = "trace")]
317-
pub(super) fn current_with_synchronized_span(value: SynchronizedSpan) -> Self {
332+
pub(crate) fn current_with_synchronized_span(value: SynchronizedSpan) -> Self {
318333
Context {
319334
span: Some(Arc::new(value)),
320335
entries: Context::map_current(|cx| cx.entries.clone()),
321336
}
322337
}
323338

324339
#[cfg(feature = "trace")]
325-
pub(super) fn with_synchronized_span(&self, value: SynchronizedSpan) -> Self {
340+
pub(crate) fn with_synchronized_span(&self, value: SynchronizedSpan) -> Self {
326341
Context {
327342
span: Some(Arc::new(value)),
328343
entries: self.entries.clone(),
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use crate::Context;
2+
use futures_core::Stream;
3+
use futures_sink::Sink;
4+
use pin_project_lite::pin_project;
5+
use std::pin::Pin;
6+
use std::task::Context as TaskContext;
7+
use std::task::Poll;
8+
impl<T: Sized> FutureExt for T {}
9+
10+
impl<T: std::future::Future> std::future::Future for WithContext<T> {
11+
type Output = T::Output;
12+
13+
fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
14+
let this = self.project();
15+
let _guard = this.otel_cx.clone().attach();
16+
17+
this.inner.poll(task_cx)
18+
}
19+
}
20+
21+
impl<T: Stream> Stream for WithContext<T> {
22+
type Item = T::Item;
23+
24+
fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
25+
let this = self.project();
26+
let _guard = this.otel_cx.clone().attach();
27+
T::poll_next(this.inner, task_cx)
28+
}
29+
}
30+
31+
pin_project! {
32+
/// A future, stream, or sink that has an associated context.
33+
#[derive(Clone, Debug)]
34+
pub struct WithContext<T> {
35+
#[pin]
36+
inner: T,
37+
otel_cx: Context,
38+
}
39+
}
40+
41+
impl<I, T: Sink<I>> Sink<I> for WithContext<T>
42+
where
43+
T: Sink<I>,
44+
{
45+
type Error = T::Error;
46+
47+
fn poll_ready(
48+
self: Pin<&mut Self>,
49+
task_cx: &mut TaskContext<'_>,
50+
) -> Poll<Result<(), Self::Error>> {
51+
let this = self.project();
52+
let _guard = this.otel_cx.clone().attach();
53+
T::poll_ready(this.inner, task_cx)
54+
}
55+
56+
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
57+
let this = self.project();
58+
let _guard = this.otel_cx.clone().attach();
59+
T::start_send(this.inner, item)
60+
}
61+
62+
fn poll_flush(
63+
self: Pin<&mut Self>,
64+
task_cx: &mut TaskContext<'_>,
65+
) -> Poll<Result<(), Self::Error>> {
66+
let this = self.project();
67+
let _guard = this.otel_cx.clone().attach();
68+
T::poll_flush(this.inner, task_cx)
69+
}
70+
71+
fn poll_close(
72+
self: Pin<&mut Self>,
73+
task_cx: &mut TaskContext<'_>,
74+
) -> Poll<Result<(), Self::Error>> {
75+
let this = self.project();
76+
let _enter = this.otel_cx.clone().attach();
77+
T::poll_close(this.inner, task_cx)
78+
}
79+
}
80+
81+
/// Extension trait allowing futures, streams, and sinks to be traced with a span.
82+
pub trait FutureExt: Sized {
83+
/// Attaches the provided [`Context`] to this type, returning a `WithContext`
84+
/// wrapper.
85+
///
86+
/// When the wrapped type is a future, stream, or sink, the attached context
87+
/// will be set as current while it is being polled.
88+
///
89+
/// [`Context`]: Context
90+
fn with_context(self, otel_cx: Context) -> WithContext<Self> {
91+
WithContext {
92+
inner: self,
93+
otel_cx,
94+
}
95+
}
96+
97+
/// Attaches the current [`Context`] to this type, returning a `WithContext`
98+
/// wrapper.
99+
///
100+
/// When the wrapped type is a future, stream, or sink, the attached context
101+
/// will be set as the default while it is being polled.
102+
///
103+
/// [`Context`]: Context
104+
fn with_current_context(self) -> WithContext<Self> {
105+
let otel_cx = Context::current();
106+
self.with_context(otel_cx)
107+
}
108+
}

opentelemetry/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ pub mod global;
246246

247247
pub mod baggage;
248248

249-
mod context;
249+
pub mod context;
250250

251251
pub use context::{Context, ContextGuard};
252252

opentelemetry/src/trace/context.rs

Lines changed: 4 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,10 @@ use crate::{
44
trace::{Span, SpanContext, Status},
55
Context, ContextGuard, KeyValue,
66
};
7-
use futures_core::stream::Stream;
8-
use futures_sink::Sink;
9-
use pin_project_lite::pin_project;
10-
use std::{
11-
borrow::Cow,
12-
error::Error,
13-
pin::Pin,
14-
sync::Mutex,
15-
task::{Context as TaskContext, Poll},
16-
};
7+
use std::{borrow::Cow, error::Error, sync::Mutex};
8+
9+
// Re-export for compatability. This used to be contained here.
10+
pub use crate::context::FutureExt;
1711

1812
const NOOP_SPAN: SynchronizedSpan = SynchronizedSpan {
1913
span_context: SpanContext::NONE,
@@ -371,105 +365,3 @@ where
371365
{
372366
Context::map_current(|cx| f(cx.span()))
373367
}
374-
375-
pin_project! {
376-
/// A future, stream, or sink that has an associated context.
377-
#[derive(Clone, Debug)]
378-
pub struct WithContext<T> {
379-
#[pin]
380-
inner: T,
381-
otel_cx: Context,
382-
}
383-
}
384-
385-
impl<T: Sized> FutureExt for T {}
386-
387-
impl<T: std::future::Future> std::future::Future for WithContext<T> {
388-
type Output = T::Output;
389-
390-
fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
391-
let this = self.project();
392-
let _guard = this.otel_cx.clone().attach();
393-
394-
this.inner.poll(task_cx)
395-
}
396-
}
397-
398-
impl<T: Stream> Stream for WithContext<T> {
399-
type Item = T::Item;
400-
401-
fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
402-
let this = self.project();
403-
let _guard = this.otel_cx.clone().attach();
404-
T::poll_next(this.inner, task_cx)
405-
}
406-
}
407-
408-
impl<I, T: Sink<I>> Sink<I> for WithContext<T>
409-
where
410-
T: Sink<I>,
411-
{
412-
type Error = T::Error;
413-
414-
fn poll_ready(
415-
self: Pin<&mut Self>,
416-
task_cx: &mut TaskContext<'_>,
417-
) -> Poll<Result<(), Self::Error>> {
418-
let this = self.project();
419-
let _guard = this.otel_cx.clone().attach();
420-
T::poll_ready(this.inner, task_cx)
421-
}
422-
423-
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
424-
let this = self.project();
425-
let _guard = this.otel_cx.clone().attach();
426-
T::start_send(this.inner, item)
427-
}
428-
429-
fn poll_flush(
430-
self: Pin<&mut Self>,
431-
task_cx: &mut TaskContext<'_>,
432-
) -> Poll<Result<(), Self::Error>> {
433-
let this = self.project();
434-
let _guard = this.otel_cx.clone().attach();
435-
T::poll_flush(this.inner, task_cx)
436-
}
437-
438-
fn poll_close(
439-
self: Pin<&mut Self>,
440-
task_cx: &mut TaskContext<'_>,
441-
) -> Poll<Result<(), Self::Error>> {
442-
let this = self.project();
443-
let _enter = this.otel_cx.clone().attach();
444-
T::poll_close(this.inner, task_cx)
445-
}
446-
}
447-
448-
/// Extension trait allowing futures, streams, and sinks to be traced with a span.
449-
pub trait FutureExt: Sized {
450-
/// Attaches the provided [`Context`] to this type, returning a `WithContext`
451-
/// wrapper.
452-
///
453-
/// When the wrapped type is a future, stream, or sink, the attached context
454-
/// will be set as current while it is being polled.
455-
///
456-
/// [`Context`]: crate::Context
457-
fn with_context(self, otel_cx: Context) -> WithContext<Self> {
458-
WithContext {
459-
inner: self,
460-
otel_cx,
461-
}
462-
}
463-
464-
/// Attaches the current [`Context`] to this type, returning a `WithContext`
465-
/// wrapper.
466-
///
467-
/// When the wrapped type is a future, stream, or sink, the attached context
468-
/// will be set as the default while it is being polled.
469-
///
470-
/// [`Context`]: crate::Context
471-
fn with_current_context(self) -> WithContext<Self> {
472-
let otel_cx = Context::current();
473-
self.with_context(otel_cx)
474-
}
475-
}

opentelemetry/src/trace/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,7 @@ mod tracer;
177177
mod tracer_provider;
178178

179179
pub use self::{
180-
context::{
181-
get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt, WithContext,
182-
},
180+
context::{get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt},
183181
span::{Span, SpanKind, Status},
184182
span_context::{SpanContext, TraceState},
185183
tracer::{SamplingDecision, SamplingResult, SpanBuilder, Tracer},

0 commit comments

Comments
 (0)