Skip to content

Commit f06b8a3

Browse files
author
Alexey Andreev
committed
tracing
1 parent 007a044 commit f06b8a3

File tree

3 files changed

+75
-28
lines changed

3 files changed

+75
-28
lines changed

Cargo.toml

+6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ libc = "0.2"
1414
crossbeam-utils = "0.8"
1515
serde = "1.0"
1616
crossbeam-channel = "0.5"
17+
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
18+
opentelemetry-zipkin = { version = "0.15", features = ["reqwest-client"], default-features = false }
19+
opentelemetry-jaeger = { version = "0.16.0", features = ["rt-tokio", "reqwest_collector_client"] }
20+
tracing-opentelemetry = "0.17.3"
21+
tracing = "0.1.35"
22+
tracing-subscriber = "0.3.11"
1723

1824
[dependencies.mlua]
1925
version = "0.6"

src/fiber_pool.rs

+12-4
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ use std::{collections::LinkedList, rc::Rc, time::Duration};
33
use crossbeam_channel::{unbounded, TryRecvError};
44
use mlua::Lua;
55
use tarantool::fiber;
6+
use tracing_opentelemetry::OpenTelemetrySpanExt;
7+
// use tracing;
68

7-
use crate::{ChannelError, Executor, ModuleConfig, Task};
9+
use crate::{ChannelError, Executor, ModuleConfig, Task, InstrumentedTask};
810

911
struct SchedulerArgs<'a> {
1012
lua: &'a Lua,
@@ -26,7 +28,7 @@ fn scheduler_f(args: Box<SchedulerArgs>) -> i32 {
2628
} = *args;
2729

2830
let cond = Rc::new(fiber::Cond::new());
29-
let (tx, rx) = unbounded::<Task>();
31+
let (tx, rx) = unbounded::<InstrumentedTask>();
3032

