Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,18 @@ metrics:
| Metric | Type | Description |
|--------|------|-------------|
| `request.duration` | Histogram | Proxy request duration in seconds. Tagged with status, upstream. Sampled at 1%. |
| `requests.inflight` | Gauge | Number of requests currently being processed |
| `requests.inflight` | Gauge | Number of requests currently being processed. |
<!-- PROXY_METRICS:END -->

## Ingest Router Metrics

<!-- INGEST_ROUTER_METRICS:START -->
| Metric | Type | Description |
|--------|------|-------------|
| `request.duration` | Histogram | Request duration in seconds. Tagged with status, handler. |
| `requests.inflight` | Gauge | Number of requests currently being processed |
| `upstream.request.duration` | Histogram | Per-cell upstream request duration in seconds. Tagged with cell_id, status (the status-code if successful, 'timeout', or 'error'). |
<!-- INGEST_ROUTER_METRICS:END -->

## TODO: Add metrics for other modules

1 change: 1 addition & 0 deletions ingest-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ http-body-util = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true }
locator = { path = "../locator" }
metrics = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
29 changes: 28 additions & 1 deletion ingest-router/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::errors::IngestRouterError;
use crate::handler::{CellId, ExecutionMode, Handler};
use crate::http::send_to_upstream;
use crate::locale::Cells;
use crate::metrics_defs::UPSTREAM_REQUEST_DURATION;
use http::StatusCode;
use http_body_util::Full;
use hyper::body::Bytes;
Expand All @@ -13,9 +14,14 @@ use hyper_util::rt::TokioExecutor;
use shared::http::make_error_response;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use tokio::task::JoinSet;
use tokio::time::{Duration, sleep};

// Counter for 1% metric sampling.
static UPSTREAM_REQUEST_COUNT: AtomicU64 = AtomicU64::new(0);

