
Commit 1ac82ed

kerthcet and googs1025 authored
Polish the architecture of metrics aggregator (#465)
* Update ci workflow to 0.1.18, which go version is 1.24
  Signed-off-by: kerthcet <[email protected]>
* add miss integration test case for tensorrt-llm backend (#455)
* Update ci workflow to 0.1.18, which go version is 1.24 (#457)
  * Update ci workflow to 0.1.18, which go version is 1.24
    Signed-off-by: kerthcet <[email protected]>
  * Fix lint
    Signed-off-by: kerthcet <[email protected]>
  * Fix test
    Signed-off-by: kerthcet <[email protected]>
  ---------
  Signed-off-by: kerthcet <[email protected]>
* Move DataStore to memStore
  Signed-off-by: kerthcet <[email protected]>
* Polish the architecture
  Signed-off-by: kerthcet <[email protected]>
---------
Signed-off-by: kerthcet <[email protected]>
Co-authored-by: CYJiang <[email protected]>
1 parent 20fc66d commit 1ac82ed

File tree

5 files changed: +72 -101 lines changed


components/router/pkg/store/mem.go

Lines changed: 56 additions & 1 deletion
@@ -22,11 +22,66 @@ import (
 	"sync"
 )
 
+type DataStore struct {
+	mu   sync.RWMutex
+	data map[string]Indicator // Key: name, Value: Indicator
+
+	// Keep track of the min/max values for each metric, 0-index is min and 1-index is max.
+	// They will be used in score plugins.
+	RunningQueueSize [2]float64
+	WaitingQueueSize [2]float64
+	KVCacheUsage     [2]float64
+}
+
+func (d *DataStore) Get(ctx context.Context, name string) (Indicator, error) {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+
+	metrics, exists := d.data[name]
+	if !exists {
+		return Indicator{}, fmt.Errorf("metrics for datastore %s not found", name)
+	}
+	return metrics, nil
+}
+
+// TODO: we should not iterate all the data which may lead to performance issue.
+func (d *DataStore) FilterIterate(ctx context.Context, fn func(context.Context, Indicator) bool) (names []string) {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+
+	for name, indicator := range d.data {
+		if fn(ctx, indicator) {
+			names = append(names, name)
+		}
+	}
+	return
+
+}
+
+// TODO: return multi candidates to avoid hotspot with multi instances.
+func (d *DataStore) ScoreIterate(ctx context.Context, fn func(context.Context, Indicator) float32) string {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+
+	var highestScore float32
+	var candidate string
+
+	for name, indicator := range d.data {
+		score := fn(ctx, indicator)
+		// Iterate the d.data is already in random order, so we can just pick the first one with the highest score.
+		if score > highestScore {
+			highestScore = score
+			candidate = name
+		}
+	}
+	return candidate
+}
+
 var _ Store = &MemoryStore{}
 
 type MemoryStore struct {
 	mu   sync.RWMutex
-	data map[string]*DataStore // Key: modelName, Value: *podWrapperStore
+	data map[string]*DataStore // Key: modelName
 }
 
 func NewMemoryStore() *MemoryStore {
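To see how the new `DataStore` API might be consumed, here is a minimal, self-contained sketch of a router-side selection pass: `FilterIterate` to drop saturated instances, `ScoreIterate` to pick a candidate, and the `[2]float64` min/max ranges to normalize a raw gauge into a score. The `Indicator` fields, the threshold, and the weighting below are assumptions for illustration only; just the method signatures and the min/max convention come from the diff, and the store type is re-declared locally so the example runs on its own.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Indicator is assumed here to carry the per-instance gauges that the
// min/max arrays on DataStore summarize; the real field set may differ.
type Indicator struct {
	WaitingQueueSize float64
	KVCacheUsage     float64
}

// dataStore mirrors the relevant parts of the DataStore API from the diff
// (FilterIterate/ScoreIterate plus min/max ranges) so the sketch is runnable.
type dataStore struct {
	mu               sync.RWMutex
	data             map[string]Indicator
	WaitingQueueSize [2]float64 // [min, max]
	KVCacheUsage     [2]float64 // [min, max]
}

func (d *dataStore) FilterIterate(ctx context.Context, fn func(context.Context, Indicator) bool) (names []string) {
	d.mu.RLock()
	defer d.mu.RUnlock()
	for name, ind := range d.data {
		if fn(ctx, ind) {
			names = append(names, name)
		}
	}
	return
}

func (d *dataStore) ScoreIterate(ctx context.Context, fn func(context.Context, Indicator) float32) string {
	d.mu.RLock()
	defer d.mu.RUnlock()
	var best float32
	var candidate string
	for name, ind := range d.data {
		if s := fn(ctx, ind); s > best {
			best, candidate = s, name
		}
	}
	return candidate
}

// normalize maps v into [0,1] using the [min, max] range tracked by the store.
func normalize(v float64, r [2]float64) float64 {
	if r[1] <= r[0] {
		return 0
	}
	return (v - r[0]) / (r[1] - r[0])
}

func main() {
	ctx := context.Background()
	s := &dataStore{
		data: map[string]Indicator{
			"pod-a": {WaitingQueueSize: 2, KVCacheUsage: 0.30},
			"pod-b": {WaitingQueueSize: 9, KVCacheUsage: 0.85},
		},
		WaitingQueueSize: [2]float64{0, 10},
		KVCacheUsage:     [2]float64{0, 1},
	}

	// Filter step: drop instances whose KV cache is nearly full (threshold is made up).
	healthy := s.FilterIterate(ctx, func(_ context.Context, ind Indicator) bool {
		return ind.KVCacheUsage < 0.9
	})

	// Score step: shorter waiting queue => higher score (weighting is made up).
	picked := s.ScoreIterate(ctx, func(_ context.Context, ind Indicator) float32 {
		return float32(1 - normalize(ind.WaitingQueueSize, s.WaitingQueueSize))
	})

	fmt.Println("healthy instances:", healthy, "picked:", picked)
}
```

As in the real `ScoreIterate`, map iteration order is random, so ties are broken arbitrarily; the TODO in the diff about returning multiple candidates is aimed at the hotspot risk this creates.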

components/router/pkg/store/store.go

Lines changed: 0 additions & 56 deletions
@@ -18,8 +18,6 @@ package store
 
 import (
 	"context"
-	"fmt"
-	"sync"
 )
 
 type Store interface {
@@ -31,57 +29,3 @@ type Store interface {
 	// Should only used for testing.
 	Len() int32
 }
-
-type DataStore struct {
-	mu   sync.RWMutex
-	data map[string]Indicator // Key: name, Value: Indicator
-
-	// Keep track of the min/max values for each metric, 0-index is min and 1-index is max.
-	// They will be used in score plugins.
-	RunningQueueSize [2]float64
-	WaitingQueueSize [2]float64
-	KVCacheUsage     [2]float64
-}
-
-func (d *DataStore) Get(ctx context.Context, name string) (Indicator, error) {
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-
-	metrics, exists := d.data[name]
-	if !exists {
-		return Indicator{}, fmt.Errorf("metrics for datastore %s not found", name)
-	}
-	return metrics, nil
-}
-
-// TODO: we should not iterate all the dataStore which may lead to performance issue.
-func (d *DataStore) FilterIterate(ctx context.Context, fn func(context.Context, Indicator) bool) (names []string) {
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-
-	for name, indicator := range d.data {
-		if fn(ctx, indicator) {
-			names = append(names, name)
-		}
-	}
-	return
-
-}
-
-func (d *DataStore) ScoreIterate(ctx context.Context, fn func(context.Context, Indicator) float32) string {
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-
-	var highestScore float32
-	var candidate string
-
-	for name, indicator := range d.data {
-		score := fn(ctx, indicator)
-		// Iterate the d.data is already in random order, so we can just pick the first one with the highest score.
-		if score > highestScore {
-			highestScore = score
-			candidate = name
-		}
-	}
-	return candidate
-}

docs/proposals/376-metric-aggregagor/README.md

Lines changed: 15 additions & 43 deletions
@@ -1,4 +1,4 @@
-# Proposal-376: Gateway Metric Aggregator
+# Proposal-376: Metric Aggregator
 
 <!--
 This is the title of your Proposal. Keep it short, simple, and descriptive. A good
@@ -85,9 +85,8 @@ List the specific goals of the Proposal. What is it trying to achieve? How will
 know that this has succeeded?
 -->
 
-- A simple implementation with least-latency scheduling algorithm
-- Extensible with different consumers in the cluster, like the Lora autoscaler or the ai gateway
-- Metrics visualization support, like Grafana
+- A simple implementation with a latency-aware dispatching algorithm
+- Extensible with different consumers in the cluster, like the HPA autoscaler or the ai gateway
 
 ### Non-Goals
 
@@ -99,6 +98,7 @@ and make progress.
 - Different scheduling algorithm implementations in ai gateway, like prefix-cache aware
 - LoRA aware scheduling implementation, will be left to another KEP
 - Performance consideration in big clusters should be left to the Beta level
+- How HPA consumes the metrics should be left to another KEP.
 
 ## Proposal
 
@@ -175,41 +175,16 @@ The overall flow looks like:
 Let's break down the flow into several steps:
 
 - Step 1: we'll collect the metrics from the inference workloads in metrics aggregator.
-- Step 2: the aggregator will parse the metrics and store them in the redis, this is for HA consideration and cache sharing. Once the instance is down, we can still retrieve the metrics from redis. And if we have multiple instances, we can share the metrics with each other via redis. Considering Envoy AI gateway already uses Redis for limit rating, we'll reuse the Redis here.
-- Step 3 & 4: Traffic comes, the gateway plugin (we'll call it router later) will retrieve the metrics from Redis and make routing decisions based on different algorithms, like queue size aware scheduling.
+- Step 2: the aggregator will parse the metrics and store them in local memory. We'll use in-memory storage at first for quick starting and fast access. We may upgrade the architecture in the future; see the Drawbacks section for more details.
+- Step 3 & 4: when traffic comes, the gateway plugin (we'll call it the router later) will retrieve the metrics from the storage and make routing decisions based on different algorithms, like latency-aware scheduling.
 - Step 5: The router will send the request to the selected instance, and the instance will return the result to the router, return to the user finally.
 
 ### Additional components introduced:
 
-- Metrics Aggregator (MA): MA is working as the controller plane to sync the metrics, this is also one of the reason why we want to decouple it from the router, which working as a data plane. MA has several components:
-  - A Pod controller to manage the Pod lifecycle, for example, once a Pod is ready, it will add it to the internal store, and each Pod will fork a background goroutine to sync the metrics continuously, 50ms interval by default. Once the Pod is deleted, the goroutine will be stopped and removed from the store.
-  - A internal store to parse the metric results, and store it in the backend storage, like Redis.
-- Redis: a Redis instance is necessary for the metrics storage and sharing, we can use the existing Redis instance in the cluster, or deploy a new one if necessary. We should have storage interface to support different backends in the future.
-- Router: a new router or [DynamicLoadBalancingBackend](https://github.com/envoyproxy/ai-gateway/blob/be2b479b04bc7a219b0c8239143bfbabebdcd615/filterapi/filterconfig.go#L199-L208) specifically in Envoy AI gateway to pick the best-fit Pod endpoints. However, we may block by the upstream issue [here](https://github.com/envoyproxy/ai-gateway/issues/604), we'll work with the Envoy AI Gateway team to resolve it ASAP. Maybe the final design will impact our implementation a bit but not much I think.
-
-### Data Structure
-
-The data structure could be varied based on the metrics we want to collect, let's take the queue size as an example:
-
-Because redis is a kv store, we'll use the ZSET to store the results, `LeastLatency::ModelName` as the key, Pod name as the member and the (runningQueueSize * 0.3 + waitingQueueSize * 0.7) as the score, the factor of waitingQueueSize is higher because metric is a delayed indicator. RunningQueueSize and WaitingQueueSize are two metrics most of the inference engines support.
-
-We'll also have another key to record the update timestamp. For example, a Pod named "default/fake-pod" with the score = 0.5, the set commands look like:
-
-```bash
-# set the update timestamp
-SET default/fake-pod "2025-05-12T06:16:27Z"
-
-# set the score
-ZADD LeastLatency::ModelName 0.5 default/fake-pod
-```
-
-When collecting, we'll update the timestamp and score together. Setting the top 5 is enough for us to help reduce the storage pressure since it's a memory-based database. We don't use the expiration key here is just because most of the time, the metrics should be updated at a regular interval.
-
-When retrieving, we'll first query the ZSET to get the top 5 records, and iterate them one by one to verify that `currentTimestamp - recordTimestamp < 5s`, if not, skipping to the next one. This is to avoid outdated metrics. Once picked the exact endpoint, we'll reset the score with waitingQueueSize + 1 to avoid hotspot issues, especially when metrics update is blocked by some reasons.
-
-If all metrics are outdated, we'll fallback to the default service.
-
-Note: the algorithm is not the final one, we'll have more discussions with the community to find the best one.
+- Metrics Aggregator (MA): MA works as the control plane to sync the metrics; however, it also works as a data plane at this moment, and we will revisit this once we graduate to Beta/GA. MA has several components:
+  - A Pod controller to manage the Pod lifecycle: for example, once a Pod is ready, it will be added to the internal store, and each Pod will fork a background goroutine to sync the metrics continuously, with a 100ms interval by default. Once the Pod is deleted, the goroutine will be stopped and removed from the store.
+  - An internal store to parse the metric results and store them in the backend storage; right now we only support local memory, but the interface is defined and we can extend it later.
+- Router: an LLM request dispatcher to route the requests to specific Pods based on the metrics read from the MA. However, we may be blocked by the upstream issue [here](https://github.com/envoyproxy/ai-gateway/issues/604); we'll work with the Envoy AI Gateway team to resolve it ASAP. Maybe the final design will impact our implementation a bit, but not much I think.
 
 ### Test Plan
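As a rough illustration of the Pod controller behaviour described above (one background goroutine per ready Pod, scraping at a fixed interval and stopping when the Pod is deleted), here is a small, self-contained sketch. The `syncPodMetrics` function, the `metricsSink` interface, and the scrape callback are hypothetical names; only the one-goroutine-per-Pod model and the 100ms default interval come from the proposal text.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// metricsSink stands in for the aggregator's internal store; the real
// interface lives in components/router/pkg/store.
type metricsSink interface {
	Update(pod string, waitingQueueSize float64)
}

// syncPodMetrics is a hypothetical per-Pod loop: the Pod controller would start
// one when a Pod becomes ready and cancel its context when the Pod is deleted.
func syncPodMetrics(ctx context.Context, pod string, interval time.Duration,
	scrape func(context.Context, string) (float64, error), sink metricsSink) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return // Pod deleted: stop syncing; the controller drops it from the store.
		case <-ticker.C:
			v, err := scrape(ctx, pod)
			if err != nil {
				continue // transient scrape failure: keep the last known value.
			}
			sink.Update(pod, v)
		}
	}
}

type printSink struct{}

func (printSink) Update(pod string, v float64) { fmt.Printf("%s waiting=%v\n", pod, v) }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	fakeScrape := func(context.Context, string) (float64, error) { return 3, nil }

	// 100ms is the default sync interval mentioned in the proposal.
	syncPodMetrics(ctx, "default/fake-pod", 100*time.Millisecond, fakeScrape, printSink{})
}
```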

@@ -308,10 +283,8 @@ milestones with these graduation criteria:
 
 Beta:
 
-- Other storages rather than KV store who supports only key-value pairs which might be not enough for more complex scenarios, like the prefix-cache aware scenario.
-- HA support, once the metrics aggregator is down, the system should still work.
-- No performance issues in big clusters, we may use daemonsets to report metrics.
-- Once the picked Pod is down after the routing decision, router will fallback to the default service. Fallback mode is already supported in Envoy AI gateway.
+- No performance issues in big clusters, especially when we have multiple router instances there.
+- The data plane and the control plane should be decoupled.
 
 ## Implementation History
 
@@ -327,12 +300,11 @@ Major milestones might include:
 -->
 
 - 2025-05-08: Proposal initialized and submitted for review
+- 2025-05-19: Proposal polished with the new architecture design and flow diagram.
 
 ## Drawbacks
 
-<!--
-Why should this Proposal _not_ be implemented?
--->
+The biggest drawback of this proposal is that the router is now coupled with the metrics aggregator because of the shared memory store. In the future, we should optimize this either by using a database or by pushing the metric reporting logic into the inference engines directly, which works as an event-driven architecture; the router instances will then watch the events to build a local in-memory view, together with the metrics aggregator.
 
 ## Alternatives
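The event-driven direction mentioned in the Drawbacks above is not designed yet; purely as a toy sketch of the idea (engines or a node agent push metric events, and each router builds its own local in-memory view from the stream), something like the following could work. Every name and type here is hypothetical, and a channel stands in for whatever event bus or watch mechanism would actually be used.

```go
package main

import (
	"fmt"
	"sync"
)

// metricEvent is a hypothetical event pushed by an inference engine or node agent.
type metricEvent struct {
	Pod              string
	WaitingQueueSize float64
}

// localView is the per-router in-memory cache built from the event stream.
type localView struct {
	mu   sync.RWMutex
	data map[string]float64
}

func (v *localView) apply(e metricEvent) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.data[e.Pod] = e.WaitingQueueSize
}

func (v *localView) get(pod string) (float64, bool) {
	v.mu.RLock()
	defer v.mu.RUnlock()
	val, ok := v.data[pod]
	return val, ok
}

func main() {
	events := make(chan metricEvent, 16) // stand-in for a real event bus / watch stream
	view := &localView{data: map[string]float64{}}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // router-side watcher building its local memory
		defer wg.Done()
		for e := range events {
			view.apply(e)
		}
	}()

	// Engines (or a node agent) push updates instead of being scraped.
	events <- metricEvent{Pod: "default/fake-pod", WaitingQueueSize: 4}
	close(events)
	wg.Wait()

	if v, ok := view.get("default/fake-pod"); ok {
		fmt.Println("waiting queue size:", v)
	}
}
```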

@@ -342,4 +314,4 @@ not need to be as detailed as the proposal, but should include enough
 information to express the idea and why it was not acceptable.
 -->
 
-- When collecting metrics from the inference workloads, `PUSH` mode will put less pressure on the gateway side, or the gateway will have iterate all the Pods which obviously will lead to performance issues. We didn't pick the approach because it will either add additional load to the inference workload and introduces more complexity to the system. The current approach will fork as much goroutines as the number of inference workloads to sync the metrics in parallel, this is feasible because goroutine is lightweight. Once the metrics aggregator becomes the bottleneck, we can consider to use `PUSH` mode at node level.
+- When collecting metrics from the inference workloads, `PUSH` mode will put less pressure on the gateway side, or the gateway will have iterate all the Pods which obviously will lead to performance issues. We didn't pick the approach because it will either add additional load to the inference workload and introduces more complexity to the system. The current approach will fork as much goroutines as the number of inference workloads to sync the metrics in parallel, this is feasible because goroutine is lightweight. Once the metrics aggregator becomes the bottleneck, we can consider to use `PUSH` mode at node level.
(binary file, 762 Bytes — preview not shown)

docs/proposals/376-metric-aggregagor/proposal.yaml

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-title: Gateway Metric Aggregator
+title: Metric Aggregator
 proposal-number: 376
 authors:
 - kerthcet

Comments (0)