3133
let mut workers = LinkedList::new();
3234
for _ in 0..fibers {
@@ -71,7 +73,7 @@ fn scheduler_f(args: Box<SchedulerArgs>) -> i32 {
7173
struct WorkerArgs<'a> {
7274
cond: Rc<fiber::Cond>,
7375
lua: &'a Lua,
74-
rx: crossbeam_channel::Receiver<Task>,
76+
rx: crossbeam_channel::Receiver<InstrumentedTask>,
7577
fiber_standby_timeout: f64,
7678
}
7779
fn worker_f(args: Box<WorkerArgs>) -> i32 {
@@ -87,7 +89,13 @@ fn worker_f(args: Box<WorkerArgs>) -> i32 {
8789
.create_function(move |lua, _: ()| {
8890
loop {
8991
match rx.try_recv() {
90-
Ok(task) => match task(lua) {
92+
Ok((func, span_ctx)) => match {
93+
let span = tracing::span!(tracing::Level::TRACE, "fiber pool: exec");
94+
span.set_parent(span_ctx);
95+
let _ = span.enter();
96+
97+
func(lua, span.context())
98+
} {
9199
Ok(()) => (),
92100
Err(ChannelError::TXChannelClosed) => continue,
93101
Err(err) => break Err(mlua::Error::external(err)),

src/txapi.rs

+57-24
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
1-
use tokio::sync::oneshot;
1+
use crate::eventfd;
22
use async_channel;
33
use async_channel::TryRecvError;
4-
use thiserror::Error;
5-
use crate::eventfd;
6-
use std::os::unix::io::{AsRawFd, RawFd};
7-
use std::io;
84
use mlua::Lua;
5+
use tracing_opentelemetry::OpenTelemetrySpanExt;
6+
use std::io;
7+
use std::os::unix::io::{AsRawFd, RawFd};
8+
use thiserror::Error;
9+
use tokio::sync::oneshot;
10+
use tracing::Instrument;
11+
use opentelemetry::Context;
912

10-
pub type Task = Box<dyn FnOnce(&Lua) -> Result<(), ChannelError> + Send>;
11-
type TaskSender = async_channel::Sender<Task>;
12-
type TaskReceiver = async_channel::Receiver<Task>;
13+
pub type Task = Box<dyn FnOnce(&Lua, Context) -> Result<(), ChannelError> + Send>;
14+
pub type InstrumentedTask = (Task, Context);
15+
type TaskSender = async_channel::Sender<InstrumentedTask>;
16+
type TaskReceiver = async_channel::Receiver<InstrumentedTask>;
1317

1418
#[derive(Error, Debug)]
1519
pub enum ChannelError {
@@ -40,24 +44,33 @@ impl Dispatcher {
4044
})
4145
}
4246

47+
#[tracing::instrument(level = "trace", skip_all)]
4348
pub async fn call<Func, Ret>(&self, func: Func) -> Result<Ret, ChannelError>
44-
where
45-
Ret: Send + 'static,
46-
Func: FnOnce(&Lua) -> Ret,
47-
Func: Send + 'static,
49+
where
50+
Ret: Send + 'static,
51+
Func: FnOnce(&Lua, Context) -> Ret,
52+
Func: Send + 'static,
4853
{
54+
tracing::event!(tracing::Level::TRACE, "bass drop begin");
55+
4956
let (result_tx, result_rx) = oneshot::channel();
50-
let handler_func: Task = Box::new(move |lua| {
57+
let result_rx = result_rx
58+
.instrument(tracing::span!(tracing::Level::TRACE, "result_rx"));
59+
let result_rx_span_ctx = result_rx.span().context();
60+
61+
let handler_func: Task = Box::new(move |lua, exec_ctx| {
5162
if result_tx.is_closed() {
52-
return Err(ChannelError::TXChannelClosed)
63+
return Err(ChannelError::TXChannelClosed);
5364
};
5465

55-
let result = func(lua);
56-
result_tx.send(result).or(Err(ChannelError::TXChannelClosed))
66+
let result = func(lua, exec_ctx);
67+
result_tx
68+
.send(result)
69+
.or(Err(ChannelError::TXChannelClosed))
5770
});
58-
71+
5972
let task_tx_len = self.task_tx.len();
60-
if let Err(_channel_closed) = self.task_tx.send(handler_func).await {
73+
if let Err(_channel_closed) = self.task_tx.send((handler_func, result_rx_span_ctx)).await {
6174
return Err(ChannelError::TXChannelClosed);
6275
}
6376

@@ -67,15 +80,18 @@ impl Dispatcher {
6780
}
6881
}
6982

70-
result_rx.await.or(Err(ChannelError::RXChannelClosed))
83+
tracing::event!(tracing::Level::TRACE, "bass drop end");
84+
85+
result_rx
86+
.await
87+
.or(Err(ChannelError::RXChannelClosed))
7188
}
7289

7390
pub fn len(&self) -> usize {
7491
self.task_tx.len()
7592
}
7693
}
7794

78-
7995
pub struct Executor {
8096
task_rx: TaskReceiver,
8197
eventfd: eventfd::EventFd,
@@ -86,10 +102,23 @@ impl Executor {
86102
Self { task_rx, eventfd }
87103
}
88104

105+
// #[tracing::instrument(level = "trace", skip_all)]
89106
pub fn exec(&self, lua: &Lua, coio_timeout: f64) -> Result<(), ChannelError> {
90107
loop {
91-
match self.task_rx.try_recv() {
92-
Ok(func) => return func(lua),
108+
// tracing::event!(tracing::Level::TRACE, "exec: iteration");
109+
110+
// let _ = tracing::trace_span!("executing task").enter();
111+
match self.task_rx.try_recv()
112+
{
113+
Ok((func, span_ctx)) => {
114+
// tracing::event!(tracing::Level::TRACE, "task: start");
115+
println!("{:?}", span_ctx);
116+
117+
let res = func(lua, span_ctx);
118+
119+
// tracing::event!(tracing::Level::TRACE, "task: finish");
120+
return res;
121+
}
93122
Err(TryRecvError::Empty) => (),
94123
Err(TryRecvError::Closed) => return Err(ChannelError::RXChannelClosed),
95124
};
@@ -98,7 +127,8 @@ impl Executor {
98127
}
99128
}
100129

101-
pub fn pop_many(&self, max_tasks: usize, coio_timeout: f64) -> Result<Vec<Task>, ChannelError> {
130+
// #[tracing::instrument(level = "trace", skip_all)]
131+
pub fn pop_many(&self, max_tasks: usize, coio_timeout: f64) -> Result<Vec<InstrumentedTask>, ChannelError> {
102132
if self.task_rx.is_empty() {
103133
let _ = self.eventfd.coio_read(coio_timeout);
104134
}
@@ -137,5 +167,8 @@ pub fn channel(buffer: usize) -> io::Result<(Dispatcher, Executor)> {
137167
let (task_tx, task_rx) = async_channel::bounded(buffer);
138168
let efd = eventfd::EventFd::new(0, false)?;
139169

140-
Ok((Dispatcher::new(task_tx, efd.try_clone()?), Executor::new(task_rx, efd)))
170+
Ok((
171+
Dispatcher::new(task_tx, efd.try_clone()?),
172+
Executor::new(task_rx, efd),
173+
))
141174
}

0 commit comments

Comments
 (0)