#[derive(Clone)]
pub struct Executor {
client: Client<HttpConnector, Full<Bytes>>,
Expand Down Expand Up @@ -192,5 +198,26 @@ async fn send_to_cell(
let request = Request::from_parts(parts, Full::new(body));

// Send to upstream (using relay_url) - returns Response<Bytes>
send_to_upstream(client, &upstream.relay_url, request, timeout_secs).await
let start = Instant::now();
let result = send_to_upstream(client, &upstream.relay_url, request, timeout_secs).await;

// Record duration metric with status (1% sample)
if UPSTREAM_REQUEST_COUNT
.fetch_add(1, Ordering::Relaxed)
.is_multiple_of(100)
{
let status = match &result {
Ok(response) => response.status().as_u16().to_string(),
Err(IngestRouterError::UpstreamTimeout(_)) => "timeout".to_string(),
Err(_) => "error".to_string(),
};
metrics::histogram!(
UPSTREAM_REQUEST_DURATION.name,
"cell_id" => cell_id.to_string(),
"status" => status,
)
.record(start.elapsed().as_secs_f64());
}

result
}
76 changes: 55 additions & 21 deletions ingest-router/src/ingest_router_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::config;
use crate::errors::IngestRouterError;
use crate::executor;
use crate::metrics_defs::{REQUEST_DURATION, REQUESTS_INFLIGHT};
use crate::router;
use http_body_util::{BodyExt, Full};
use hyper::StatusCode;
Expand All @@ -9,6 +10,14 @@ use hyper::service::Service;
use hyper::{Request, Response};
use shared::http::make_error_response;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

// Counter for 1% metric sampling.
static REQUEST_COUNT: AtomicU64 = AtomicU64::new(0);

// Gauge for number of requests currently being processed.
static INFLIGHT: AtomicU64 = AtomicU64::new(0);

pub struct IngestRouterService {
router: router::Router,
Expand All @@ -34,31 +43,56 @@ where
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn call(&self, req: Request<B>) -> Self::Future {
let maybe_handler = self.router.resolve(&req);

match maybe_handler {
Some((handler, cells)) => {
let (parts, body) = req.into_parts();
let executor = self.executor.clone();

Box::pin(async move {
let body_bytes = match body.collect().await {
Ok(c) => c.to_bytes(),
let start = Instant::now();
INFLIGHT.fetch_add(1, Ordering::Relaxed);

let resolved = self.router.resolve(&req);
let (parts, body) = req.into_parts();
let executor = self.executor.clone();

Box::pin(async move {
let (response, handler_name): (Response<Full<Bytes>>, &str) = match resolved {
Some((handler, cells)) => {
let handler_name = handler.name();
match body.collect().await {
Ok(c) => {
let request = Request::from_parts(parts, c.to_bytes());
let response = executor.execute(handler, request, cells).await;
(response.map(Full::new), handler_name)
}
Err(_) => {
return Ok(make_error_response(StatusCode::BAD_REQUEST).map(Full::new));
let response =
make_error_response(StatusCode::BAD_REQUEST).map(Full::new);
(response, handler_name)
}
};
let request = Request::from_parts(parts, body_bytes);
let response = executor.execute(handler, request, cells).await;
Ok(response.map(Full::new))
})
}
None => {
Box::pin(
async move { Ok(make_error_response(StatusCode::BAD_REQUEST).map(Full::new)) },
}
}
None => {
let response = make_error_response(StatusCode::BAD_REQUEST).map(Full::new);
(response, "none")
}
};

// Record metrics (1% sample)
if REQUEST_COUNT
.fetch_add(1, Ordering::Relaxed)
.is_multiple_of(100)
{
metrics::histogram!(
REQUEST_DURATION.name,
"status" => response.status().as_u16().to_string(),
"handler" => handler_name,
)
.record(start.elapsed().as_secs_f64());

let inflight = INFLIGHT.load(Ordering::Relaxed);
metrics::gauge!(REQUESTS_INFLIGHT.name).set(inflight as f64);
}
}

INFLIGHT.fetch_sub(1, Ordering::Relaxed);

Ok(response)
})
}
}

Expand Down
4 changes: 3 additions & 1 deletion ingest-router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ pub mod handler;
pub mod http;
pub mod ingest_router_service;
pub mod locale;
pub mod metrics_defs;
pub mod router;

#[cfg(test)]
mod testutils;

use crate::errors::IngestRouterError;
use locator::client::Locator;
use shared::admin_service::AdminService;
use shared::http::run_http_service;

use shared::admin_service::AdminService;

pub async fn run(config: config::Config) -> Result<(), IngestRouterError> {
let locator = Locator::new(config.locator.to_client_config()).await?;

Expand Down
25 changes: 25 additions & 0 deletions ingest-router/src/metrics_defs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use shared::metrics_defs::{MetricDef, MetricType};

pub const REQUEST_DURATION: MetricDef = MetricDef {
name: "request.duration",
metric_type: MetricType::Histogram,
description: "Request duration in seconds. Tagged with status, handler.",
};

pub const REQUESTS_INFLIGHT: MetricDef = MetricDef {
name: "requests.inflight",
metric_type: MetricType::Gauge,
description: "Number of requests currently being processed",
};

pub const UPSTREAM_REQUEST_DURATION: MetricDef = MetricDef {
name: "upstream.request.duration",
metric_type: MetricType::Histogram,
description: "Per-cell upstream request duration in seconds. Tagged with cell_id, status (the status-code if successful, 'timeout', or 'error').",
};

pub const ALL_METRICS: &[MetricDef] = &[
REQUEST_DURATION,
REQUESTS_INFLIGHT,
UPSTREAM_REQUEST_DURATION,
];
2 changes: 1 addition & 1 deletion proxy/src/metrics_defs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub const REQUEST_DURATION: MetricDef = MetricDef {
pub const REQUESTS_INFLIGHT: MetricDef = MetricDef {
name: "requests.inflight",
metric_type: MetricType::Gauge,
description: "Number of requests currently being processed",
description: "Number of requests currently being processed.",
};

// TODO: all metrics must be added here for now, this can be done dynamically with a macro in the future.
Expand Down
25 changes: 22 additions & 3 deletions synapse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ fn cli() -> Result<(), CliError> {
"{}",
generate_metrics_table(proxy::metrics_defs::ALL_METRICS)
);
println!("\n## Ingest Router Metrics\n");
println!(
"{}",
generate_metrics_table(ingest_router::metrics_defs::ALL_METRICS)
);
Ok(())
}
CliCommand::SyncMetrics => {
Expand All @@ -110,6 +115,12 @@ fn cli() -> Result<(), CliError> {
&generate_metrics_table(proxy::metrics_defs::ALL_METRICS),
);

content = sync_section(
&content,
"INGEST_ROUTER_METRICS",
&generate_metrics_table(ingest_router::metrics_defs::ALL_METRICS),
);

std::fs::write(path, content).expect("Failed to write METRICS.md");
println!("Synced METRICS.md");
Ok(())
Expand Down Expand Up @@ -238,16 +249,24 @@ mod tests {
let metrics_md =
std::fs::read_to_string("../METRICS.md").expect("Failed to read METRICS.md");

let all_metrics = [
locator::metrics_defs::ALL_METRICS,
proxy::metrics_defs::ALL_METRICS,
ingest_router::metrics_defs::ALL_METRICS,
]
.into_iter()
.flatten();

let mut missing = Vec::new();
for m in locator::metrics_defs::ALL_METRICS {
if !metrics_md.contains(m.name) {
for m in all_metrics {
if !metrics_md.contains(m.name) || !metrics_md.contains(m.description) {
missing.push(m.name);
}
}

assert!(
missing.is_empty(),
"METRICS.md is missing these metrics: {:?}\nAdd them to METRICS.md",
"METRICS.md is missing: {:?}\nAdd them to METRICS.md",
missing
);
}
Expand Down