Skip to content

Commit c492248

Browse files
authored
Publish price in a metric (#3759)
* Publish price in a metric * Implement metric
1 parent 8baf330 commit c492248

File tree

3 files changed

+26
-0
lines changed

3 files changed

+26
-0
lines changed

discovery/discovery.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/hex"
77
"errors"
8+
"math/big"
89
"math/rand"
910
"net/url"
1011
"sort"
@@ -364,6 +365,11 @@ func reportLiveAICapacity(ch chan common.OrchestratorDescriptor, caps common.Cap
364365

365366
idleContainersByModelAndOrchestrator := make(map[string]map[string]int)
366367
for _, od := range allOrchInfo {
368+
pricePerUnit := od.RemoteInfo.PriceInfo.PricePerUnit
369+
pixelsPerUnit := od.RemoteInfo.PriceInfo.PixelsPerUnit
370+
pricePerPixel := big.NewRat(pricePerUnit, pixelsPerUnit)
371+
monitor.LiveAIPricePerPixel(od.LocalInfo.URL.String(), pricePerPixel)
372+
367373
var models map[string]*net.Capabilities_CapabilityConstraints_ModelConstraint
368374
if od.RemoteInfo != nil {
369375
models = getModelCaps(od.RemoteInfo.Capabilities)

monitor/census.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ type (
208208
mAIResultSaveFailed *stats.Int64Measure
209209
mAIContainersInUse *stats.Int64Measure
210210
mAIContainersIdle *stats.Int64Measure
211+
mLiveAIPricePerPixel *stats.Float64Measure
211212
aiContainersIdleByPipelineByOrchestrator map[string]map[string]int
212213
mAIGPUsIdle *stats.Int64Measure
213214
mAICurrentLivePipelines *stats.Int64Measure
@@ -391,6 +392,7 @@ func InitCensus(nodeType NodeType, version string) {
391392
census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot")
392393
census.mAIContainersInUse = stats.Int64("ai_container_in_use", "Number of containers currently used for AI processing", "tot")
393394
census.mAIContainersIdle = stats.Int64("ai_container_idle", "Number of containers currently available for AI processing", "tot")
395+
census.mLiveAIPricePerPixel = stats.Float64("live_ai_price_per_pixel", "Live AI price per pixel", "wei/pixel")
394396
census.aiContainersIdleByPipelineByOrchestrator = make(map[string]map[string]int)
395397
census.mAIGPUsIdle = stats.Int64("ai_gpus_idle", "Number of idle GPUs (with no configured container)", "tot")
396398
census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
@@ -1029,6 +1031,13 @@ func InitCensus(nodeType NodeType, version string) {
10291031
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName, census.kOrchestratorURI}, baseTags...),
10301032
Aggregation: view.LastValue(),
10311033
},
1034+
{
1035+
Name: "live_ai_price_per_pixel",
1036+
Measure: census.mLiveAIPricePerPixel,
1037+
Description: "Live AI price per pixel",
1038+
TagKeys: append([]tag.Key{census.kOrchestratorURI}, baseTags...),
1039+
Aggregation: view.LastValue(),
1040+
},
10321041
{
10331042
Name: "ai_gpus_idle",
10341043
Measure: census.mAIGPUsIdle,
@@ -2067,6 +2076,15 @@ func AIContainersIdle(currentContainersIdle int, pipeline, modelID, uri string)
20672076
}
20682077
}
20692078

2079+
func LiveAIPricePerPixel(orchestratorURI string, pricePerPixel *big.Rat) {
2080+
floatPrice, _ := pricePerPixel.Float64()
2081+
if err := stats.RecordWithTags(census.ctx,
2082+
[]tag.Mutator{tag.Insert(census.kOrchestratorURI, orchestratorURI)},
2083+
census.mLiveAIPricePerPixel.M(floatPrice)); err != nil {
2084+
glog.Errorf("Error recording metrics err=%q", err)
2085+
}
2086+
}
2087+
20702088
func AIGPUsIdle(currentGPUsIdle int, pipeline, modelID string) {
20712089
if err := stats.RecordWithTags(census.ctx,
20722090
[]tag.Mutator{tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, modelID)},

server/selection_algorithm.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ func filterByMaxPrice(ctx context.Context, addrs []ethcommon.Address, maxPrice *
7979
price := prices[addr]
8080
if price != nil && price.Cmp(maxPrice) <= 0 {
8181
res = append(res, addr)
82+
} else {
83+
clog.Warningf(ctx, "Orchestrator %s is above max price %v, price=%v", addr, maxPrice, price)
8284
}
8385
}
8486
return res

0 commit comments

Comments
 (0)