|
| 1 | +# Copyright 2025 Google LLC |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +"""Metrics util classes for collecting and managing metrics.""" |
| 16 | + |
| 17 | +import datetime |
| 18 | + |
| 19 | +import numpy as np |
| 20 | +from typing import Tuple, List, Dict |
| 21 | + |
| 22 | + |
| 23 | +def _floor_datetime_to_sec(timestamp: datetime.datetime) -> datetime.datetime: |
| 24 | + """ "Floor the timestamp to the nearest most recent second""" |
| 25 | + return timestamp.replace(microsecond=0) |
| 26 | + |
| 27 | + |
def _now_floored_to_second() -> datetime.datetime:
  """Returns the current local time floored to the nearest most recent second."""
  # NOTE(review): uses naive local time — presumably acceptable for relative
  # rate computations; confirm if these timestamps are ever compared across
  # machines or timezones.
  now = datetime.datetime.now()
  return _floor_datetime_to_sec(now)
| 32 | + |
| 33 | + |
class EventMetric:
  """An event metric for distribution stats reporting. Not thread-safe.

  Data points are kept in insertion order; summary statistics (mean,
  median/p50, p99) are computed on demand with numpy.
  """

  def __init__(self, name: str, description: str, unit: str = ""):
    self._name = name
    self._description = description
    self._unit = unit
    # Raw data points, in the order they were recorded.
    self._data: List[float] = []

  def data(self) -> List[float]:
    """Returns all stored data points.

    Returns:
      A list of data points in the order that they were recorded.
    """
    return self._data

  def record(self, value: float):
    """Records a single data point.

    Args:
      value: The data point to be stored.
    """
    self._data.append(value)

  def percentile(self, percentile: int) -> float:
    """Computes and returns the specified percentile of the collected data.

    Args:
      percentile: The percentile to compute, in [0, 100].

    Returns:
      The computed percentile as a plain float.

    Raises:
      ValueError: If `percentile` is outside [0, 100], or if no data points
        have been recorded.
    """
    if not 0 <= percentile <= 100:
      raise ValueError(f"Percentile {percentile} is not in [0, 100]")
    if not self._data:
      raise ValueError(
          f"No data points in metric {self._name} to compute percentile"
      )
    # float() unwraps the numpy scalar so callers get a builtin float.
    return float(np.percentile(self._data, percentile))

  def mean(self) -> float:
    """Calculates and returns the mean value of the collected data.

    Returns:
      The mean value of the collected data as a plain float.

    Raises:
      ValueError: If no data points have been recorded.
    """
    if not self._data:
      raise ValueError(f"No data points in metric {self._name} to compute mean")
    return float(np.mean(self._data))

  def distribution_summary_str(self) -> str:
    """Generates a string representation of the distribution summary.

    Returns:
      The string representation of the distribution summary including
      mean, p50 (median) and p99.
    """
    s = ""
    s += f"Mean {self._name}: {self.mean():.2f} {self._unit}\n"
    s += f"Median {self._name}: {self.percentile(50):.2f} {self._unit}\n"
    s += f"P99 {self._name}: {self.percentile(99):.2f} {self._unit}"
    return s

  def distribution_summary_dict(self) -> dict[str, float]:
    """Generates a dictionary representation of the distribution summary.

    Returns:
      A dictionary containing the distribution summary including mean,
      p50 (median) and p99, keyed by "<stat>_<name>_<unit>".
    """
    return {
        f"mean_{self._name}_{self._unit}": self.mean(),
        f"median_{self._name}_{self._unit}": self.percentile(50),
        f"p99_{self._name}_{self._unit}": self.percentile(99),
    }
