55# the root directory of this source tree.
66
77import json
8- from datetime import datetime
8+ from datetime import UTC , datetime
99from typing import Protocol
1010
1111import aiosqlite
1212
13- from llama_stack .apis .telemetry import QueryCondition , Span , SpanWithStatus , Trace
13+ from llama_stack .apis .telemetry import (
14+ MetricDataPoint ,
15+ MetricLabel ,
16+ MetricLabelMatcher ,
17+ MetricQueryType ,
18+ MetricSeries ,
19+ QueryCondition ,
20+ QueryMetricsResponse ,
21+ Span ,
22+ SpanWithStatus ,
23+ Trace ,
24+ )
1425
1526
1627class TraceStore (Protocol ):
@@ -29,11 +40,203 @@ async def get_span_tree(
2940 max_depth : int | None = None ,
3041 ) -> dict [str , SpanWithStatus ]: ...
3142
43+ async def query_metrics (
44+ self ,
45+ metric_name : str ,
46+ start_time : datetime ,
47+ end_time : datetime | None = None ,
48+ granularity : str | None = "1d" ,
49+ query_type : MetricQueryType = MetricQueryType .RANGE ,
50+ label_matchers : list [MetricLabelMatcher ] | None = None ,
51+ ) -> QueryMetricsResponse : ...
52+
3253
class SQLiteTraceStore(TraceStore):
    """TraceStore implementation backed by a SQLite database via aiosqlite."""

    def __init__(self, conn_string: str):
        # Connection string / database path; each query method opens its own
        # connection with aiosqlite.connect(self.conn_string).
        self.conn_string = conn_string
