55# the root directory of this source tree.
66
77import json
8- from datetime import datetime
8+ from datetime import UTC , datetime
99from typing import Protocol
1010
1111import aiosqlite
1212
13- from llama_stack .apis .telemetry import QueryCondition , Span , SpanWithStatus , Trace
13+ from llama_stack .apis .telemetry import (
14+ MetricDataPoint ,
15+ MetricLabel ,
16+ MetricLabelMatcher ,
17+ MetricQueryType ,
18+ MetricSeries ,
19+ QueryCondition ,
20+ QueryMetricsResponse ,
21+ Span ,
22+ SpanWithStatus ,
23+ Trace ,
24+ )
1425
1526
1627class TraceStore (Protocol ):
@@ -29,11 +40,192 @@ async def get_span_tree(
2940 max_depth : int | None = None ,
3041 ) -> dict [str , SpanWithStatus ]: ...
3142
43+ async def query_metrics (
44+ self ,
45+ metric_name : str ,
46+ start_time : datetime ,
47+ end_time : datetime | None = None ,
48+ granularity : str | None = "1d" ,
49+ query_type : MetricQueryType = MetricQueryType .RANGE ,
50+ label_matchers : list [MetricLabelMatcher ] | None = None ,
51+ ) -> QueryMetricsResponse : ...
52+
3253
3354class SQLiteTraceStore (TraceStore ):
3455 def __init__ (self , conn_string : str ):
3556 self .conn_string = conn_string
3657
58+ async def query_metrics (
59+ self ,
60+ metric_name : str ,
61+ start_time : datetime ,
62+ end_time : datetime | None = None ,
63+ granularity : str | None = None ,
64+ query_type : MetricQueryType = MetricQueryType .RANGE ,
65+ label_matchers : list [MetricLabelMatcher ] | None = None ,
66+ ) -> QueryMetricsResponse :
67+ if end_time is None :
68+ end_time = datetime .now (UTC )
69+
70+ # Build base query
71+ if query_type == MetricQueryType .INSTANT :
72+ query = """
73+ SELECT
74+ se.name,
75+ SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
76+ json_extract(se.attributes, '$.unit') as unit,
77+ se.attributes
78+ FROM span_events se
79+ WHERE se.name = ?
80+ AND se.timestamp BETWEEN ? AND ?
81+ """
82+ else :
83+ if granularity :
84+ time_format = self ._get_time_format_for_granularity (granularity )
85+ query = f"""
86+ SELECT
87+ se.name,
88+ SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
89+ json_extract(se.attributes, '$.unit') as unit,
90+ se.attributes,
91+ strftime('{ time_format } ', se.timestamp) as bucket_start
92+ FROM span_events se
93+ WHERE se.name = ?
94+ AND se.timestamp BETWEEN ? AND ?
95+ """
96+ else :
97+ query = """
98+ SELECT
99+ se.name,
100+ json_extract(se.attributes, '$.value') as value,
101+ json_extract(se.attributes, '$.unit') as unit,
102+ se.attributes,
103+ se.timestamp
104+ FROM span_events se
105+ WHERE se.name = ?
106+ AND se.timestamp BETWEEN ? AND ?
107+ """
108+
109+ params = [f"metric.{ metric_name } " , start_time .isoformat (), end_time .isoformat ()]
110+
111+ # Labels that will be attached to the MetricSeries (preserve matcher labels)
112+ all_labels : list [MetricLabel ] = []
113+ matcher_label_names = set ()
114+ if label_matchers :
115+ for matcher in label_matchers :
116+ json_path = f"$.{ matcher .name } "
117+ if matcher .operator == "=" :
118+ query += f" AND json_extract(se.attributes, '{ json_path } ') = ?"
119+ params .append (matcher .value )
120+ elif matcher .operator == "!=" :
121+ query += f" AND json_extract(se.attributes, '{ json_path } ') != ?"
122+ params .append (matcher .value )
123+ elif matcher .operator == "=~" :
124+ query += f" AND json_extract(se.attributes, '{ json_path } ') LIKE ?"
125+ params .append (f"%{ matcher .value } %" )
126+ elif matcher .operator == "!~" :
127+ query += f" AND json_extract(se.attributes, '{ json_path } ') NOT LIKE ?"
128+ params .append (f"%{ matcher .value } %" )
129+ # Preserve filter context in output
130+ all_labels .append (MetricLabel (name = matcher .name , value = str (matcher .value )))
131+ matcher_label_names .add (matcher .name )
132+
133+ # GROUP BY / ORDER BY logic
134+ if query_type == MetricQueryType .RANGE and granularity :
135+ group_time_format = self ._get_time_format_for_granularity (granularity )
136+ query += f" GROUP BY strftime('{ group_time_format } ', se.timestamp), json_extract(se.attributes, '$.unit')"
137+ query += " ORDER BY bucket_start"
138+ elif query_type == MetricQueryType .INSTANT :
139+ query += " GROUP BY json_extract(se.attributes, '$.unit')"
140+ else :
141+ query += " ORDER BY se.timestamp"
142+
143+ # Execute query
144+ async with aiosqlite .connect (self .conn_string ) as conn :
145+ conn .row_factory = aiosqlite .Row
146+ async with conn .execute (query , params ) as cursor :
147+ rows = await cursor .fetchall ()
148+
149+ if not rows :
150+ return QueryMetricsResponse (data = [])
151+
152+ data_points = []
153+ # We want to add attribute labels, but only those not already present as matcher labels.
154+ attr_label_names = set ()
155+ for row in rows :
156+ # Parse JSON attributes safely, if there are no attributes (weird), just don't add the labels to the result.
157+ try :
158+ attributes = json .loads (row ["attributes" ] or "{}" )
159+ except (TypeError , json .JSONDecodeError ):
160+ attributes = {}
161+
162+ value = row ["value" ]
163+ unit = row ["unit" ] or ""
164+
165+ # Add labels from attributes without duplicating matcher labels, if we don't do this, there will be a lot of duplicate label in the result.
166+ for k , v in attributes .items ():
167+ if k not in ["value" , "unit" ] and k not in matcher_label_names and k not in attr_label_names :
168+ all_labels .append (MetricLabel (name = k , value = str (v )))
169+ attr_label_names .add (k )
170+
171+ # Determine timestamp
172+ if query_type == MetricQueryType .RANGE and granularity :
173+ try :
174+ bucket_start_raw = row ["bucket_start" ]
175+ except KeyError as e :
176+ raise ValueError (
177+ "DB did not have a bucket_start time in row when using granularity, this indicates improper formatting"
178+ ) from e
179+ # this value could also be there, but be NULL, I think.
180+ if bucket_start_raw is None :
181+ raise ValueError ("bucket_start is None check time format and data" )
182+ bucket_start = datetime .fromisoformat (bucket_start_raw )
183+ timestamp = int (bucket_start .timestamp ())
184+ elif query_type == MetricQueryType .INSTANT :
185+ timestamp = int (datetime .now (UTC ).timestamp ())
186+ else :
187+ try :
188+ timestamp_raw = row ["timestamp" ]
189+ except KeyError as e :
190+ raise ValueError (
191+ "DB did not have a timestamp in row, this indicates improper formatting"
192+ ) from e
193+ # this value could also be there, but be NULL, I think.
194+ if timestamp_raw is None :
195+ raise ValueError ("timestamp is None check time format and data" )
196+ timestamp_iso = datetime .fromisoformat (timestamp_raw )
197+ timestamp = int (timestamp_iso .timestamp ())
198+
199+ data_points .append (
200+ MetricDataPoint (
201+ timestamp = timestamp ,
202+ value = value ,
203+ unit = unit ,
204+ )
205+ )
206+
207+ metric_series = [MetricSeries (metric = metric_name , labels = all_labels , values = data_points )]
208+ return QueryMetricsResponse (data = metric_series )
209+
210+ def _get_time_format_for_granularity (self , granularity : str | None ) -> str :
211+ """Get the SQLite strftime format string for a given granularity.
212+ Args:
213+ granularity: Granularity string (e.g., "1m", "5m", "1h", "1d")
214+ Returns:
215+ SQLite strftime format string for the granularity
216+ """
217+ if granularity is None :
218+ raise ValueError ("granularity cannot be None for this method - use separate logic for no aggregation" )
219+
220+ if granularity .endswith ("d" ):
221+ return "%Y-%m-%d 00:00:00"
222+ elif granularity .endswith ("h" ):
223+ return "%Y-%m-%d %H:00:00"
224+ elif granularity .endswith ("m" ):
225+ return "%Y-%m-%d %H:%M:00"
226+ else :
227+ return "%Y-%m-%d %H:%M:00" # Default to most granular which will give us the most timestamps.
228+
37229 async def query_traces (
38230 self ,
39231 attribute_filters : list [QueryCondition ] | None = None ,
0 commit comments