Skip to content

Commit ce21672

Browse files
gautamg795Convex, Inc.
authored and
Convex, Inc.
committed
add some observability to the gRPC server (#37164)
The metrics will probably be more helpful than the logs, but I've added debug logs that look like this: ``` 2025-05-15T02:55:15.582268Z DEBUG convex-grpc: method=Authenticate grpc_status=Ok resp_msgs=1 resp_bytes=9 duration_ms=2.044 2025-05-15T02:55:16.115566Z DEBUG convex-grpc: method=ExecuteHttpAction grpc_status=<unspecified> resp_msgs=0 resp_bytes=0 duration_ms=531.756 ``` GitOrigin-RevId: c393e35b49043e1732cca22156266700aace4667
1 parent 8ed01d5 commit ce21672

File tree

5 files changed

+289
-60
lines changed

5 files changed

+289
-60
lines changed

Cargo.lock

Lines changed: 31 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ tokio-util = { version = "0.7.13", features = [ "io", "rt", "io-util" ] }
166166
tonic = { package = "tonic", version = "0.13", features = [ "gzip", "tls-aws-lc", "tls-native-roots" ] }
167167
tonic-build = "0.13"
168168
tonic-health = "0.13"
169+
tonic-middleware = "0.3"
169170
tower = { version = "0.5.2", features = [ "limit", "timeout", "util" ] }
170171
tower-cookies = "0.11"
171172
tower-http = { version = "0.6", features = [ "trace", "cors", "decompression-br", "limit" ] }

crates/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ tokio-stream = { workspace = true }
8181
tokio-util = { workspace = true }
8282
tonic = { workspace = true }
8383
tonic-health = { workspace = true }
84+
tonic-middleware = { workspace = true }
8485
tower = { workspace = true }
8586
tower-cookies = { workspace = true }
8687
tower-http = { workspace = true }

crates/common/src/grpc/middleware.rs

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
use std::{
2+
pin::Pin,
3+
sync::Arc,
4+
task::{
5+
Context,
6+
Poll,
7+
},
8+
};
9+
10+
use bytes::Bytes;
11+
use http::Request;
12+
use hyper::body::{
13+
Body,
14+
Frame,
15+
SizeHint,
16+
};
17+
use metrics::{
18+
log_counter_with_labels,
19+
register_convex_counter,
20+
register_convex_histogram,
21+
StaticMetricLabel,
22+
StatusTimer,
23+
};
24+
use pin_project::pin_project;
25+
use tokio_metrics::Instrumented;
26+
use tonic::async_trait;
27+
use tonic_middleware::{
28+
Middleware,
29+
ServiceBound,
30+
};
31+
use tower::Service;
32+
33+
use crate::{
34+
grpc::KnownMethods,
35+
runtime::TaskManager,
36+
};
37+
38+
#[derive(Clone)]
39+
pub(crate) struct TokioInstrumentationService<S> {
40+
pub(crate) known_methods: Arc<KnownMethods>,
41+
pub(crate) inner: S,
42+
}
43+
44+
impl<S> TokioInstrumentationService<S> {
45+
pub(crate) fn new(known_methods: Arc<KnownMethods>, inner: S) -> Self {
46+
Self {
47+
known_methods,
48+
inner,
49+
}
50+
}
51+
}
52+
53+
impl<S, T> Service<Request<T>> for TokioInstrumentationService<S>
54+
where
55+
S: Service<Request<T>>,
56+
{
57+
type Error = S::Error;
58+
type Future = Instrumented<S::Future>;
59+
type Response = S::Response;
60+
61+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62+
self.inner.poll_ready(cx)
63+
}
64+
65+
fn call(&mut self, req: Request<T>) -> Self::Future {
66+
let name = self
67+
.known_methods
68+
.get(req.uri().path())
69+
.copied()
70+
.unwrap_or("grpc_handler");
71+
TaskManager::instrument(name, self.inner.call(req))
72+
}
73+
}
74+
75+
register_convex_counter!(
76+
GRPC_SERVER_STARTED_TOTAL,
77+
"Total number of RPCs started by the server. This minus the total number of RPCs handled \
78+
(from the histogram) will give you the number of RPCs that are in flight",
79+
&["method"]
80+
);
81+
82+
register_convex_histogram!(
83+
GRPC_HANDLE_DURATION_SECONDS,
84+
"RPC call duration",
85+
&["status", "method", "grpc_status"]
86+
);
87+
88+
#[derive(Clone)]
89+
pub struct LoggingMiddleware {
90+
known_methods: Arc<KnownMethods>,
91+
}
92+
93+
impl LoggingMiddleware {
94+
pub fn new(known_methods: Arc<KnownMethods>) -> Self {
95+
Self { known_methods }
96+
}
97+
}
98+
99+
#[async_trait]
100+
impl<S> Middleware<S> for LoggingMiddleware
101+
where
102+
S: ServiceBound,
103+
S::Future: Send,
104+
{
105+
async fn call(
106+
&self,
107+
req: http::Request<tonic::body::Body>,
108+
mut service: S,
109+
) -> Result<http::Response<tonic::body::Body>, S::Error> {
110+
let method = self
111+
.known_methods
112+
.get(req.uri().path())
113+
.copied()
114+
.unwrap_or("unknown");
115+
log_counter_with_labels(
116+
&GRPC_SERVER_STARTED_TOTAL,
117+
1,
118+
vec![StaticMetricLabel::new("method", method)],
119+
);
120+
let mut timer = StatusTimer::new(&GRPC_HANDLE_DURATION_SECONDS);
121+
timer.add_label(StaticMetricLabel::new("method", method));
122+
// We don't set this to "Unknown" because that's a real gRPC status, whereas we
123+
// may never know a gRPC status if an error occurs.
124+
timer.add_label(StaticMetricLabel::new("grpc_status", ""));
125+
let mut response_logger = ResponseLogger {
126+
method,
127+
timer: Some(timer),
128+
msg_count: 0,
129+
size: 0,
130+
grpc_status: None,
131+
};
132+
let response = service.call(req).await?;
133+
// `grpc-status` may be in the headers for unary responses or if an error occurs
134+
// immediately.
135+
if let Some(status) = response
136+
.headers()
137+
.get("grpc-status")
138+
.map(|v| format!("{:?}", tonic::Code::from_bytes(v.as_bytes())))
139+
{
140+
response_logger.grpc_status = Some(status);
141+
}
142+
let (parts, body) = response.into_parts();
143+
let logging_body = LoggingBody::new(body, response_logger);
144+
let wrapped_body = tonic::body::Body::new(logging_body);
145+
Ok(tonic::codegen::http::Response::from_parts(
146+
parts,
147+
wrapped_body,
148+
))
149+
}
150+
}
151+
152+
struct ResponseLogger {
153+
pub method: &'static str,
154+
pub msg_count: usize,
155+
pub size: usize,
156+
pub grpc_status: Option<String>,
157+
pub timer: Option<StatusTimer>,
158+
}
159+
160+
#[pin_project]
161+
struct LoggingBody<B> {
162+
#[pin]
163+
inner: B,
164+
logger: ResponseLogger,
165+
}
166+
167+
impl<B> LoggingBody<B> {
168+
pub fn new(inner: B, logger: ResponseLogger) -> Self {
169+
Self { inner, logger }
170+
}
171+
}
172+
173+
impl<B> Body for LoggingBody<B>
174+
where
175+
B: Body<Data = Bytes> + Send + 'static,
176+
B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
177+
{
178+
type Data = Bytes;
179+
type Error = B::Error;
180+
181+
/// Intercept every HTTP/2 frame
182+
fn poll_frame(
183+
self: Pin<&mut Self>,
184+
cx: &mut Context<'_>,
185+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
186+
let this = self.project();
187+
match this.inner.poll_frame(cx) {
188+
Poll::Ready(Some(Ok(frame))) => {
189+
// Data frame
190+
if let Some(buf) = frame.data_ref() {
191+
this.logger.msg_count += 1;
192+
this.logger.size += buf.len();
193+
}
194+
// Trailers frame (EOS)
195+
else if let Some(trailers) = frame.trailers_ref() {
196+
if let Some(status) = trailers
197+
.get("grpc-status")
198+
.map(|v| format!("{:?}", tonic::Code::from_bytes(v.as_bytes())))
199+
{
200+
this.logger.grpc_status = Some(status);
201+
}
202+
}
203+
// return the frame unchanged
204+
Poll::Ready(Some(Ok(frame)))
205+
},
206+
207+
other => other, // pass through errors or end-of-stream
208+
}
209+
}
210+
211+
fn is_end_stream(&self) -> bool {
212+
self.inner.is_end_stream()
213+
}
214+
215+
fn size_hint(&self) -> SizeHint {
216+
self.inner.size_hint()
217+
}
218+
}
219+
220+
impl Drop for ResponseLogger {
221+
fn drop(&mut self) {
222+
// If the status is set, this is still a "success" at the network
223+
// level even if the gRPC status is an error, so we complete the
224+
// timer with a success status.
225+
let duration;
226+
let mut timer = self.timer.take().expect("Someone else took the timer?");
227+
if let Some(grpc_status) = &self.grpc_status {
228+
timer.replace_label(
229+
StaticMetricLabel::new("grpc_status", ""),
230+
StaticMetricLabel::new("grpc_status", grpc_status.clone()),
231+
);
232+
duration = timer.finish();
233+
} else {
234+
// The timer will drop here with status "Error", as we didn't call `finish()`
235+
duration = timer.elapsed();
236+
};
237+
tracing::debug!(
238+
target: "convex-grpc",
239+
method = %self.method,
240+
grpc_status = %self.grpc_status.take().as_deref().unwrap_or("<unspecified>"),
241+
resp_msgs = self.msg_count,
242+
resp_bytes = self.size,
243+
duration_ms = %format!("{:.3}", duration.as_secs_f64() * 1000.0),
244+
);
245+
}
246+
}

0 commit comments

Comments
 (0)