3657
58+ async def query_metrics (
59+ self ,
60+ metric_name : str ,
61+ start_time : datetime ,
62+ end_time : datetime | None = None ,
63+ granularity : str | None = None ,
64+ query_type : MetricQueryType = MetricQueryType .RANGE ,
65+ label_matchers : list [MetricLabelMatcher ] | None = None ,
66+ ) -> QueryMetricsResponse :
67+ """Query metrics from span events stored in SQLite.
68+ Args:
69+ metric_name: The name of the metric to query (e.g., "prompt_tokens")
70+ start_time: Start time for the query range
71+ end_time: End time for the query range (defaults to now if None)
72+ granularity: Time granularity for aggregation (e.g., "1m", "5m", "1h", "1d")
73+ query_type: Type of query (RANGE or INSTANT)
74+ label_matchers: Label filters to apply
75+ Returns:
76+ QueryMetricsResponse with metric time series data
77+ """
78+ if end_time is None :
79+ end_time = datetime .now (UTC )
80+
81+ # Build the base query with aggregation
82+ if query_type == MetricQueryType .INSTANT :
83+ # For instant queries, aggregate all data into a single point
84+ query = """
85+ SELECT
86+ se.name,
87+ SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
88+ json_extract(se.attributes, '$.unit') as unit,
89+ se.attributes
90+ FROM span_events se
91+ WHERE se.name = ?
92+ AND se.timestamp BETWEEN ? AND ?
93+ """
94+ else :
95+ # For range queries, aggregate by time buckets based on granularity
96+ if granularity :
97+ time_format = self ._get_time_format_for_granularity (granularity )
98+
99+ query = f"""
100+ SELECT
101+ se.name,
102+ SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
103+ json_extract(se.attributes, '$.unit') as unit,
104+ se.attributes,
105+ strftime({ time_format } , se.timestamp) as bucket_start
106+ FROM span_events se
107+ WHERE se.name = ?
108+ AND se.timestamp BETWEEN ? AND ?
109+ """
110+ else :
111+ # For no granularity (None), return individual data points
112+ query = """
113+ SELECT
114+ se.name,
115+ json_extract(se.attributes, '$.value') as value,
116+ json_extract(se.attributes, '$.unit') as unit,
117+ se.attributes,
118+ se.timestamp
119+ FROM span_events se
120+ WHERE se.name = ?
121+ AND se.timestamp BETWEEN ? AND ?
122+ """
123+
124+ params = [f"metric.{ metric_name } " , start_time .isoformat (), end_time .isoformat ()]
125+
126+ # Add label matchers if provided
127+ if label_matchers :
128+ for matcher in label_matchers :
129+ if matcher .operator == "=" :
130+ query += f" AND json_extract(se.attributes, '$.{ matcher .name } ') = ?"
131+ params .append (matcher .value )
132+ elif matcher .operator == "!=" :
133+ query += f" AND json_extract(se.attributes, '$.{ matcher .name } ') != ?"
134+ params .append (matcher .value )
135+ elif matcher .operator == "=~" :
136+ query += f" AND json_extract(se.attributes, '$.{ matcher .name } ') LIKE ?"
137+ params .append (f"%{ matcher .value } %" )
138+ elif matcher .operator == "!~" :
139+ query += f" AND json_extract(se.attributes, '$.{ matcher .name } ') NOT LIKE ?"
140+ params .append (f"%{ matcher .value } %" )
141+
142+ if query_type == MetricQueryType .RANGE and granularity :
143+ group_time_format = self ._get_time_format_for_granularity (granularity )
144+ query += f" GROUP BY strftime({ group_time_format } , se.timestamp), json_extract(se.attributes, '$.unit')"
145+ query += " ORDER BY bucket_start"
146+ elif query_type == MetricQueryType .INSTANT :
147+ query += " GROUP BY json_extract(se.attributes, '$.unit')"
148+ else :
149+ # For range queries without granularity (no aggregation)
150+ query += " ORDER BY se.timestamp"
151+
152+ # Execute query
153+ async with aiosqlite .connect (self .conn_string ) as conn :
154+ conn .row_factory = aiosqlite .Row
155+ async with conn .execute (query , params ) as cursor :
156+ rows = await cursor .fetchall ()
157+
158+ if not rows :
159+ return QueryMetricsResponse (data = [])
160+
161+ # Parse metric data
162+ data_points = []
163+ labels : list [MetricLabel ] = []
164+
165+ for row in rows :
166+ # Parse JSON attributes
167+ attributes = json .loads (row ["attributes" ])
168+
169+ # Extract metric value and unit
170+ value = row ["value" ]
171+ unit = row ["unit" ] or ""
172+
173+ # Extract labels from attributes
174+ metric_labels = []
175+ for key , val in attributes .items ():
176+ if key not in ["value" , "unit" ]:
177+ metric_labels .append (MetricLabel (name = key , value = str (val )))
178+
179+ # Create data point
180+ if query_type == MetricQueryType .RANGE and granularity :
181+ # Parse bucket start time for aggregated range queries
182+ try :
183+ bucket_start_raw = row ["bucket_start" ]
184+ if bucket_start_raw is not None :
185+ bucket_start = datetime .fromisoformat (bucket_start_raw )
186+ else :
187+ # Error out if bucket_start is None
188+ raise ValueError ("bucket_start is None - this indicates a query configuration error" )
189+ except KeyError as e :
190+ # Error out if bucket_start column doesn't exist in the result
191+ raise ValueError (
192+ "bucket_start column not found in query result when trying to use granularity. Timestamps in the database might be mis-formatted"
193+ ) from e
194+ timestamp = int (bucket_start .timestamp ())
195+ elif query_type == MetricQueryType .INSTANT :
196+ # Use current time for instant queries
197+ timestamp = int (datetime .now (UTC ).timestamp ())
198+ else :
199+ # Use original timestamp for non-aggregated queries
200+ # Parse timestamp from database
201+ timestamp_raw = row ["timestamp" ]
202+ if timestamp_raw is not None :
203+ timestamp_iso = datetime .fromisoformat (timestamp_raw )
204+ else :
205+ raise ValueError ("timestamp is None - this indicates a data integrity issue" )
206+ timestamp = int (timestamp_iso .timestamp ())
207+
208+ data_points .append (
209+ MetricDataPoint (
210+ timestamp = timestamp ,
211+ value = value ,
212+ unit = unit ,
213+ )
214+ )
215+
216+ # Create metric series
217+ metric_series = [MetricSeries (metric = metric_name , labels = labels , values = data_points )]
218+
219+ return QueryMetricsResponse (data = metric_series )
220+
221+ def _get_time_format_for_granularity (self , granularity : str | None ) -> str :
222+ """Get the SQLite strftime format string for a given granularity.
223+ Args:
224+ granularity: Granularity string (e.g., "1m", "5m", "1h", "1d")
225+ Returns:
226+ SQLite strftime format string for the granularity
227+ """
228+ if granularity is None :
229+ raise ValueError ("granularity cannot be None for this method - use separate logic for no aggregation" )
230+
231+ if granularity .endswith ("d" ):
232+ return "'%Y-%m-%d 00:00:00'"
233+ elif granularity .endswith ("h" ):
234+ return "'%Y-%m-%d %H:00:00'"
235+ elif granularity .endswith ("m" ):
236+ return "'%Y-%m-%d %H:%M:00'"
237+ else :
238+ return "'%Y-%m-%d %H:00:00'" # Default to hour-level
239+
37240 async def query_traces (
38241 self ,
39242 attribute_filters : list [QueryCondition ] | None = None ,
0 commit comments