Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/bindings/python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/llm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ humantime = { workspace = true } # input/batch
rand = { workspace = true }
oneshot = { workspace = true }
prometheus = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
strum = { workspace = true }
Expand Down
243 changes: 236 additions & 7 deletions lib/llm/src/http/service/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@

use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::get, Router};
use prometheus::{Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts};
use reqwest::Client;
use serde::Serialize;
use std::{
sync::Arc,
time::{Duration, Instant},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use tokio::spawn;

pub use prometheus::Registry;

Expand All @@ -24,6 +27,107 @@ pub const REQUEST_TYPE_STREAM: &str = "stream";
/// Partial value for the `type` label in the request counter for unary requests
pub const REQUEST_TYPE_UNARY: &str = "unary";

/// Request data for external observability integration
#[derive(Serialize)]
pub struct HeliconeProviderRequest {
    /// URL of the inference endpoint the original request was sent to.
    pub url: String,
    /// The original request body, forwarded verbatim.
    pub json: serde_json::Value,
    /// Trace metadata object (e.g. `helicone-request-id`, `helicone-user-id`).
    pub meta: serde_json::Value,
}

/// Response data for external observability integration
#[derive(Serialize)]
pub struct HeliconeProviderResponse {
    /// Response body (an OpenAI-style completion object is built by the caller).
    pub json: serde_json::Value,
    /// HTTP status code reported for the response.
    pub status: u16,
    /// Response headers, serialized as a JSON object.
    pub headers: serde_json::Value,
}

/// Timing information for request/response cycle
///
/// Serialized in camelCase (`startTime`, `endTime`, `timeToFirstToken`);
/// the container-level `rename_all` replaces per-field renames but produces
/// the identical wire format.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HeliconeTiming {
    /// Start of the request, as a string of seconds since the UNIX epoch.
    pub start_time: String,
    /// End of the request, as a string of seconds since the UNIX epoch.
    pub end_time: String,
    /// Milliseconds until the first token; `None` serializes as `null`
    /// (no `skip_serializing_if` here, matching the original behavior).
    pub time_to_first_token: Option<u64>,
}

/// Complete trace data for external observability integration
///
/// Serialized in camelCase (`providerRequest`, `providerResponse`); the
/// container-level `rename_all` replaces the per-field renames without
/// changing the wire format.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HeliconeTrace {
    /// The request as sent to the provider.
    pub provider_request: HeliconeProviderRequest,
    /// The provider's response; omitted entirely while the request is
    /// still in flight (the initial trace carries only the request half).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_response: Option<HeliconeProviderResponse>,
    /// Request timing; omitted when not yet known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timing: Option<HeliconeTiming>,
    /// Provider label (the callers in this file always pass "OPENAI").
    pub provider: String,
}

/// Logger for external observability integration
#[derive(Clone)]
pub struct HeliconeLogger {
    /// Shared HTTP client used to POST traces.
    client: Client,
    /// Target URL for trace submission; empty when logging is disabled.
    endpoint: String,
    /// Optional bearer token read from the `HELICONE_API_KEY` env var.
    api_key: Option<String>,
    /// True only when a non-empty endpoint was configured.
    enabled: bool,
}

impl HeliconeLogger {
    /// Build a logger from an optional endpoint URL.
    ///
    /// Logging is enabled only when a non-empty endpoint is supplied. The
    /// `HELICONE_API_KEY` environment variable, if set, is used as a bearer
    /// token on every trace submission.
    pub fn new(endpoint: Option<String>) -> Self {
        let api_key = std::env::var("HELICONE_API_KEY").ok();
        let endpoint = endpoint.unwrap_or_default();
        let enabled = !endpoint.is_empty();

        if enabled {
            tracing::debug!("Helicone logger initialized: endpoint={}, has_api_key={}", endpoint, api_key.is_some());
        }

        Self {
            client: Client::new(),
            endpoint,
            api_key,
            enabled,
        }
    }

