55# the root directory of this source tree.
66
77import json
8- from datetime import datetime
8+ from datetime import UTC , datetime
99from typing import Protocol
1010
1111import aiosqlite
1212
13- from llama_stack .apis .telemetry import QueryCondition , Span , SpanWithStatus , Trace
13+ from llama_stack .apis .telemetry import (
14+ MetricDataPoint ,
15+ MetricLabel ,
16+ MetricLabelMatcher ,
17+ MetricQueryType ,
18+ MetricSeries ,
19+ QueryCondition ,
20+ QueryMetricsResponse ,
21+ Span ,
22+ SpanWithStatus ,
23+ Trace ,
24+ )
1425
1526
1627class TraceStore (Protocol ):
@@ -29,11 +40,119 @@ async def get_span_tree(
2940 max_depth : int | None = None ,
3041 ) -> dict [str , SpanWithStatus ]: ...
3142
43+ async def query_metrics (
44+ self ,
45+ metric_name : str ,
46+ start_time : datetime ,
47+ end_time : datetime | None = None ,
48+ granularity : str | None = "1d" ,
49+ query_type : MetricQueryType = MetricQueryType .RANGE ,
50+ label_matchers : list [MetricLabelMatcher ] | None = None ,
51+ ) -> QueryMetricsResponse : ...
52+
3253
3354class SQLiteTraceStore (TraceStore ):
3455 def __init__ (self , conn_string : str ):
3556 self .conn_string = conn_string
3657
58+ async def query_metrics (
59+ self ,
60+ metric_name : str ,
61+ start_time : datetime ,
62+ end_time : datetime | None = None ,
63+ granularity : str | None = "1d" ,
64+ query_type : MetricQueryType = MetricQueryType .RANGE ,
65+ label_matchers : list [MetricLabelMatcher ] | None = None ,
66+ ) -> QueryMetricsResponse :
67+ """Query metrics from span events stored in SQLite.
68+
69+ Args:
70+ metric_name: The name of the metric to query (e.g., "prompt_tokens")
71+ start_time: Start time for the query range
72+ end_time: End time for the query range (defaults to now if None)
73+ granularity: Time granularity for aggregation (not implemented yet)
74+ query_type: Type of query (RANGE or INSTANT)
75+ label_matchers: Label filters to apply
76+
77+ Returns:
78+ QueryMetricsResponse with metric time series data
79+ """
80+ if end_time is None :
81+ end_time = datetime .now (UTC )
82+
83+ # Build the base query
84+ query = """
85+ SELECT
86+ se.name,
87+ se.timestamp,
88+ se.attributes
89+ FROM span_events se
90+ WHERE se.name = ?
91+ AND se.timestamp BETWEEN ? AND ?
92+ """
93+
94+ params = [f"metric.{ metric_name } " , start_time .isoformat (), end_time .isoformat ()]
95+
96+ # Add label matchers if provided
97+ if label_matchers :
98+ for matcher in label_matchers :
99+ if matcher .operator == "=" :
100+ query += f" AND json_extract(se.attributes, '$.{ matcher .name } ') = ?"
101+ params .append (matcher .value )
102+ elif matcher .operator == "!=" :
103+ query += f" AND json_extract(se.attributes, '$.{ matcher .name } ') != ?"
104+ params .append (matcher .value )
105+ elif matcher .operator == "=~" :
106+ query += f" AND json_extract(se.attributes, '$.{ matcher .name } ') LIKE ?"
107+ params .append (f"%{ matcher .value } %" )
108+ elif matcher .operator == "!~" :
109+ query += f" AND json_extract(se.attributes, '$.{ matcher .name } ') NOT LIKE ?"
110+ params .append (f"%{ matcher .value } %" )
111+
112+ query += " ORDER BY se.timestamp"
113+
114+ # Execute query
115+ async with aiosqlite .connect (self .conn_string ) as conn :
116+ conn .row_factory = aiosqlite .Row
117+ async with conn .execute (query , params ) as cursor :
118+ rows = await cursor .fetchall ()
119+
120+ if not rows :
121+ return QueryMetricsResponse (data = [])
122+
123+ # Parse metric data
124+ data_points = []
125+ labels : list [MetricLabel ] = []
126+
127+ for row in rows :
128+ # Parse JSON attributes
129+ attributes = json .loads (row ["attributes" ])
130+
131+ # Extract metric value and unit
132+ value = attributes .get ("value" )
133+ unit = attributes .get ("unit" , "" )
134+
135+ # Extract labels from attributes
136+ metric_labels = []
137+ for key , val in attributes .items ():
138+ if key not in ["value" , "unit" ]:
139+ metric_labels .append (MetricLabel (name = key , value = str (val )))
140+
141+ # Create data point
142+ timestamp = datetime .fromisoformat (row ["timestamp" ])
143+ data_points .append (
144+ MetricDataPoint (
145+ timestamp = int (timestamp .timestamp ()),
146+ value = value ,
147+ unit = unit ,
148+ )
149+ )
150+
151+ # Create metric series
152+ metric_series = [MetricSeries (metric = metric_name , labels = labels , values = data_points )]
153+
154+ return QueryMetricsResponse (data = metric_series )
155+
37156 async def query_traces (
38157 self ,
39158 attribute_filters : list [QueryCondition ] | None = None ,
0 commit comments