Skip to content

Commit 80c0ddb

Browse files
authored
prometheus: run metrics render in a tokio blocking thread (#576)
1 parent 8cc3959 commit 80c0ddb

File tree

2 files changed

+14
-18
lines changed

2 files changed

+14
-18
lines changed

metrics-exporter-prometheus/src/exporter/http_listener.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::net::SocketAddr;
22

33
use http_body_util::Full;
44
use hyper::{
5-
body::{self, Bytes, Incoming},
5+
body::{Bytes, Incoming},
66
header::{HeaderValue, CONTENT_TYPE},
77
server::conn::http1::Builder as HyperHttpBuilder,
88
service::service_fn,
@@ -67,10 +67,8 @@ impl HttpListeningExporter {
6767
fn process_tcp_stream(&self, stream: TcpStream) {
6868
let is_allowed = self.check_tcp_allowed(&stream);
6969
let handle = self.handle.clone();
70-
let service = service_fn(move |req: Request<body::Incoming>| {
71-
let handle = handle.clone();
72-
async move { Ok::<_, hyper::Error>(Self::handle_http_request(is_allowed, &handle, &req)) }
73-
});
70+
let service =
71+
service_fn(move |req| Self::handle_http_request(is_allowed, handle.clone(), req));
7472

7573
tokio::spawn(async move {
7674
if let Err(err) =
@@ -115,10 +113,7 @@ impl HttpListeningExporter {
115113
#[cfg(feature = "uds-listener")]
116114
fn process_uds_stream(&self, stream: UnixStream) {
117115
let handle = self.handle.clone();
118-
let service = service_fn(move |req: Request<body::Incoming>| {
119-
let handle = handle.clone();
120-
async move { Ok::<_, hyper::Error>(Self::handle_http_request(true, &handle, &req)) }
121-
});
116+
let service = service_fn(move |req| Self::handle_http_request(true, handle.clone(), req));
122117

123118
tokio::spawn(async move {
124119
if let Err(err) =
@@ -129,26 +124,26 @@ impl HttpListeningExporter {
129124
});
130125
}
131126

132-
fn handle_http_request(
127+
async fn handle_http_request(
133128
is_allowed: bool,
134-
handle: &PrometheusHandle,
135-
req: &Request<Incoming>,
136-
) -> Response<Full<Bytes>> {
129+
handle: PrometheusHandle,
130+
req: Request<Incoming>,
131+
) -> Result<Response<Full<Bytes>>, hyper::Error> {
137132
if is_allowed {
138133
let mut response = Response::new(match req.uri().path() {
139134
"/health" => "OK".into(),
140-
_ => handle.render().into(),
135+
_ => tokio::task::spawn_blocking(move || handle.render()).await.unwrap().into(),
141136
});
142137
response.headers_mut().append(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
143-
response
138+
Ok(response)
144139
} else {
145140
// This unwrap should not fail because we don't use any function that
146141
// can assign an Err to it's inner such as `Builder::header``. A unit test
147142
// will have to suffice to detect if this fails to hold true.
148-
Response::builder()
143+
Ok(Response::builder()
149144
.status(StatusCode::FORBIDDEN)
150145
.body(Full::<Bytes>::default())
151-
.unwrap()
146+
.unwrap())
152147
}
153148
}
154149
}

metrics-exporter-prometheus/src/exporter/push_gateway.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ pub(super) fn new_push_gateway(
4141
builder = builder.header("authorization", auth.clone());
4242
}
4343

44-
let output = handle.render();
44+
let handle = handle.clone();
45+
let output = tokio::task::spawn_blocking(move || handle.render()).await.unwrap();
4546
let result =
4647
builder.method(http_method.clone()).uri(endpoint.clone()).body(Full::from(output));
4748
let req = match result {

0 commit comments

Comments
 (0)