Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Prometheus exporter #834

Merged
merged 10 commits into from
Feb 13, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Add Prometheus exporter
This adds an `/export` endpoint which returns a Prometheus compatible
data stream

Closes: SYNC-4590
jrconlin committed Jan 25, 2025
commit 52dee8d97fdcb733c6ff5d9512478bcf8063494d
31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion autoendpoint/Cargo.toml
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ backtrace.workspace = true
base64.workspace = true
cadence.workspace = true
config.workspace = true
chrono.workspace = true
docopt.workspace = true
fernet.workspace = true
futures.workspace = true
@@ -50,6 +51,7 @@ again = { version = "0.1.2", default-features = false, features = [
async-trait = "0.1"
autopush_common = { path = "../autopush-common" }
jsonwebtoken = "9.3.0"
prometheus-client = { version = "0.23", optional = true }
validator = "0.19"
validator_derive = "0.19"

@@ -83,4 +85,4 @@ stub = []
# Verbosely log vapid assertions (NOT ADVISED FOR WIDE PRODUCTION USE)
log_vapid = []

reliable_report = ["autopush_common/reliable_report"]
reliable_report = ["autopush_common/reliable_report", "dep:prometheus-client"]
116 changes: 100 additions & 16 deletions autoendpoint/src/routes/reliability.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,112 @@
use std::collections::HashMap;

use actix_web::{web::Data, HttpResponse};
use serde_json::json;
use prometheus_client::{
encoding::text::encode, metrics::family::Family, metrics::gauge::Gauge, registry::Registry,
};

use crate::server::AppState;

const METRIC_NAME: &str = "autopush_reliability";

/// Generate a Prometheus compatible report. Output should follow the
/// [instrumentation](https://prometheus.io/docs/practices/instrumentation/) guidelines.
///
/// In short form, the output is plain text, with each metric on its own line
/// using the following format:
/// ```text
/// # HELP metric_name Optional description of this metric
/// # TYPE metric_name {required type (gauge|counter|histogram|summary)}
/// metric_name{label="label1"} value
/// metric_name{label="label2"} value
/// ```
/// An example which would return counts of messages in given states at the current
/// time would be along the lines of:
/// ```text
/// # HELP autopush_reliability Count of messages at given states
/// # TYPE autopush_reliability gauge
/// autopush_reliability{state="recv"} 123
/// autopush_reliability{state="stor"} 123
/// # EOF
/// ```
///
/// `report` maps a state name to the number of messages currently in that state.
/// Each entry becomes one gauge sample labelled `state="<name>"` under the
/// [`METRIC_NAME`] metric. When `report` is `None` nothing is registered, so only
/// whatever the encoder emits for an empty registry is returned.
///
/// # Panics
/// Panics if the Prometheus text encoder reports a formatting error while writing
/// into the in-memory output string.
pub fn gen_report(report: Option<HashMap<String, i32>>) -> String {
    let mut registry = Registry::default();

    if let Some(values) = report {
        // A "family" is a grouping of metrics.
        // We key it by the list of ("label name", "label value") pairs, each of which
        // indexes a single Gauge.
        let family = Family::<Vec<(&str, String)>, Gauge>::default();
        // Associate the family with the top level metric name. The registry keeps its
        // own handle, so samples set through `family` below show up in the output.
        registry.register(
            METRIC_NAME,
            "Count of messages at given states",
            family.clone(),
        );
        for (milestone, value) in values {
            // Use the static "state" label name with the given milestone as its value,
            // and store the count as the gauge value.
            family
                .get_or_create(&vec![("state", milestone)])
                .set(i64::from(value));
        }
    }

    // Return the formatted string that Prometheus will eventually read.
    let mut encoded = String::new();
    encode(&mut encoded, &registry)
        .expect("encoding metrics into an in-memory String failed");
    encoded
}

/// Handle a request for the reliability report, returning it in the Prometheus text format.
///
/// The handler first asks the reliability tracker to garbage collect, then fetches the
/// current report and renders it with [`gen_report`].
///
/// Responses:
/// * `200 text/plain` with the rendered report when both steps succeed. An absent report
///   (`Ok(None)`) is passed to [`gen_report`] as-is rather than treated as an error.
/// * `500 text/plain` with a `# ERROR: ...` line when either the garbage collection or
///   the report lookup fails. The error is also logged.
pub async fn report_handler(app_state: Data<AppState>) -> HttpResponse {
    let reliability = app_state.reliability.clone();

    // Clean up before reporting; a failure here aborts the request.
    if let Err(err) = reliability.gc().await {
        error!("🔍🟥 Reporting, Error {:?}", &err);
        return HttpResponse::InternalServerError()
            .content_type("text/plain")
            .body(format!("# ERROR: {err}\n"));
    }

    match reliability.report().await {
        Ok(report) => HttpResponse::Ok()
            .content_type("text/plain")
            .body(gen_report(report)),
        Err(e) => {
            error!("🔍🟥 Reporting, Error {:?}", &e);
            // NOTE: This will NOT be read by Prometheus, but serves as a diagnostic message.
            HttpResponse::InternalServerError()
                .content_type("text/plain")
                .body(format!("# ERROR: {e}\n"))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    use autopush_common::reliability::ReliabilityState;

    /// Feed a small, fixed set of state counts through `gen_report` and check that the
    /// metric type line and a couple of the labelled samples appear in the output.
    #[test]
    fn test_report() {
        // Keep the names of the two states we sample so they can be reused in the
        // expected output lines.
        let accepted = ReliabilityState::Accepted.to_string();
        let transmitted = ReliabilityState::Transmitted.to_string();

        // A throwaway report with a distinct count per state.
        let report: HashMap<String, i32> = HashMap::from([
            (accepted.clone(), 111),
            (ReliabilityState::Stored.to_string(), 222),
            (ReliabilityState::Retrieved.to_string(), 333),
            (transmitted.clone(), 444),
        ]);

        let generated = gen_report(Some(report));

        // We don't really care if the `Created` or `HELP` lines are included,
        // only that the metric is declared.
        let type_line = format!("# TYPE {METRIC_NAME}");
        assert!(generated.contains(&type_line));

        // Sample the first and last values.
        let accepted_line = format!("{METRIC_NAME}{{state=\"{accepted}\"}} 111");
        assert!(generated.contains(&accepted_line));
        let transmitted_line = format!("{METRIC_NAME}{{state=\"{transmitted}\"}} 444");
        assert!(generated.contains(&transmitted_line));
    }
}
2 changes: 1 addition & 1 deletion autoendpoint/src/server.rs
Original file line number Diff line number Diff line change
@@ -213,7 +213,7 @@ impl Server {
.service(web::resource("/__version__").route(web::get().to(version_route)));
#[cfg(feature = "reliable_report")]
let app = app.service(
web::resource("/__milestones__")
web::resource("/export")
.route(web::get().to(crate::routes::reliability::report_handler)),
);
app
6 changes: 6 additions & 0 deletions autopush-common/src/errors.rs
Original file line number Diff line number Diff line change
@@ -13,6 +13,9 @@ use backtrace::Backtrace;
use serde::ser::{Serialize, SerializeMap, Serializer};
use thiserror::Error;

#[cfg(feature = "reliable_report")]
use redis::RedisError;

pub type Result<T> = std::result::Result<T, ApcError>;

/// Render a 404 response
@@ -99,6 +102,9 @@ pub enum ApcErrorKind {
ParseUrlError(#[from] url::ParseError),
#[error(transparent)]
ConfigError(#[from] config::ConfigError),
#[cfg(feature = "reliable_report")]
#[error(transparent)]
RedisError(#[from] RedisError),
#[error("Broadcast Error: {0}")]
BroadcastError(String),
#[error("Payload Error: {0}")]
28 changes: 27 additions & 1 deletion autopush-common/src/reliability.rs
Original file line number Diff line number Diff line change
@@ -5,15 +5,19 @@
/// and where messages expire early. Message expiration can lead to message loss
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use redis::Commands;

use crate::db::client::DbClient;
use crate::errors::{ApcError, ApcErrorKind, Result};
use crate::util::timing::sec_since_epoch;

pub const COUNTS: &str = "state_counts";
pub const EXPIRY: &str = "expiry";

const CONNECTION_EXPIRATION: u64 = 10;

/// The various states that a message may transit on the way from reception to delivery.
// Note: "Message" in this context refers to the Subscription Update.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Deserialize)]
@@ -138,7 +142,9 @@ impl PushReliability {
.unwrap_or_else(|| "None".to_owned()),
new
);
if let Ok(mut con) = client.get_connection() {
if let Ok(mut con) =
client.get_connection_with_timeout(Duration::from_secs(CONNECTION_EXPIRATION))
{
let mut pipeline = redis::Pipeline::new();
let pipeline = pipeline.hincr(COUNTS, new.to_string(), 1);
let pipeline = if let Some(old) = old {
@@ -166,6 +172,26 @@ impl PushReliability {
Some(new)
}

pub async fn gc(&self) -> Result<()> {
if let Some(client) = &self.client {
debug!("🔍 performing pre-report garbage collection");
if let Ok(mut conn) =
client.get_connection_with_timeout(Duration::from_secs(CONNECTION_EXPIRATION))
{
let purged: Vec<String> = conn.zrange("items", -1, sec_since_epoch() as isize)?;
let mut pipeline = redis::Pipeline::new();
for key in purged {
let parts: Vec<&str> = key.splitn(2, '#').collect();
let state = parts.first();
pipeline.hincr("state_counts", state, -1);
pipeline.zrem("items", key);
}
pipeline.exec(&mut conn).unwrap();
}
}
Ok(())
}

// Return a snapshot of milestone states
// This will probably not be called directly, but useful for debugging.
pub async fn report(&self) -> Result<Option<HashMap<String, i32>>> {