Skip to content

Commit bcc2295

Browse files
emmaling27Convex, Inc.
authored and
Convex, Inc.
committed
Fix tracing in searchlight and try_join_buffer_unordered (#26914)
This PR fixes tracing when we use `try_join_buffer_unordered` by attaching the current span to the futures. GitOrigin-RevId: e5e1b6750af10700d27580731b6df27dd629deaa
1 parent dc316ab commit bcc2295

File tree

2 files changed

+32
-11
lines changed

2 files changed

+32
-11
lines changed

crates/common/src/minitrace_helpers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use regex::Regex;
1313

1414
use crate::knobs::REQUEST_TRACE_SAMPLE_CONFIG;
1515

16-
#[derive(Clone)]
16+
#[derive(Clone, Debug)]
1717
pub struct EncodedSpan(pub Option<String>);
1818

1919
impl EncodedSpan {

crates/common/src/runtime/mod.rs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ use governor::{
3838
},
3939
Quota,
4040
};
41+
use minitrace::{
42+
collector::SpanContext,
43+
full_name,
44+
future::FutureExt as MinitraceFutureExt,
45+
Span,
46+
};
4147
#[cfg(any(test, feature = "testing"))]
4248
use proptest::prelude::*;
4349
#[cfg(not(any(test, feature = "testing")))]
@@ -113,9 +119,14 @@ pub async fn try_join_buffered<
113119
+ 'static,
114120
) -> anyhow::Result<C> {
115121
assert_send(
116-
stream::iter(tasks.map(|task| assert_send(try_join(rt, name, assert_send(task)))))
117-
.buffered(JOIN_BUFFER_SIZE)
118-
.try_collect(),
122+
stream::iter(tasks.map(|task| {
123+
let span = SpanContext::current_local_parent()
124+
.map(|ctx| Span::root(format!("{}::{name}", full_name!()), ctx))
125+
.unwrap_or(Span::noop());
126+
assert_send(try_join(rt, name, assert_send(task), span))
127+
}))
128+
.buffered(JOIN_BUFFER_SIZE)
129+
.try_collect(),
119130
)
120131
.await
121132
}
@@ -141,9 +152,14 @@ pub async fn try_join_buffer_unordered<
141152
+ 'static,
142153
) -> anyhow::Result<C> {
143154
assert_send(
144-
stream::iter(tasks.map(|task| try_join(rt, name, task)))
145-
.buffer_unordered(JOIN_BUFFER_SIZE)
146-
.try_collect(),
155+
stream::iter(tasks.map(|task| {
156+
let span = SpanContext::current_local_parent()
157+
.map(|ctx| Span::root(format!("{}::{name}", full_name!()), ctx))
158+
.unwrap_or(Span::noop());
159+
try_join(rt, name, task, span)
160+
}))
161+
.buffer_unordered(JOIN_BUFFER_SIZE)
162+
.try_collect(),
147163
)
148164
.await
149165
}
@@ -152,12 +168,17 @@ pub async fn try_join<RT: Runtime, T: Send + 'static>(
152168
rt: &RT,
153169
name: &'static str,
154170
fut: impl Future<Output = anyhow::Result<T>> + Send + 'static,
171+
span: Span,
155172
) -> anyhow::Result<T> {
156173
let (tx, rx) = oneshot::channel();
157-
let handle = rt.spawn(name, async {
158-
let result = fut.await;
159-
let _ = tx.send(result);
160-
});
174+
let handle = rt.spawn(
175+
name,
176+
async {
177+
let result = fut.await;
178+
let _ = tx.send(result);
179+
}
180+
.in_span(span),
181+
);
161182
handle.into_join_future().await?;
162183
rx.await?
163184
}

0 commit comments

Comments
 (0)