    /// Fire-and-forget submission of a trace.
    ///
    /// Returns immediately; the HTTP POST runs on a spawned task (requires a
    /// running tokio runtime). Failures are logged, never propagated.
    pub fn log_async(&self, trace: HeliconeTrace) {
        if !self.enabled {
            return;
        }

        // Clone everything the task needs so it is fully 'static.
        let (client, endpoint, api_key) = (
            self.client.clone(),
            self.endpoint.clone(),
            self.api_key.clone(),
        );

        spawn(async move {
            let mut req = client
                .post(&endpoint)
                .header("Content-Type", "application/json");

            // Add Authorization header if API key is available
            if let Some(key) = api_key {
                req = req.header("Authorization", format!("Bearer {}", key));
            }

            match req.json(&trace).send().await {
                Err(e) => {
                    tracing::error!("Failed to send Helicone trace: {}", e);
                }
                Ok(response) if response.status().is_success() => {
                    tracing::debug!("Sent Helicone trace successfully");
                }
                Ok(response) => {
                    tracing::warn!("Helicone trace request failed with status: {}", response.status());
                }
            }
        });
    }
}

pub struct Metrics {
request_counter: IntCounterVec,
inflight_gauge: IntGaugeVec,
Expand All @@ -32,6 +136,7 @@ pub struct Metrics {
output_sequence_length: HistogramVec,
time_to_first_token: HistogramVec,
inter_token_latency: HistogramVec,
helicone_logger: HeliconeLogger,
}

/// RAII object for inflight gauge and request counters
Expand Down Expand Up @@ -82,24 +187,35 @@ pub enum Status {
/// Per-response collector for streaming metrics (TTFT, ITL, OSL) that also
/// accumulates the data needed to emit a Helicone trace on drop.
pub struct ResponseMetricCollector {
    /// Shared metrics registry; also owns the Helicone logger.
    metrics: Arc<Metrics>,
    /// Lowercased model name used as a metric label.
    model: String,
    /// Unique id attached to the Helicone trace metadata.
    request_id: String,
    /// Monotonic clock for TTFT/ITL measurement.
    start_time: Instant,
    /// Wall-clock start, milliseconds since the UNIX epoch (for trace timing).
    start_timestamp: u64,
    // we use is_first_token to distinguish TTFT from ITL. It is true by default and
    // flipped to false when the first token is returned and TTFT is published.
    is_first_token: bool,
    // we track the last response time so that ITL for the newly returned tokens can
    // be computed.
    last_response_time: Option<Duration>,
    /// Output sequence length observed so far, in tokens.
    osl: usize,
    // Store the original request data for Helicone logging
    original_request: Option<serde_json::Value>,
    /// URL the request was served on; forwarded into the trace.
    endpoint_url: String,
    /// TTFT in milliseconds, captured when the first token arrives.
    time_to_first_token: Option<u64>,
    // Accumulate response chunks to build complete response
    response_chunks: Vec<String>,
    // Track token counts for Helicone logging
    input_tokens: usize,
    total_output_tokens: usize,
}

impl Default for Metrics {
fn default() -> Self {
Self::new("nv_llm")
Self::new("nv_llm", None)
}
}

impl Metrics {
/// Create Metrics with the given prefix
/// Create Metrics with the given prefix and optional Helicone endpoint
/// The following metrics will be created:
/// - `{prefix}_http_service_requests_total` - IntCounterVec for the total number of requests processed
/// - `{prefix}_http_service_inflight_requests` - IntGaugeVec for the number of inflight requests
Expand All @@ -108,7 +224,7 @@ impl Metrics {
/// - `{prefix}_http_service_output_sequence_tokens` - HistogramVec for output sequence length in tokens
/// - `{prefix}_http_service_time_to_first_token_seconds` - HistogramVec for time to first token in seconds
/// - `{prefix}_http_service_inter_token_latency_seconds` - HistogramVec for inter-token latency in seconds
pub fn new(prefix: &str) -> Self {
pub fn new(prefix: &str, helicone_endpoint: Option<String>) -> Self {
let request_counter = IntCounterVec::new(
Opts::new(
format!("{}_http_service_requests_total", prefix),
Expand Down Expand Up @@ -197,6 +313,7 @@ impl Metrics {
output_sequence_length,
time_to_first_token,
inter_token_latency,
helicone_logger: HeliconeLogger::new(helicone_endpoint),
}
}

Expand Down Expand Up @@ -294,8 +411,20 @@ impl Metrics {
}

/// Create a new [`ResponseMetricCollector`] for collecting per-response metrics (i.e., TTFT, ITL)
pub fn create_response_collector(self: Arc<Self>, model: &str) -> ResponseMetricCollector {
ResponseMetricCollector::new(self, model.to_string().to_lowercase())
pub fn create_response_collector(
self: Arc<Self>,
model: &str,
request_id: &str,
original_request: Option<serde_json::Value>,
endpoint_url: String,
) -> ResponseMetricCollector {
ResponseMetricCollector::new(
self,
model.to_string().to_lowercase(),
request_id.to_string(),
original_request,
endpoint_url,
)
}
}

Expand Down Expand Up @@ -392,14 +521,49 @@ impl Status {
}

impl ResponseMetricCollector {
fn new(metrics: Arc<Metrics>, model: String) -> Self {
fn new(
metrics: Arc<Metrics>,
model: String,
request_id: String,
original_request: Option<serde_json::Value>,
endpoint_url: String,
) -> Self {
let start_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;

// Send initial request trace to observability backend if configured
if let Some(ref request_data) = original_request {
let trace = HeliconeTrace {
provider_request: HeliconeProviderRequest {
url: endpoint_url.clone(),
json: request_data.clone(),
meta: serde_json::json!({
"helicone-request-id": request_id,
"helicone-user-id": "dynamo-user"
}),
},
provider_response: None,
timing: None,
provider: "OPENAI".to_string(),
};

metrics.helicone_logger.log_async(trace);
}

ResponseMetricCollector {
metrics,
model,
request_id,
start_timestamp,
is_first_token: true,
last_response_time: None,
start_time: Instant::now(),
osl: 0,
original_request,
endpoint_url,
time_to_first_token: None,
response_chunks: Vec::new(),
input_tokens: 0,
total_output_tokens: 0,
}
}

Expand All @@ -414,11 +578,18 @@ impl ResponseMetricCollector {
return;
}

// Update token counts for observability
self.input_tokens = isl;
self.total_output_tokens += num_tokens;

if self.is_first_token {
// NOTE: when there are multiple tokens in the first response,
// we use the full response time as TTFT and ignore the ITL
self.is_first_token = false;

// Capture TTFT on first token
self.time_to_first_token = Some(self.start_time.elapsed().as_millis() as u64);

// Publish TTFT
let ttft = self.start_time.elapsed().as_secs_f64();
self.metrics
Expand Down Expand Up @@ -449,6 +620,11 @@ impl ResponseMetricCollector {

self.last_response_time = Some(current_duration);
}

/// Capture response text chunks for building the complete response
///
/// Chunks are joined in arrival order when the collector is dropped.
pub fn observe_response_chunk(&mut self, chunk: &str) {
    self.response_chunks.push(chunk.to_owned());
}
}

impl Drop for ResponseMetricCollector {
Expand All @@ -458,6 +634,59 @@ impl Drop for ResponseMetricCollector {
.output_sequence_length
.with_label_values(&[&self.model])
.observe(self.osl as f64);

// Send complete trace to observability backend with request and response data
if let Some(ref original_request) = self.original_request {
let end_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;
let complete_response = self.response_chunks.join("");

// Build a proper OpenAI-style response
let response_json = serde_json::json!({
"id": format!("chatcmpl-{}", &self.request_id[..8]),
"object": "chat.completion",
"created": self.start_timestamp / 1000,
"model": &self.model,
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": complete_response
},
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": self.input_tokens,
"completion_tokens": self.total_output_tokens,
"total_tokens": self.input_tokens + self.total_output_tokens
}
});

let trace = HeliconeTrace {
provider_request: HeliconeProviderRequest {
url: self.endpoint_url.clone(),
json: original_request.clone(),
meta: serde_json::json!({
"helicone-request-id": &self.request_id,
"helicone-user-id": "dynamo-user"
}),
},
provider_response: Some(HeliconeProviderResponse {
json: response_json,
status: 200,
headers: serde_json::json!({
"content-type": "application/json"
}),
}),
timing: Some(HeliconeTiming {
start_time: (self.start_timestamp / 1000).to_string(),
end_time: (end_timestamp / 1000).to_string(),
time_to_first_token: self.time_to_first_token,
}),
provider: "OPENAI".to_string(),
};

self.metrics.helicone_logger.log_async(trace);
}
}
}

Expand Down
Loading