| 111 | + |
| 112 | + |
| 113 | +class CounterMetric: |
| 114 | + """A count metric for computing rates over time. Not thread-safe.""" |
| 115 | + |
| 116 | + def __init__(self, name: str, description: str): |
| 117 | + self._name = name |
| 118 | + self._description = description |
| 119 | + self._data: dict[datetime.datetime, int] = {} |
| 120 | + |
| 121 | + def data(self) -> Dict[datetime.datetime, int]: |
| 122 | + """Returns all stored data points. |
| 123 | +
|
| 124 | + Returns: |
| 125 | + A dictionary of data points where the key is the timestamp and the value |
| 126 | + is the aggregated counts within the second of the timestamp. |
| 127 | + """ |
| 128 | + return self._data |
| 129 | + |
| 130 | + def total_count(self) -> int: |
| 131 | + """Returns aggregated counts |
| 132 | +
|
| 133 | + Returns: |
| 134 | + The aggregated counts. |
| 135 | + """ |
| 136 | + return sum(self._data.values()) |
| 137 | + |
| 138 | + def total_duration_sec(self) -> int: |
| 139 | + """Returns the duration between the first and last count increment |
| 140 | +
|
| 141 | + Returns: |
| 142 | + The duration (in seconds) between the first and last increment |
| 143 | + (inclusive of both ends). |
| 144 | + """ |
| 145 | + start_time = min(self._data.keys()) |
| 146 | + end_time = max(self._data.keys()) |
| 147 | + return int((end_time - start_time).total_seconds() + 1) |
| 148 | + |
| 149 | + def increment( |
| 150 | + self, count: int = 1, timestamp: datetime.datetime | None = None |
| 151 | + ): |
| 152 | + """Increment the counter by count |
| 153 | +
|
| 154 | + Args: |
| 155 | + count: The amount to increment |
| 156 | + timestamp: The timestamp for the increment. Default to now if none is |
| 157 | + provided. |
| 158 | + """ |
| 159 | + if timestamp is None: |
| 160 | + cur_time = _now_floored_to_second() |
| 161 | + else: |
| 162 | + cur_time = _floor_datetime_to_sec(timestamp) |
| 163 | + # Add timestamp with default value 0 if doesn't exist |
| 164 | + cur_count = self._data.setdefault(cur_time, 0) |
| 165 | + self._data[cur_time] = cur_count + count |
| 166 | + return |
| 167 | + |
| 168 | + def rate(self) -> float: |
| 169 | + """Calculates the rate of change between the first and last increments. |
| 170 | +
|
| 171 | + Returns: |
| 172 | + The rate of change between the first and last increments. |
| 173 | + """ |
| 174 | + if len(self._data.keys()) < 2: |
| 175 | + raise ValueError( |
| 176 | + "At least 2 data points are required to compute the rate" |
| 177 | + ) |
| 178 | + start_time = min(self._data.keys()) |
| 179 | + end_time = max(self._data.keys()) |
| 180 | + delta_time_sec = (end_time - start_time).total_seconds() |
| 181 | + sorted_counts = [count for timestamp, count in sorted(self._data.items())] |
| 182 | + delta_count = sum(sorted_counts[1:]) |
| 183 | + return delta_count / delta_time_sec |
| 184 | + |
| 185 | + def rate_over_window( |
| 186 | + self, window_size_sec: int |
| 187 | + ) -> List[Tuple[datetime.datetime, float]]: |
| 188 | + """Calculate the rates over time." |
| 189 | +
|
| 190 | + Args: |
| 191 | + window_size_sec: The size of the window in seconds for computing each |
| 192 | + individual rate |
| 193 | +
|
| 194 | + Returns: |
| 195 | + A list of rates over time, where each element represents the rate of |
| 196 | + change for the specified window size. |
| 197 | + """ |
| 198 | + if len(self._data.keys()) < 2: |
| 199 | + raise ValueError( |
| 200 | + f"At least 2 different timestamp values are required to calculate " |
| 201 | + f"the rate, but have only {len(self._data.keys())}" |
| 202 | + ) |
| 203 | + rates: List[Tuple[datetime.datetime, float]] = [] |
| 204 | + sorted_data = sorted(self._data.items()) |
| 205 | + |
| 206 | + start_time, _ = sorted_data[0] |
| 207 | + end_time, _ = sorted_data[-1] |
| 208 | + cur_start_time = start_time |
| 209 | + cur_end_time = cur_start_time + datetime.timedelta(seconds=window_size_sec) |
| 210 | + cur_total_count = 0 |
| 211 | + for data_point in sorted_data: |
| 212 | + timestamp, count = data_point |
| 213 | + if timestamp >= cur_end_time: |
| 214 | + while timestamp >= cur_end_time: |
| 215 | + rates.append((cur_start_time, cur_total_count / window_size_sec)) |
| 216 | + cur_start_time = cur_end_time |
| 217 | + cur_end_time = cur_start_time + datetime.timedelta( |
| 218 | + seconds=window_size_sec |
| 219 | + ) |
| 220 | + cur_total_count = 0 |
| 221 | + cur_total_count += count |
| 222 | + if cur_start_time <= end_time: |
| 223 | + delta_time_sec = (end_time - cur_start_time).total_seconds() + 1 |
| 224 | + rates.append((cur_start_time, cur_total_count / delta_time_sec)) |
| 225 | + return rates |
| 226 | + |
| 227 | + def rate_over_window_to_csv(self, window_size_sec: int) -> str: |
| 228 | + """Compute and return the rates over time and return them in csv string |
| 229 | +
|
| 230 | + Args: |
| 231 | + window_size_sec: The size of the window in seconds for computing each |
| 232 | + individual rate |
| 233 | +
|
| 234 | + Returns: |
| 235 | + A CSV string representation of the rates over time, with two rows: |
| 236 | + the first row contains timestamps, and the second row contains rate |
| 237 | + values. |
| 238 | + """ |
| 239 | + rates = self.rate_over_window(window_size_sec) |
| 240 | + # Generate CSV string with two rows |
| 241 | + timestamps = "TimeStamp," + ",".join([str(e[0]) for e in rates]) |
| 242 | + values = "Value," + ",".join([f"{e[1]:.2f}" for e in rates]) |
| 243 | + csv_output = timestamps + "\n" + values |
| 244 | + return csv_output |
0 commit comments